首页 > 解决方案 > 在 Dataflow 中手动发送 PubSub 消息

问题描述

如何在 Dataflow 中手动发送 PubSub 消息(也就是说,不使用 a PubsubIO)?

导入(通过 Maven)google-cloud-dataflow-java-sdk-all 2.5.0已经导入了一个版本com.google.pubsub.v1,我无法找到一个简单的方法来向 Pubsub 主题发送消息(例如,这个版本不允许操作Publisher实例,这是官方描述的方式文档)。

标签: javagoogle-cloud-dataflowpublish-subscribeapache-beamgoogle-cloud-pubsub

解决方案


你会考虑使用PubsubUnboundedSink吗?快速示例:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;


public class PubsubTest {

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                        .as(DataflowPipelineOptions.class); 

        // writes message to "output_topic"
        TopicPath topic = PubsubClient.topicPathFromName(options.getProject(), "output_topic");

        Pipeline p = Pipeline.create(options);

        p
        .apply("input string", Create.of("This is just a message"))
        .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(new PubsubMessage(c.element().getBytes(), null));          
            }
        }))
        .apply("write to topic", new PubsubUnboundedSink(
            PubsubJsonClient.FACTORY,
            StaticValueProvider.of(topic), // topic
            "timestamp", // timestamp attribute
            "id", // ID attribute
            5 // number of shards
        )); 

        p.run();
    }
}

推荐阅读