java - 从 Pubsub 读取并写入 GCS 的 Google 数据流作业非常慢(WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards)耗时太长
问题描述
目前我们有一个数据流作业,它从 pubsub 读取并使用 FileIO.writeDynamic 将 avro 文件写入 GCS,当我们使用 10000 events/sec 进行测试时,由于 WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards 非常慢,因此无法更快地处理。下面是我们用来编写的代码片段。我们如何改进
PCollection<Event> windowedWrites = input.apply("Global Window", Window.<Event>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterFirst.of(AfterPane.elementCountAtLeast(50000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils
.parseDuration(windowDuration))))).discardingFiredPanes());
return windowedWrites
.apply("WriteToAvroGCS", FileIO.<EventDestination, Five9Event>writeDynamic()
.by(groupFn)
.via(outputFn, Contextful.fn(
new SinkFn()))
.withTempDirectory(avroTempDirectory)
.withDestinationCoder(destinationCoder)
.withNumShards(1).withNaming(namingFn));
我们使用自定义文件命名,格式为 gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardInder-of-numOfShards-pane-paneIndex.avro>
解决方案
正如罗伯特所说,当使用withNumShards(1)
Dataflow/Beam 时,无法并行化写入,使其发生在同一个工作人员身上。当捆绑包相对较高时,这对管道的性能有很大影响。我举了一个例子来证明这一点:
我运行了 3 个生成大量元素(~2gb)的管道,其中三个有 10 个n1-standard-1
工作人员,但有 1 个分片、10 个分片和 0 个分片(Dataflow 会选择分片的数量)。这就是他们的行为方式:
我们看到 0 或 10 个 Shard 与 1 个 Shard 的总时间之间存在很大差异。如果我们使用 1 个分片进行工作,我们会看到只有一个工作人员在做某事(我禁用了自动缩放):
正如 Reza 所提到的,发生这种情况是因为所有元素都需要洗牌到同一个工作人员中,所以它写入 1 分片。
请注意,我的示例是 Batch,它在线程方面的行为与 Streaming 不同,但对管道性能的影响足够相似(实际上,在 Streaming 中它可能更糟糕)。
这里有一个 Python 代码,因此您可以自己测试:
p = beam.Pipeline(options=pipeline_options)
def long_string_generator():
string = "Apache Beam is an open source, unified model for defining " \
"both batch and streaming data-parallel processing " \
"pipelines. Using one of the open source Beam SDKs, " \
"you build a program that defines the pipeline. The pipeline " \
"is then executed by one of Beam’s supported distributed " \
"processing back-ends, which include Apache Flink, Apache " \
"Spark, and Google Cloud Dataflow. "
word_choice = random.sample(string.split(" "), 20)
return " ".join(word_choice)
def generate_elements(element, amount=1):
return [(element, long_string_generator()) for _ in range(amount)]
(p | Create(range(1500))
| beam.FlatMap(generate_elements, amount=10000)
| WriteToText(known_args.output, num_shards=known_args.shards))
p.run()
推荐阅读
- microsoft-teams - Mulesoft Microsoft 团队连接器
- python - 请求库不工作?控制台中没有任何内容
- javascript - 在 React Native 中将 API 响应数据添加到之前的 AsyncStorage 数据中
- python - DAG runtime error with creating Airflow.cfg file
- mysql - Why Liquibase docker container can't connect to Mysql docker container?
- android - Why getItemCount() called multiple times in recycleView?
- multithreading - Flutter : Function need to wait
- scala - 通过简单的 akka-http 路由测试与演员陷入无限循环
- flutter - 用户做某事时如何更新状态?
- firebase - 从应用程序内打开的firebase动态链接