使用Apache Flink从Web获取JSON元素

在阅读了Apache Flink(官方文档,dataartisans)的几个文档页面以及官方存储库中提供的示例之后,我一直看到它们用作数据源的示例,用于流式处理已下载的文件,并始终连接到本地主机。

我正在尝试使用Apache Flink下载包含动态数据的JSON文件。 我的目的是试图建立我可以访问JSON文件作为Apache Flink的输入源的URL,而不是使用其他系统下载并使用Apache Flink处理下载的文件。

是否有可能建立与Apache Flink的这个网络连接?


您可以将要下载的URL定义为输入DataStream ,然后从MapFunction下载文档。 以下代码演示了这一点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");

inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));

        StringBuilder builder = new StringBuilder();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return builder.toString();
    }
}).print();

env.execute("URL download job");
链接地址: http://www.djcxy.com/p/90473.html

上一篇: Get JSON elements from a web with Apache Flink

下一篇: Android ImageReader get NV21 format?