为什么sortBy转换触发Spark作业?
根据Spark文档,只有RDD操作可以触发Spark作业,并且在对其执行操作时对变换进行延迟评估。
我看到sortBy
转换函数被立即应用,它在SparkUI中显示为一个作业触发器。 为什么?
sortBy
使用实施sortByKey
取决于一个RangePartitioner
(JVM)或分区功能(Python)的。 当您调用sortBy
/ sortByKey
分区器(分区函数)时,会急切地初始化并对输入RDD进行采样以计算分区边界。 你看到的工作对应于这个过程。
仅当您对新创建的RDD
或其后代执行操作时,才会执行实际排序。
根据Spark文档,只有该操作会在Spark中触发一项作业,当对其执行操作时,转换将被延迟评估。
一般而言,你是对的,但正如你刚刚经历的那样,很少有例外, sortBy
就是其中之一(使用zipWithIndex
)。
事实上,它在Spark的JIRA中报告,并以Will not Fix解决方案结束。 请参阅SPARK-1021 sortByKey()在不应该时启动集群作业。
您可以看到启用DAGScheduler
日志记录的作业(以及稍后在Web UI中):
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
链接地址: http://www.djcxy.com/p/94715.html