How do I work with two CSV flowfiles in ExecuteScript using Python?

Problem description

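In my flow, I query Hive, then update the filename, and then I want to merge the resulting CSVs into a single Excel workbook with multiple worksheets. With the code below I can already merge two CSV files into one Excel workbook with multiple worksheets. How can I make the script use the two flowfiles in the NiFi flow instead of reading files from a directory on my computer? I have seen that I can call "flowFile = session.get()", but does that line pick up both flowfiles?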

import glob
import csv
import xlwt
import os
import xlsxwriter
import datetime
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

wb = xlsxwriter.Workbook("combined_at%s.xlsx" % datetime.datetime.now().strftime('%H-%M-%S'))

flowFile = session.get()

replacer = ",[]\"\"'\'"
worksheet = wb.add_worksheet("make")
worksheet2 = wb.add_worksheet("ownership")
worksheet3 = wb.add_worksheet("marital")
worksheet4 = wb.add_worksheet("drivers")
worksheet5 = wb.add_worksheet("vehicles")
worksheet6 = wb.add_worksheet("age")
worksheet7 = wb.add_worksheet("vyear")

def printHashedEmail(split_row, worksheet, index):
    for y in replacer:
        split_row[0] = split_row[0].replace(y, "")
    worksheet.write(index, 0, split_row[0])
    return

def printOtherOnes(split_row, worksheet,index,non_changing_index):
    for y in replacer:
        split_row[non_changing_index] = split_row[non_changing_index].replace(y, "")
    worksheet.write(index, 1, split_row[non_changing_index])
    return

with open("1.csv") as csv1:
    i = 0
    j = 0
    for row in csv1:
        split_row = row.split(",")
        if split_row[2] != "":
            printHashedEmail(split_row, worksheet, i)
            printOtherOnes(split_row,worksheet,i,2)
            i = i+1
        if split_row[3].strip() != "":
            printHashedEmail(split_row, worksheet2, j)
            printOtherOnes(split_row, worksheet2, j, 3)
            j = j+1


with open("2.csv") as csv1:
    i = j = k = l = m = 0
    for row in csv1:
        split_row = row.split(",")
        if split_row[2] != "":
            printHashedEmail(split_row, worksheet3, i)
            printOtherOnes(split_row, worksheet3, i, 2)
            i = i + 1
        if split_row[3].strip() != "":
            printHashedEmail(split_row, worksheet4, j)
            printOtherOnes(split_row, worksheet4, j, 3)
            j = j + 1
        if split_row[5] != "":
            printHashedEmail(split_row, worksheet5, l)
            printOtherOnes(split_row, worksheet5, l, 5)
            l = l + 1
        if split_row[4].strip() != "":
            printHashedEmail(split_row, worksheet6, k)
            printOtherOnes(split_row, worksheet6, k, 4)
            k = k + 1
        if split_row[6].strip() != "":
            printHashedEmail(split_row,worksheet7,m)
            printOtherOnes(split_row, worksheet7, m, 6)
            m = m + 1

wb.close()

print("Done")

[Image: the NiFi flow]

After this step, I want the Excel file to leave the ExecuteScript processor so that I can do more with it downstream.

Tags: python, csv, apache-nifi

Solution


Look at the different session.get() overloads.

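For example, session.get(2) tries to take the first 2 flowfiles from the incoming queue and returns them as a list, which may hold fewer than 2 entries.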

If you get only one, you can call session.rollback() to return it to the queue.
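As a rough sketch of the two steps above, the helper below asks for two flowfiles and rolls back when fewer arrive. Inside ExecuteScript, session is the ProcessSession that NiFi binds for the script. The names take_pair and StubSession are hypothetical, and the stub exists only so the logic can be tried outside NiFi.

```python
# Sketch only. In ExecuteScript, `session` is provided by NiFi; StubSession
# is a hypothetical stand-in that mimics get(n) and rollback() on a list.

def take_pair(session):
    """Return two flowfiles as a tuple, or None if fewer than two are queued."""
    flow_files = session.get(2)   # list holding at most 2 flowfiles
    if len(flow_files) < 2:
        session.rollback()        # return whatever was taken to the queue
        return None
    return flow_files[0], flow_files[1]


class StubSession(object):
    """Hypothetical stand-in for ProcessSession, for experimenting outside NiFi."""

    def __init__(self, queue):
        self.queue = list(queue)
        self.taken = []

    def get(self, max_results):
        # Hand out up to max_results items from the front of the queue.
        self.taken = self.queue[:max_results]
        self.queue = self.queue[max_results:]
        return self.taken

    def rollback(self):
        # Put the items handed out by the last get() back at the front.
        self.queue = self.taken + self.queue
        self.taken = []
```

In the processor script this would be called as pair = take_pair(session), with the script doing nothing further when the result is None.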

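The problem, however, is that the flowfiles in the queue may not be in the order you expect. Imagine that there are 3 files in the incoming queue.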

With session.get(FlowFileFilter filter) you can pick from the incoming queue the 2 flowfiles that match certain attributes.
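A minimal sketch of such a filter, assuming the two flowfiles can be told apart by their filename attribute and that the names are "1.csv" and "2.csv" as in the question's script (the real attribute values in the flow are not known). The class name PairByFilename is hypothetical, and the fallback in the except branch is only a stand-in so the selection logic can be run outside NiFi.

```python
try:
    # Available when the script runs inside NiFi's Jython engine.
    from org.apache.nifi.processor import FlowFileFilter
    Result = FlowFileFilter.FlowFileFilterResult
except ImportError:
    # Hypothetical stand-ins for running this sketch outside NiFi.
    class FlowFileFilter(object):
        pass

    class Result(object):
        ACCEPT_AND_CONTINUE = "ACCEPT_AND_CONTINUE"
        ACCEPT_AND_TERMINATE = "ACCEPT_AND_TERMINATE"
        REJECT_AND_CONTINUE = "REJECT_AND_CONTINUE"


class PairByFilename(FlowFileFilter):
    """Accept one flowfile per wanted filename and stop once all are found."""

    def __init__(self, wanted):
        self.remaining = set(wanted)

    def filter(self, flow_file):
        name = flow_file.getAttribute("filename")
        if name not in self.remaining:
            # Not one of ours (or a duplicate): leave it queued, keep looking.
            return Result.REJECT_AND_CONTINUE
        self.remaining.discard(name)
        if self.remaining:
            return Result.ACCEPT_AND_CONTINUE
        # Last wanted file found: accept it and stop scanning the queue.
        return Result.ACCEPT_AND_TERMINATE
```

Inside the processor this would be used as flow_files = session.get(PairByFilename(["1.csv", "2.csv"])). Because the selection is by attribute, the queue order no longer matters. If the returned list has fewer than 2 entries, session.rollback() can still be used to put them back.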

