Need to insert rows into ClickHouse from Apache Beam (Dataflow) using ClickHouseIO

Problem description

I am reading from a Pub/Sub topic, which works fine; now I need to insert the rows into a table on ClickHouse.

I am still learning, so please bear with me.


        PipelineOptions options = PipelineOptionsFactory.create();


        //PubSubToDatabasesPipelineOptions options;
        Pipeline p = Pipeline.create(options);

        PCollection<String> inputFromPubSub = p.apply(namePrefix + "ReadFromPubSub",
                PubsubIO.readStrings().fromSubscription("projects/*********/subscriptions/crypto_bitcoin.dataflow.bigquery.transactions").withIdAttribute(PUBSUB_ID_ATTRIBUTE));



        PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String item = c.element();
                //System.out.print(item);
                Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
                //System.out.print(transaction);
                c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
            }}));


        res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));

        p.run().waitUntilFinish();

My TransactionSmall.java:

import java.io.Serializable;
import java.util.Date;

public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

My table definition:

clickhouse.us-east1-b.c.staging-btc-etl.internal :) CREATE TABLE litecoin.saurabh_blocks_small (`created_date` Date DEFAULT today(), `hash` String, `number` In) ENGINE = MergeTree(created_date, (hash, number), 8192)

CREATE TABLE litecoin.saurabh_blocks_small
(
    `created_date` Date, 
    `hash` String, 
    `number` In
)
ENGINE = MergeTree(created_date, (hash, number), 8192)
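
As an aside, `In` is not a valid ClickHouse column type, so the `CREATE TABLE` above would be rejected by the server on its own; presumably `Int32` was intended. A corrected sketch, keeping the same legacy MergeTree syntax:

```sql
CREATE TABLE litecoin.saurabh_blocks_small
(
    `created_date` Date DEFAULT today(),
    `hash` String,
    `number` Int32
)
ENGINE = MergeTree(created_date, (hash, number), 8192)
```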

I am getting an error like this:

java.lang.IllegalArgumentException: Type of @Element must match the DoFn typesaurabhReadFromPubSub2/ParMultiDo(Anonymous).output [PCollection]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation (ParDo.java:577)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo (ParDoTranslation.java:185)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate (ParDoTranslation.java:124)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:155)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload (ParDoTranslation.java:650)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable (ParDoTranslation.java:665)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches (PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform (Pipeline.java:282)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
    at org.apache.beam.sdk.Pipeline.replace (Pipeline.java:260)
    at org.apache.beam.sdk.Pipeline.replaceAll (Pipeline.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:315)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:301)
    at io.blockchainetl.bitcoin.Trail.main (Trail.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)


What would be the best and cleanest way to achieve this without explicitly creating objects?

Thanks.

Tags: google-cloud-dataflow, apache-beam

Solution


This is most likely happening because Beam relies on the coder specification of a PCollection when inferring a schema for it, and it seems to be having trouble inferring the input schema for the ClickHouseIO transform.

You can force Beam to have a schema by specifying a coder that supports schema inference, such as AvroCoder. You would do it like this:

import java.io.Serializable;
import java.util.Date;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

Alternatively, you can set the coder for the PCollection directly in your pipeline:

PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String item = c.element();
        Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
        c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
     }}))
    .setCoder(AvroCoder.of(TransactionSmall.class));


res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));
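
On newer Beam versions there is also a coder-free option: annotate the POJO so Beam infers a schema from its fields directly, since ClickHouseIO (as I understand it) matches schema field names against the table's columns. This is a hypothetical sketch, not from the original answer; note the field names are renamed here to match the ClickHouse columns, and `java.util.Date` is replaced with Joda `DateTime`, which Beam's schema inference understands:

```java
import java.io.Serializable;
import org.joda.time.DateTime;

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Sketch: let Beam infer a schema from the public fields.
// Field names deliberately match the ClickHouse columns (created_date, hash, number).
@DefaultSchema(JavaFieldSchema.class)
public class TransactionSmall implements Serializable {
    public DateTime created_date;
    public String hash;
    public int number;

    // @SchemaCreate tells Beam which constructor to use when rebuilding objects from rows.
    @SchemaCreate
    public TransactionSmall(DateTime created_date, String hash, int number) {
        this.created_date = created_date;
        this.hash = hash;
        this.number = number;
    }
}
```

With a schema-aware PCollection like this, the `.setCoder(...)` call should no longer be needed before `ClickHouseIO.write(...)`.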
