java - 在 Dataflow 中手动发送 PubSub 消息
问题描述
如何在 Dataflow 中手动发送 PubSub 消息(也就是说,不使用 a PubsubIO
)?
导入(通过 Maven)google-cloud-dataflow-java-sdk-all 2.5.0
已经导入了一个版本com.google.pubsub.v1
,我无法找到一个简单的方法来向 Pubsub 主题发送消息(例如,这个版本不允许操作Publisher
实例,这是官方描述的方式文档)。
解决方案
你会考虑使用PubsubUnboundedSink
吗?快速示例:
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
public class PubsubTest {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.as(DataflowPipelineOptions.class);
// writes message to "output_topic"
TopicPath topic = PubsubClient.topicPathFromName(options.getProject(), "output_topic");
Pipeline p = Pipeline.create(options);
p
.apply("input string", Create.of("This is just a message"))
.apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}))
.apply("write to topic", new PubsubUnboundedSink(
PubsubJsonClient.FACTORY,
StaticValueProvider.of(topic), // topic
"timestamp", // timestamp attribute
"id", // ID attribute
5 // number of shards
));
p.run();
}
}
推荐阅读
- json - 更新 json 模型并保留/转换现有数据 - Swift
- python-3.x - 通过使用 Pandas Dataframe 排除特定单词来计算每一行
- machine-learning - 如何将训练结束回调添加到 AllenNLP 配置文件?
- debugging - 如何在 anaconda jupyter 实验室笔记本中运行调试器
- javascript - 导入单独文件时变量未定义
- loops - 为什么我的 6800 汇编代码打印随机值?
- nlp - 如何使用 SpaCy 或 NLTK 从英语的词根或引理词中生成所有派生词?
- c - 父进程结束后如何在C中获取子进程的返回值?
- html - 使用 django 静态文件在 safari 中无法正常工作的视频
- php - MySQL 语法错误:您的 SQL 语法有错误