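groovy - NiFi 1.5: Writing custom XML with ScriptedRecordSetWriter in Groovy
Problem description
I am converting CSV to XML with the ConvertRecord processor, passing it an Avro schema. Since NiFi 1.5 has no XMLRecordSetWriter, I am using a ScriptedRecordSetWriter. I took the sample XML writer Groovy script from the NiFi GitHub repository.
I want to customize the XML: use a custom record tag and a custom parent (root) tag, and remove empty tags.
Input CSV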
MovieID,Year,Genre
1,2000,Action
2,1997,Action|Adventure
Current output using the sample code
<record>
  <MovieID>1</MovieID>
  <Year>2000</Year>
  <Genre>Action</Genre>
</record>
<record>
  <MovieID>2</MovieID>
  <Year>1997</Year>
  <Genre>Action|Adventure</Genre>
</record>
Expected output
<MovieList>
  <Movie>
    <MovieID>1</MovieID>
    <Year>2000</Year>
    <Genre>Action</Genre>
  </Movie>
  <Movie>
    <MovieID>2</MovieID>
    <Year>1997</Year>
    <Genre>Action|Adventure</Genre>
  </Movie>
</MovieList>
This is the sample code from the NiFi GitHub repository
import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0;
    private final OutputStream out;

    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each {fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }
        recordCount++;
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return 'application/xml'
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each {fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    public void beginRecordSet() throws IOException {
    }

    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}

writer = new GroovyRecordSetWriterFactory()
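I am new to Groovy and NiFi. I searched for an example that implements the same thing but could not find one. Any help or direction is appreciated. Thanks!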
Solution
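No answer text survives on this page, so what follows is only a minimal sketch of one possible approach, not a confirmed solution. In MarkupBuilder the method name becomes the tag name, so the `record` and `recordSet` calls in the sample script are what produce `<record>` and `<recordSet>`. Calling the builder with a dynamic name lets you pick the tags yourself, and a simple check before each field call skips empty values. The helper `toMovieListXml`, its parameters, and the third sample row are hypothetical and exist only for illustration. The sketch runs outside NiFi on plain maps rather than on `Record` objects.

```groovy
import groovy.xml.MarkupBuilder

// Hypothetical helper (not part of NiFi): renders each row as <recordTag> inside a
// single <rootTag>, skipping fields whose value is null or blank.
String toMovieListXml(List<Map<String, Object>> rows,
                      String rootTag = 'MovieList',
                      String recordTag = 'Movie') {
    def sw = new StringWriter()
    def xml = new MarkupBuilder(sw)
    // Calling the builder explicitly (xml."name") avoids clashes between field
    // names and methods that happen to be in scope.
    xml."$rootTag" {
        rows.each { row ->
            xml."$recordTag" {
                row.each { name, value ->
                    // Emit the element only when there is something to write.
                    if (value != null && value.toString().trim()) {
                        xml."$name"(value)
                    }
                }
            }
        }
    }
    sw.toString()
}

// The two rows from the question plus one hypothetical row with an empty Genre.
def rows = [
    [MovieID: '1', Year: '2000', Genre: 'Action'],
    [MovieID: '2', Year: '1997', Genre: 'Action|Adventure'],
    [MovieID: '3', Year: '2010', Genre: '']
]
println toMovieListXml(rows)
```

To carry this into the writer script, the same two ideas would apply: replace `record` with the desired record tag and guard the `"$fieldName" r.getValue(fieldName)` call with a null or blank check. The current output has no `<recordSet>` wrapper, which suggests ConvertRecord calls `write(Record)` once per record. If that holds, the root element would need to be opened in `beginRecordSet()` and closed in `finishRecordSet()` rather than inside `write(RecordSet)`. That is an assumption worth verifying against your NiFi version.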