Get the file names from a Spark Streaming DStream RDD

Spark Streaming's textFileStream and fileStream can monitor a directory and process newly arriving files as RDDs in a DStream.

How do you get the names of the files being processed by the DStream's RDD during a particular batch interval?
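For reference, a minimal sketch of the baseline being asked about (the directory path and batch interval are placeholders); textFileStream delivers the file contents, but nothing in the DStream API directly exposes the file names:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new SparkConf().setAppName("DirMonitor").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(30))
// Each batch interval, `lines` holds the contents of files newly added to the directory
val lines: DStream[String] = ssc.textFileStream("/some/directory")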


fileStream produces a UnionRDD of NewHadoopRDDs. A convenient property of the NewHadoopRDDs created by sc.newAPIHadoopFile is that their names are set to their file paths.
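You can verify this claim with a minimal sketch that just prints each underlying RDD's name (i.e. the file path) per batch; logFileNames is a hypothetical helper, not part of any Spark API:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext

def logFileNames(ssc: StreamingContext, directory: String): Unit =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .foreachRDD { rdd =>
      // The batch RDD is a UnionRDD; each dependency wraps one per-file
      // NewHadoopRDD whose name is the path it was read from.
      rdd.dependencies.foreach(dep => println(dep.rdd.name))
    }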

Here's an example of what you can do with that knowledge:

import scala.reflect.ClassTag

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// Re-unions the per-file RDDs of each batch, propagating the file path
// (the underlying NewHadoopRDD's name) onto each mapped RDD.
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
        )
      )
    )

// Applies a file-name-aware transformation to each per-file RDD inside the
// batch's UnionRDD, skipping files that produced empty RDDs.
def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map{ dep =>
      if (dep.rdd.isEmpty) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }.flatten
  )
}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namedTextFileStream(ssc, "/some/directory")

  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.
    transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}
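With this in place, transformed is a DStream[(String, String)] pairing every line with the path of the file it came from; calling transformed.print() before ssc.start() is an easy way to spot-check a few (filename, line) pairs per batch.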

For those who need some Java code instead of Scala:

JavaPairInputDStream<LongWritable, Text> textFileStream =
    jsc.fileStream(
        inputPath,
        LongWritable.class,
        Text.class,
        TextInputFormat.class,
        FileInputDStream::defaultFilter,
        false
    );

JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
    UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
    List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
    List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map(depRdd -> {
        if (depRdd.isEmpty()) {
            return null;
        }
        JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
        // The per-file RDD's name is the path of the file it was read from
        String filename = depRdd.name();
        JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd)
            .mapToPair(t -> new Tuple2<>(filename, t._2().toString()))
            .setName(filename);
        return newDep.rdd();
    }).filter(t -> t != null).collect(Collectors.toList());
    Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
    ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
    return new UnionRDD<>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
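As with the Scala version, each element of namedTextFileStream is a (filename, line) tuple, so calling namedTextFileStream.print(); before starting the streaming context is an easy sanity check.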