python - ExecuteScript Nifi 的好的 Python 语法是什么?
问题描述
我正在尝试使用 Nifi 使用流式推文。在处理器出现一些问题后,我选择使用处理器和tweepyInvokeHTTP
来完成我的工作。ExecuteScript
我想在流媒体中获取推文,并一一将它们提供给我的 Nifi 流程的其余部分。
为此,我编写了以下脚本:
from tweepy import Stream, API
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
# All API keys / access token
consumer_key = "something"
consumer_secret_key = "something"
access_token = "something"
access_token_secret = "something"
# Initialize du proxy
proxies = {
"http": "http_proxy",
"https": "https_proxy"
}
# Listener class that contains all API functions for streaming tweets.
class Listener(StreamListener):
def __init__(self, nifi_session):
super(Listener, self).__init__() # Overwrite constructor for declare flowfile NiFi
self.nifi_session = nifi_session
def on_data(self, status):
# Convert string to json
data = json.loads(status)
# extract relevant information, for example, we use user's description only
description = data['user']['description']
# Give all this parameters at nifi
session.putAttribute(self.nifi_session, 'data', description)
session.transfer(self.nifi_session, ExecuteScript.REL_SUCCESS)
session.commit() # Commit to next nifi processor
def on_error(self, status):
if status == 420:
return False
# Set flowfile NiFi
flowFile = session.get()
# Set OAuth with keys and tokens
auth = OAuthHandler(consumer_key, consumer_secret_key)
auth.set_access_token(access_token, access_token_secret)
api = API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
listener = Listener()
twitterStream = Stream(api.auth, listener=listener, proxies=proxies)
# Try/catch for be sure that stream will be disconnect after error
try:
twitterStream.filter(track=['nasa'])
except Exception as e:
print("Exception !")
finally:
print("...end")
twitterStream.disconnect()
twitterStream.disconnect()
使用此程序,Nifi 上不会发生任何事情。但是,我确实在没有flowfile
Nifi 的情况下session
在其他环境中进行了测试,并且效果很好。我想问题在于我与 Nifi 通信的语法。
编辑:我不能使用GetTwitter
处理器,因为它不支持代理,我必须使用我的。
感谢帮助!
解决方案
推荐阅读
- c# - 忽略自签名证书错误 c# UWP
- r - 在 R 中编写双系列
- angular - 如何通过代码导航到嵌套路由器
- azure - 在 AzureDataFactory 中,无法将容器中的 .wav 文件集合配置为“数据集”
- android - Android - 存储活动结果返回 null
- java - 没有从 RandomAccessFile java 得到正确的输出
- magento - 将多个名义/经常性产品添加到购物车
- machine-learning - 为什么 GridSearchCV 在拟合数据的同时克隆估计器?
- jenkins - “Webhook to Jenkins for Bitbucket Server”仅在手动执行时有效
- android-studio - 如何在 Android Studio 编辑器中更改参数名称指示器的主题颜色?