你如何在apache spark工作中执行阻塞IO?
如果在我遍历RDD时,我需要通过调用外部(阻塞)服务来计算数据集中的值? 你认为如何实现?
val值: Future[RDD[Double]] = Future sequence tasks
我尝试创建期货列表,但由于RDD id不可穿越,Future.sequence并不适合。
我只是想知道,如果有人有这样的问题,你是如何解决的? 我试图做到的,是得到一个工作节点上的并行性,所以我可以调用外部服务每秒 3000次。
可能还有另一种解决方案,更适合火花,就像单个主机上有多个工作节点一样。
有趣的是知道,你如何应对这样的挑战? 谢谢。
这是我自己的问题的答案:
val buckets = sc.textFile(logFile, 100)
val tasks: RDD[Future[Object]] = buckets map { item =>
future {
// call native code
}
}
val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
val searchFuture: Future[Iterator[Object]] = Future sequence f
Await result (searchFuture, JOB_TIMEOUT)
}
这里的想法是,我们得到分区的集合,其中每个分区被发送给特定的工作人员,并且是最小的一部分工作。 每件作品都包含数据,可以通过调用本地代码并发送该数据来处理这些数据。
'values'集合包含从本地代码返回的数据,并且该工作在集群中完成。
根据你的回答,阻止调用是将提供的输入与RDD中的每个单独项目进行比较,我强烈考虑重写java / scala中的比较,以便它可以作为spark过程的一部分运行。 如果比较是一个“纯粹”功能(无副作用,仅取决于其输入),则应该直接重新实现,并且由于不必制作远程设备,复杂性和火花过程的稳定性会增加电话可能会让它值得。
您的远程服务似乎不太可能每秒处理3000个呼叫,因此本地进程中版本更可取。
如果由于某种原因绝对不可能,那么您可以创建一个RDD转换,将您的数据转换为以伪代码形式存在的期货RDD:
val callRemote(data:Data):Future[Double] = ...
val inputData:RDD[Data] = ...
val transformed:RDD[Future[Double]] = inputData.map(callRemote)
然后从那里继续,计算你的未来[双]对象。
如果您知道您的远程进程可以处理多少并行性,最好放弃Future模式并接受它是瓶颈资源。
val remoteParallelism:Int = 100 // some constant
val callRemoteBlocking(data:Data):Double = ...
val inputData:RDD[Data] = ...
val transformed:RDD[Double] = inputData.
coalesce(remoteParallelism).
map(callRemoteBlocking)
你的工作可能需要相当长的一段时间,但它不应该淹没你的远程服务并且可怕地死亡。
最后的选择是,如果输入是合理可预测的,并且结果范围是一致的并且限于某些合理数量的输出(数百万左右),则可以使用远程服务将它们全部预先计算为数据集并在火花中找到它们工作时间使用连接。
链接地址: http://www.djcxy.com/p/81201.html