python - 在 Python 中使用 Nifi ExecuteScript 将附件上传到 Confluence
问题描述
我正在尝试使用 Nifi 的 ExecuteScript 处理器将 PDF 文件上传到 Confluence。我可以成功上传文件,但是当我下载并打开它时,它是空白的。我的转换一定有问题。谁能帮忙查一下?
所以这就是我的做法:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile)return
def text = ''
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
flowFile = session?.putAttribute(flowFile, "file_content", text)
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)
3. ExecuteScript Python - 将 PDF 文件上传到 Confluence
这是我的#3代码。我认为这里有问题-->
import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback
class OutputWrite(OutputStreamCallback):
def __init__(self, obj):
self.obj = obj
def process(self, outputStream):
outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
url = 'https://myconfluence.com/rest/api/content/12345/child/attachment'
auth = 'myauthorization'
file_name = 'mypdf.pdf'
file_content = flowFile.getAttribute('file_content')
s = requests.Session()
m = MultipartEncoder(fields={'file': (file_name, file_content, 'application/pdf')})
headers = {"X-Atlassian-Token":"nocheck", "Authorization":auth, "Content-Type":m.content_type}
r = s.post(url, data=m, headers=headers, verify=False)
session.write(flowFile, OutputWrite(json.loads(r.text)))
session.transfer(flowFile, REL_SUCCESS)
session.commit()
2019 年 6 月 28 日更新
我决定听从彼得的建议并合并代码 1 和 2。它仍然无法正常工作。以前,PDF 文件是 2MB,但它是空白的。现在,它的大小是 0KB。任何帮助将不胜感激!
import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
pass
def process(self, inputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
class OutputWrite(OutputStreamCallback):
def __init__(self, obj):
self.obj = obj
def process(self, outputStream):
outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))
text = ''
flowFile = session.get()
if(flowFile != None):
session.read(flowFile, PyInputStreamCallback())
confluence_attachment_api = flowFile.getAttribute('confluence_attachment_api')
confluence_authorization = flowFile.getAttribute('confluence_authorization')
file_name = flowFile.getAttribute('file_name')
s = requests.Session()
m = MultipartEncoder(fields={'file': (file_name, text, 'application/pdf')})
headers = {"X-Atlassian-Token":"nocheck", "Authorization":confluence_authorization, "Content-Type":m.content_type}
r = s.post(confluence_attachment_api, data=m, headers=headers, verify=False)
session.write(flowFile, OutputWrite(json.loads(r.text)))
session.transfer(flowFile, REL_SUCCESS)
session.commit()
解决方案
看起来您实际上并未发送 FlowFile 内容。相反,您发送的是一个名为file_content
文件内容的属性,这可能不是您想要的
您需要执行 asession.read
来获取文件流。下面的代码不能按原样工作,但显示了如何访问流。
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
pass
def process(self, inputStream):
m = MultipartEncoder(fields={'file': (file_name, inputStream, 'application/pdf')})
session.read(flowFile, PyInputStreamCallback())
参考:https ://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html
推荐阅读
- python - 有没有办法在 python 中打印出 JSON 文件的某些元素?
- xml - 当其中有叶子引用节点时,如何将有效的 YANG 实例数据放入格式良好的 XML 文档中?
- c# - 我为我的游戏制作了一个跳转脚本,但它不会跳转
- postgresql - 使用 plpgsql 函数创建物化视图时出现子事务错误
- .htaccess - 如何使用 htaccess 阻止部分 url?
- console - JetBrains 控制台中的自动完成
- r - r 在 2 列上滚动自定义函数
- android - 如何使用 BiometricPrompt 和 CryptoObject 同时获得解密和加密的 KeyStore 访问权限?
- ruby-on-rails - 日期时间对象的联合数组
- iis - 特定于应用程序的权限设置不授予本地激活权限