在DirectPipelineRunner上使用自定义的DataFlow无限源

我正在编写一个自定义的DataFlow无界数据源,它可以从Kafka 0.8中读取。 我想使用DirectPipelineRunner在本地运行它。 但是,我得到以下stackstrace:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

这是有道理的,因为我在任何时候都没有为自定义源注册评估者。

阅读https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只有有限源的评估者才被注册。 为定制无限源定义和注册评估程序的推荐方法是什么?


DirectPipelineRunner目前仅运行有界输入。 我们正在积极努力消除这一限制,并希望很快发布。

与此同时,通过使用withMaxNumRecords ,可以轻松地将任何UnboundedSource转换为BoundedSource ,以用于测试目的,如下例所示:

UnboundedSource<String> unboundedSource  = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10));

有关更多详细信息,请参阅GitHub上的此问题。


另外,在贡献Kafka连接器方面还有几项努力。 您可能想通过我们的GitHub存储库与我们和其他贡献者进行交流。

链接地址: http://www.djcxy.com/p/31631.html

上一篇: Using custom DataFlow unbounded source on DirectPipelineRunner

下一篇: Paypal ssl handshake faliure