google-cloud-platform - 在 BigQuery 上插入行:InsertAllRequest 与 BigQueryIO.writeTableRows()
问题描述
当我在 BigQuery 上使用 插入行时writeTableRows
,与InsertAllRequest
. 显然,有些东西没有正确设置。
用例 1:我编写了一个 Java 程序来使用 Twitter4j 处理“示例”Twitter 流。当一条推文出现时,我使用以下命令将其写入 BigQuery:
insertAllRequestBuilder.addRow(rowContent);
当我从我的 Mac 运行这个程序时,它每分钟将大约 1000 行直接插入到 BigQuery 表中。我认为通过在集群上运行 Dataflow 作业可以做得更好。
用例 2:当一条推文出现时,我将它写到 Google 的 PubSub 的一个主题中。我从我的 Mac 上运行它,它每分钟发送大约 1000 条消息。
我编写了一个Dataflow作业,它读取这个主题并使用BigQueryIO.writeTableRows()
. 我有一个 8 机器 Dataproc 集群。我使用DataflowRunner在该集群的主节点上开始了这项工作。它慢得令人难以置信!就像每 5 分钟左右 100 行一样。以下是相关代码的片段:
statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
Status status = c.element();
row.set("Id", status.getId());
row.set("Text", status.getText());
row.set("RetweetCount", status.getRetweetCount());
row.set("FavoriteCount", status.getFavoriteCount());
row.set("Language", status.getLang());
row.set("ReceivedAt", null);
row.set("UserId", status.getUser().getId());
row.set("CountryCode", status.getPlace().getCountryCode());
row.set("Country", status.getPlace().getCountry());
c.output(row);
}
}))
.apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
.withSchema(schema)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我究竟做错了什么?我应该使用“SparkRunner”吗?如何确认它在我的集群的所有节点上运行?
解决方案
使用 BigQuery,您可以:
- 流式传输数据。低延迟(高达每秒 10 万行)是有代价的。
- 批量输入数据。更高的延迟,令人难以置信的吞吐量,完全免费。
这就是你正在经历的差异。如果您只想摄取 1000 行,批处理会明显变慢。与 100 亿行相同,通过批处理将更快,而且是免费的。
Dataflow/BemBigQueryIO.writeTableRows
可以流式传输或批处理数据。
使用BigQueryIO.Write.Method.FILE_LOADS
粘贴的代码选择批处理。
推荐阅读
- ios - 从命令行在项目中查找 XCTest 目标
- google-cloud-platform - 从 BigQuery 连接数据集后,Google Data Studio 图表中出现“无数据”消息?
- python - 我想用 Pygame 做一个游戏。但是我的箭不能从正确的位置射出
- scala - 在 Apache Flink 中使用 Dropwizard 直方图度量时违反加载程序约束
- android - NavController 在设备旋转后没有当前导航节点
- javascript - 如果包含文本,jQuery 对多个 DIV 进行排序
- c# - 在 AX 2012 r2 中出现类似“Testing1”的错误不是 AxdEnum_XMLDocPurpose 的有效值
- loops - for循环中的变量范围
- python-3.x - 如何在单元格值为空时结束 iter_rows 循环?
- java - 正则表达式返回进程的 pid