首页 > 解决方案 > ExecuteScript Nifi 的好的 Python 语法是什么?

问题描述

我正在尝试使用 Nifi 使用流式推文。在处理器出现一些问题后,我选择使用处理器和tweepyInvokeHTTP来完成我的工作。ExecuteScript

我想在流媒体中获取推文,并一一将它们提供给我的 Nifi 流程的其余部分。

为此,我编写了以下脚本:

from tweepy import Stream, API
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json

import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript

# All API keys / access token
consumer_key = "something"
consumer_secret_key = "something"
access_token = "something"
access_token_secret = "something"

# Initialize du proxy
proxies = {
    "http": "http_proxy",
    "https": "https_proxy"
}


# Listener class that contains all API functions for streaming tweets.
class Listener(StreamListener):

    def __init__(self, nifi_session):
        super(Listener, self).__init__() # Overwrite constructor for declare flowfile NiFi
        self.nifi_session = nifi_session

    def on_data(self, status):
        # Convert string to json
        data = json.loads(status)

        # extract relevant information, for example, we use user's description only
        description = data['user']['description']

        # Give all this parameters at nifi
        session.putAttribute(self.nifi_session, 'data', description)
        session.transfer(self.nifi_session, ExecuteScript.REL_SUCCESS)
        session.commit() # Commit to next nifi processor

    def on_error(self, status):
        if status == 420:
            return False


# Set flowfile NiFi
flowFile = session.get()

# Set OAuth with keys and tokens
auth = OAuthHandler(consumer_key, consumer_secret_key)
auth.set_access_token(access_token, access_token_secret)
api = API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

listener = Listener()
twitterStream = Stream(api.auth, listener=listener, proxies=proxies)

# Try/catch for be sure that stream will be disconnect after error
try:
    twitterStream.filter(track=['nasa'])
except Exception as e:
    print("Exception !")
finally:
    print("...end")
    twitterStream.disconnect()

twitterStream.disconnect()

使用此程序,Nifi 上不会发生任何事情。但是,我确实在没有flowfileNifi 的情况下session在其他环境中进行了测试,并且效果很好。我想问题在于我与 Nifi 通信的语法。

编辑:我不能使用GetTwitter处理器,因为它不支持代理,我必须使用我的。

感谢帮助!

标签: pythonapache-nifitweepytwitter-streaming-api

解决方案


推荐阅读