首页 > 解决方案 > 在 BigQuery 上插入行:InsertAllRequest 与 BigQueryIO.writeTableRows()

问题描述

当我在 BigQuery 上使用 插入行时writeTableRows,与InsertAllRequest. 显然,有些东西没有正确设置。

用例 1:我编写了一个 Java 程序来使用 Twitter4j 处理“示例”Twitter 流。当一条推文出现时,我使用以下命令将其写入 BigQuery:

insertAllRequestBuilder.addRow(rowContent);

当我从我的 Mac 运行这个程序时,它每分钟将大约 1000 行直接插入到 BigQuery 表中。我认为通过在集群上运行 Dataflow 作业可以做得更好。

用例 2:当一条推文出现时,我将它写到 Google 的 PubSub 的一个主题中。我从我的 Mac 上运行它,它每分钟发送大约 1000 条消息。

我编写了一个Dataflow作业,它读取这个主题并使用BigQueryIO.writeTableRows(). 我有一个 8 机器 Dataproc 集群。我使用DataflowRunner在该集群的主节点上开始了这项工作。它慢得令人难以置信!就像每 5 分钟左右 100 行一样。以下是相关代码的片段:

statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = new TableRow();
        Status status = c.element();
        row.set("Id", status.getId());
        row.set("Text", status.getText());
        row.set("RetweetCount", status.getRetweetCount());
        row.set("FavoriteCount", status.getFavoriteCount());
        row.set("Language", status.getLang());
        row.set("ReceivedAt", null);
        row.set("UserId", status.getUser().getId());
        row.set("CountryCode", status.getPlace().getCountryCode());
        row.set("Country", status.getPlace().getCountry());
        c.output(row);
    }
})) 
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

我究竟做错了什么?我应该使用“SparkRunner”吗?如何确认它在我的集群的所有节点上运行?

标签: google-cloud-platformgoogle-bigquerygoogle-cloud-pubsubgoogle-cloud-dataprocdataflow

解决方案


使用 BigQuery,您可以:

  • 流式传输数据。低延迟(高达每秒 10 万行)是有代价的。
  • 批量输入数据。更高的延迟,令人难以置信的吞吐量,完全免费。

这就是你正在经历的差异。如果您只想摄取 1000 行,批处理会明显变慢。与 100 亿行相同,通过批处理将更快,而且是免费的。

Dataflow/BemBigQueryIO.writeTableRows可以流式传输或批处理数据。

使用BigQueryIO.Write.Method.FILE_LOADS粘贴的代码选择批处理。


推荐阅读