NiFi 1.5: Writing custom XML with ScriptedRecordSetWriter in Groovy

Problem description

I am converting CSV to XML with the ConvertRecord processor, passing an Avro schema. Since NiFi 1.5 has no XMLRecordSetWriter, I am using ScriptedRecordSetWriter. I took the sample XML writer Groovy script from the NiFi GitHub repository:

https://github.com/apache/nifi/blob/rel/nifi-1.5.0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy

I want to customize the XML: use a custom record tag, add a custom parent tag, and remove empty tags.

Input CSV

MovieID,Year,Genre
1,2000,Action
2,1997,Action|Adventure

Current output using the sample code

<record>
 <MovieID>1</MovieID>
 <Year>2000</Year>
 <Genre>Action</Genre>
</record>
<record>
 <MovieID>2</MovieID>
 <Year>1997</Year>
 <Genre>Action|Adventure</Genre>
</record>

Expected output

<MovieList>
 <Movie>
  <MovieID>1</MovieID>
  <Year>2000</Year>
  <Genre>Action</Genre>
 </Movie>
 <Movie>
  <MovieID>2</MovieID>
  <Year>1997</Year>
  <Genre>Action|Adventure</Genre>
 </Movie>
</MovieList>

Here is the sample code from the NiFi GitHub repository

import groovy.xml.MarkupBuilder

import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream


class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0;
    private final OutputStream out;

    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each {fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }

        recordCount++;
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return 'application/xml'
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0

        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).recordSet {

                Record r
                while (r = rs.next()) {
                    count++

                    record {
                        rs.schema.fieldNames.each {fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    public void beginRecordSet() throws IOException {
    }

    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }

}

writer = new GroovyRecordSetWriterFactory()

I am new to Groovy and NiFi. I searched for an example that does the same thing but could not find one. Any help or direction is appreciated. Thanks!

Tags: groovy, apache-nifi

Solution
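One possible approach, offered as a minimal sketch rather than a tested NiFi script. The current output above contains <record> elements but no <recordSet> wrapper, which suggests that the per-record write(Record) method is the one being called. Under that assumption, the parent tag could be written once in beginRecordSet() and closed in finishRecordSet(), while write(Record) emits a <Movie> element and skips empty fields. The class MovieXmlSketch below is hypothetical: it uses plain Maps in place of NiFi Record objects and a Writer in place of the OutputStream, so that it runs on its own without NiFi on the classpath.

```groovy
import groovy.xml.MarkupBuilder

// Hypothetical stand-in for GroovyRecordSetWriter (not a NiFi class).
// Plain Maps replace NiFi Record objects; a Writer replaces the OutputStream.
class MovieXmlSketch {
    private final Writer out
    private int recordCount = 0

    MovieXmlSketch(Writer out) {
        this.out = out
    }

    // Write the opening parent tag once, before any record.
    void beginRecordSet() {
        out.write('<MovieList>\n')
    }

    // Write one <Movie> element, skipping fields that are null or blank.
    void write(Map<String, Object> record) {
        def builder = new MarkupBuilder(out)
        builder.Movie {
            record.each { name, value ->
                if (value != null && value.toString().trim()) {
                    builder."$name"(value)
                }
            }
        }
        out.write('\n')
        recordCount++
    }

    // Write the closing parent tag and return the number of records written.
    int finishRecordSet() {
        out.write('</MovieList>\n')
        out.flush()
        return recordCount
    }
}

def sw = new StringWriter()
def sketch = new MovieXmlSketch(sw)
sketch.beginRecordSet()
sketch.write([MovieID: 1, Year: 2000, Genre: 'Action'])
sketch.write([MovieID: 2, Year: 1997, Genre: ''])   // empty Genre is dropped
def written = sketch.finishRecordSet()
def xml = sw.toString()
println xml
```

To carry this over to the sample script, the same three steps would go into beginRecordSet(), write(Record r), and finishRecordSet() of GroovyRecordSetWriter, iterating r.schema.fieldNames and reading values with r.getValue(fieldName) as the sample already does. Whether ConvertRecord in NiFi 1.5 calls those methods in exactly that order is an assumption here and worth verifying against your installation.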

