Java code outside the pipeline does not run on Dataflow

Problem description

It seems that any code outside the pipeline does not run on Dataflow. In the example below, I get a NullPointerException for TableSchema in the TableRowConverterFn.processElement method. What is the correct way to do this with Apache Beam/Dataflow?

     private static TableSchema TableSchema;

     public static void main(String[] args) {

        try {
            TableSchema = TableSchemaReader.read(TableSchemaResource);
        } catch (IOException e) {
            log.error("Table schema can not be read from {}. Process aborted.", TableSchemaResource);
            return;
        }

        DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
                //.withValidation()
                .as(DataflowDfpOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        Stopwatch sw = Stopwatch.createStarted();
        log.info("DFP data transfer from GS to BQ has started.");

        pipeline.apply("ReadFromStorage", TextIO.read()
                .from("gs://my-test/stream/*.gz")
                .withCompression(Compression.GZIP))
                .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
                .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                        .to(options.getTableId())
                        .withMethod(STREAMING_INSERTS)
                        .withCreateDisposition(CREATE_NEVER)
                        .withWriteDisposition(WRITE_APPEND)
                        .withSchema(TableSchema)); //todo: use withJsonSchema(String json) method instead


        pipeline.run().waitUntilFinish();

        log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
    }

    /**
     * Creates a TableRow from a CSV line
     */
    private static class TableRowConverterFn extends DoFn<String, TableRow> {

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {

            String[] split = c.element().split(",");

            //Ignore the header line
            //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
            if (split[0].equals("Time")) {
                log.info("Skipped header");
                return;
            }

            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {

                //This throws NPE!!!
                TableFieldSchema col = TableSchema.getFields().get(i);

                //String is the most common type, so check it first as a small optimization.
                if (col.getType().equals("STRING")) {
                    row.set(col.getName(), split[i]);
                } else if (col.getType().equals("INTEGER")) {
                    row.set(col.getName(), Long.valueOf(split[i]));
                } else if (col.getType().equals("BOOLEAN")) {
                    row.set(col.getName(), Boolean.valueOf(split[i]));
                } else if (col.getType().equals("FLOAT")) {
                    row.set(col.getName(), Float.valueOf(split[i]));
                } else {
                    //Fall back to writing it as a String.
                    //todo: Consider other BQ data types.
                    row.set(col.getName(), split[i]);
                }
            }
            c.output(row);
        }
    }

Tags: java, google-cloud-dataflow, apache-beam

Solution

Although this code may run locally with the DirectRunner, it will not run with the DataflowRunner. Here is why:

DoFns created outside of your main function cannot access your class's variables (even static ones) with the DataflowRunner. I believe (though I'm not 100% sure) this is because of how Dataflow stages and serializes DoFns when running in the cloud.
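This matches how Java serialization behaves in general: static fields are never part of an object's serialized form, so when a DoFn is deserialized in a fresh worker JVM (where your main never ran), any static field it reads is back at its default value, null. A minimal stdlib-only sketch, with made-up class and field names standing in for the DoFn and the schema:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsInstanceDemo {

    // Stand-in for a DoFn: Serializable, with one static and one instance field.
    static class Fn implements Serializable {
        static String staticSchema;   // NOT part of the serialized form
        final String instanceSchema;  // serialized along with the object

        Fn(String schema) {
            staticSchema = schema;
            this.instanceSchema = schema;
        }
    }

    // Serialize a Fn, then deserialize it as if in a fresh worker JVM
    // (i.e. with the static field reset to its default).
    static Fn roundTrip(String schema) throws Exception {
        Fn fn = new Fn(schema);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }
        Fn.staticSchema = null; // simulate a worker JVM where main never ran
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Fn) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Fn copy = roundTrip("my-schema");
        System.out.println("instance: " + copy.instanceSchema); // instance: my-schema
        System.out.println("static: " + Fn.staticSchema);       // static: null
    }
}
```

The instance field survives the round trip; the static one does not, which is exactly the NPE pattern in the question.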

Here is how you can fix the problem:

private static class TableRowConverterFn extends DoFn<String, TableRow> {
    // Must be a non-static instance field, so it is serialized together with the
    // DoFn and shipped to the workers.
    private final TableSchema tableSchema;

    public TableRowConverterFn(TableSchema tableSchema) {
        this.tableSchema = tableSchema;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // stuff
    }
}

Then, in your main function, call:

.apply("TransformToTableRow", ParDo.of(new TableRowConverterFn(tableSchema)));
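If passing the TableSchema object through the DoFn ever runs into Java serialization trouble, the question's own todo hints at another route: build the schema JSON once in main and hand it to BigQueryIO via withJsonSchema(String), so only a plain String needs to cross the serialization boundary. A hedged sketch, where the column list and the buildJsonSchema helper are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaJsonDemo {

    // Build the schema JSON in the shape withJsonSchema expects:
    // {"fields":[{"name":"...","type":"..."}, ...]}
    static String buildJsonSchema(Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("{\"fields\":[");
        boolean first = true;
        for (Map.Entry<String, String> col : columns.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(String.format(
                    "{\"name\":\"%s\",\"type\":\"%s\"}",
                    col.getKey(), col.getValue()));
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        // Hypothetical columns matching the CSV in the question.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("Time", "TIMESTAMP");
        columns.put("Impressions", "INTEGER");

        String json = buildJsonSchema(columns);
        System.out.println(json);

        // In the pipeline, instead of .withSchema(TableSchema):
        //   BigQueryIO.writeTableRows()
        //           .to(options.getTableId())
        //           .withJsonSchema(json)
        //           ...
    }
}
```

Strings serialize trivially, so this sidesteps the question of whether TableSchema itself is serializable.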
