首页 > 解决方案 > 我的 Nifi python 脚本有什么问题,请提出建议

问题描述

我正在尝试读取来自 Nifi 处理器的 json 数据,我使用下面的代码通过计算将一个键值更新到每条记录中,但我遇到了**list indices must be integers in <script> at line number 43**问题。在此处输入图像描述.

json file手动添加时相同的代码工作正常

我的代码是:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class FlowFileParser(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        finalResp = []
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text)
        newObj = data['priority']
        for k in data:
            resp = self.performCalculation(k)
            finalResp.append(resp)

        log.info(" newObj ",newObj)
        log.info(" newObj ",newObj)
        outputStream.write(bytearray(finalResp.encode('utf-8')))

    def performCalculation(self,k):
            a = int(k['a'])
            b = int(k['b'])
            log.info(a)
            log.info(b)
            total=sum((a,b))
            log.info(total)
            x = {"x":total}
            k.update(x)
            return k

flowFile = session.get()
if flowFile != None:
    #flowFile = session.putAttribute(flowFile, "priority", "5")
    priority = FlowFileParser()
    session.write(flowFile,priority)
    flowFile = session.putAttribute(flowFile, "filename", str(priority))
    session.transfer(flowFile, REL_SUCCESS)

我的 json 文件有

  [{
    "a":1,
    "b":2,
    "id":1
},{
    "a":1,
    "b":2,
    "id":2
}]

标签: pythonsessionapache-nifi

解决方案


我改变了我的脚本,它现在工作正常。谢谢大家的建议

class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
            try:
                # Read input FlowFile content
                input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
                input = json.loads(input_text)
    
                # Transform content
                resp = input
                resp['total'] = resp['a'] + resp['b']
    
                # Write output content
                output_text = json.dumps(resp)
                outputStream.write(StringUtil.toBytes(output_text))
            except:
                traceback.print_exc(file=sys.stdout)
                raise
flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)

我使用此链接引用https://github.com/BatchIQ/nifi-scripting-samples


推荐阅读