Using custom DataFlow unbounded source on DirectPipelineRunner

I'm writing a custom DataFlow unbounded data source that reads from Kafka 0.8. I'd like to run it locally using the DirectPipelineRunner. However, I'm getting the following stackstrace:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

Which makes some sense, as I haven't registered an evaluator for my custom source at any time.

Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems like only evaluators for bounded sources are registered. What's the recommended way to define and register an evaluator for an custom unbounded source?


DirectPipelineRunner currently runs over bounded input only. We are actively working on removing this restriction, and expect to release it shortly.

In the meanwhile, you can trivially turn any UnboundedSource into a BoundedSource , for testing purposes, by using withMaxNumRecords , as in the following example:

UnboundedSource<String> unboundedSource  = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10));

See this issue on GitHub for more details.


Separately, there are several efforts on contributing the Kafka connector. You may want to engage with us and other contributors about that via our GitHub repository.

链接地址: http://www.djcxy.com/p/31632.html

上一篇: ipython在macbook上安装时出错

下一篇: 在DirectPipelineRunner上使用自定义的DataFlow无限源