How do you perform blocking IO in an Apache Spark job?
Suppose that, while traversing an RDD, I need to compute each value in the dataset by calling an external (blocking) service. How could that be achieved?
val values: Future[RDD[Double]] = Future sequence tasks
I tried to create a list of Futures, but since an RDD is not Traversable, Future.sequence is not suitable.
I wonder whether anyone has had this problem and how you solved it. What I'm trying to achieve is parallelism on a single worker node, so that I can call the external service 3000 times per second.
There may be another solution better suited to Spark, such as running multiple worker nodes on a single host.
I'd be interested to know how you cope with this challenge. Thanks.
Here is the answer to my own question:
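import org.apache.spark.rdd.RDD
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

val buckets = sc.textFile(logFile, 100)
// Wrap the blocking call for each item in a Future.
val tasks: RDD[Future[Object]] = buckets map { item =>
  Future {
    // call native code and return its result
  }
}
// Within each partition, wait for all of that partition's futures to complete.
val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
  val searchFuture: Future[Iterator[Object]] = Future sequence f
  Await result (searchFuture, JOB_TIMEOUT)
}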
The idea is that the data is split into partitions, where each partition is sent to a specific worker and is the smallest unit of work. Each unit of work contains the data to be processed by passing it to the native code.
The 'values' collection contains the data returned from the native code, and the work is done across the cluster.
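The per-partition pattern above can also be written so that the number of blocking calls in flight on a worker is bounded explicitly. Below is a minimal sketch in plain Scala, not the original poster's code: `callService` is a hypothetical stand-in for the external call, and the pool size of 8 and the 30-second timeout are arbitrary assumptions. `processPartition` has the `Iterator => Iterator` shape that `mapPartitions` expects.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PartitionCalls {
  // Hypothetical stand-in for the blocking external call.
  def callService(item: String): Int = {
    Thread.sleep(10) // simulate blocking I/O
    item.length
  }

  // Processes one partition; at most `parallelism` calls run at the same time,
  // because the futures execute on a fixed-size thread pool.
  def processPartition(items: Iterator[String],
                       parallelism: Int = 8,
                       timeout: FiniteDuration = 30.seconds): Iterator[Int] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Materialize the partition so every call is submitted before we wait.
      val futures = items.toList.map(item => Future(callService(item)))
      // Block until the whole partition is done; results keep the input order.
      Await.result(Future.sequence(futures), timeout).iterator
    } finally {
      pool.shutdown()
    }
  }
}
```

In a Spark job this could be applied as `rdd.mapPartitions(it => PartitionCalls.processPartition(it))`. Note that the sketch holds a whole partition in memory, so it assumes partitions are small enough for that.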
Based on your answer, in which the blocking call compares the provided input with each individual item in the RDD, I would strongly consider rewriting the comparison in Java/Scala so that it can run as part of your Spark process. If the comparison is a "pure" function (no side effects, depends only on its inputs), it should be straightforward to re-implement, and the reduced complexity and improved stability of your Spark process from not making remote calls will probably make it worth it.
It seems unlikely that your remote service will be able to handle 3000 calls per second, so a local in-process version would be preferable.
If that is absolutely impossible for some reason, then you might be able to create an RDD transformation that turns your data into an RDD of futures. In pseudo-code:
def callRemote(data: Data): Future[Double] = ...
val inputData: RDD[Data] = ...
val transformed: RDD[Future[Double]] = inputData.map(callRemote)
And then carry on from there, computing on your Future[Double] objects.
If you know how much parallelism your remote process can handle, it might be best to abandon the Future approach and accept that the remote service is a bottleneck resource.
val remoteParallelism: Int = 100 // some constant
def callRemoteBlocking(data: Data): Double = ...
val inputData: RDD[Data] = ...
// Limit the number of partitions, and therefore of concurrent remote calls.
val transformed: RDD[Double] = inputData
  .coalesce(remoteParallelism)
  .map(callRemoteBlocking)
Your job will probably take quite some time, but it shouldn't flood your remote service and die horribly.
A final option: if the inputs are reasonably predictable and the range of outcomes is consistent and limited to a reasonable number of outputs (millions or so), you could precompute them all as a data set using your remote service and look them up at Spark job time using a join.