Spark streaming DStream RDD to get file name
Spark streaming textFileStream
and fileStream
can monitor a directory and process the new files in a Dstream RDD.
How to get the file names that are being processed by the DStream RDD at that particular interval?
fileStream
produces UnionRDD
of NewHadoopRDD
s. The good part about NewHadoopRDD
s created by sc.newAPIHadoopFile
is that their name
s are set to their paths.
Here's the example of what you can do with that knowledge:
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
.transform( rdd =>
new UnionRDD(rdd.context,
rdd.dependencies.map( dep =>
dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
)
)
)
def transformByFile[U: ClassTag](unionrdd: RDD[String],
transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
new UnionRDD(unionrdd.context,
unionrdd.dependencies.map{ dep =>
if (dep.rdd.isEmpty) None
else {
val filename = dep.rdd.name
Some(
transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
.setName(filename)
)
}
}.flatten
)
}
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Process by file")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(30))
val dstream = namesTextFileStream(ssc, "/some/directory")
def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
rdd.map(line => (filename, line))
val transformed = dstream.
transform(rdd => transformByFile(rdd, byFileTransformer))
// Do some stuff with transformed
ssc.start()
ssc.awaitTermination()
}
对于那些需要一些Java代码而不是Scala的人:
JavaPairInputDStream<LongWritable, Text> textFileStream =
jsc.fileStream(
inputPath,
LongWritable.class,
Text.class,
TextInputFormat.class,
FileInputDStream::defaultFilter,
false
);
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> {
if (depRdd.isEmpty()) {
return null;
}
JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
String filename = depRdd.name();
JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
return newDep.rdd();
}).filter(t -> t != null).collect(Collectors.toList());
Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
链接地址: http://www.djcxy.com/p/84094.html