Passing a psycopg2 cursor to the tweepy on_status() method

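Problem description

I am trying to pass a psycopg2 cursor to a tweepy stream.

The connection pool and the cursor are configured in a separate file. Only the cursor is passed as an argument to another file called get_tweet_topic.py. I need the cursor inside the on_status() method because I have a query that needs it in order to execute.

I don't know how to pass the cursor to the on_status() method of the MyStreamListener class.

The error I get is: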

2020-03-05T22:16:24.856945+00:00 app[worker.1]: self._target(*self._args, **self._kwargs)
2020-03-05T22:16:24.856945+00:00 app[worker.1]: File "/app/get_tweet_topic.py", line 81, in guess_topic_pipeline
2020-03-05T22:16:24.856946+00:00 app[worker.1]: status_streams.streaming_pipeline(api, cursor)
2020-03-05T22:16:24.856947+00:00 app[worker.1]: File "/app/status_streams.py", line 100, in streaming_pipeline
2020-03-05T22:16:24.856947+00:00 app[worker.1]: general_stream(api, cursor)
2020-03-05T22:16:24.856948+00:00 app[worker.1]: File "/app/status_streams.py", line 86, in general_stream
2020-03-05T22:16:24.856948+00:00 app[worker.1]: myStreamListener = MyStreamListener()
2020-03-05T22:16:24.856948+00:00 app[worker.1]: TypeError: __init__() missing 1 required positional argument: 'cursor'

Code:

status_streams.py:

import tweepy
import os

import db_queries
import follow

#define class for the stream listener
class MyStreamListener(tweepy.StreamListener):

    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                    'source_stream': 'general stream',
                    'status_id': status.id_str,
                    'user_id': status.user.id_str,
                    'screen_name': status.user.name,
                    'tweet_text': status.text,
                    'num_likes': status.favorite_count,
                    'num_retweets': status.retweet_count}

            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']

            db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)

        self.counter +=1
        if self.counter == self.max:
            return False


#get tweets from list of followers
def following_stream(api, cursor, user_name):
    try:
        for status in tweepy.Cursor(api.user_timeline, tweet_mode='extended', include_rts=False, screen_name=user_name).items(1):
            #ignore retweets
            if not status.retweeted:
                status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                               'source_stream': 'following stream',
                               'status_id': status.id_str,
                               'user_id': status.user.id_str,
                               'screen_name': status.user.name,
                               'tweet_text':status.full_text,
                               'num_likes':status.favorite_count,
                               'num_retweets':status.retweet_count}

                created_at = status_dict['created_at']
                source_stream = status_dict['source_stream']
                status_id = status_dict['status_id']
                user_id = status_dict['user_id']
                screen_name = status_dict['screen_name']
                tweet_text = status_dict['tweet_text']
                num_likes = status_dict['num_likes']
                num_retweets = status_dict['num_retweets']

                db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)


#function that controls both streams
def streaming_pipeline(api, cursor):
    #get list of all users that are currently followed
    #iterate through the following_list and grab the single latest tweet
    following_list = follow.get_following(api)
    for user in following_list:
        f_stream = following_stream(api, cursor, user)

    #stream class is used here
    myStreamListener = MyStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener(cursor=self.cursor))
    stream.filter(languages=['en'], track=['the'])


    cursor.close()

Relevant part of get_tweet_topic.py:

def guess_topic_pipeline(api, conn, model, corpus, classifier):

    while True:
        cursor = conn.cursor()
        db_queries.create_temp_tweets_table(cursor)
        conn.commit()

        #use pipeline to grab tweets off twitter
        print('Retrieving statuses from streams...')
        status_streams.streaming_pipeline(api, cursor)
        print('Done retrieving...')

Relevant part of the connection pool code:

        #get connection from pool, pass cursor as an argument, start topic extration thread
        topic_conn = conn_pool.getconn()
        topic_extraction_thread = Thread(target=get_tweet_topic.guess_topic_pipeline, kwargs={'api':api, 'conn': topic_conn, 'model': lda_model, 'corpus': lda_id2word, 'classifier': lda_huber_classifier})
        topic_extraction_thread.start()
        #return connection when done
        conn_pool.putconn(topic_conn)

The insert_raw_tweets_table() function with the actual query:

def insert_raw_tweets_table(cursor, createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets):
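    # Assumes tempTweets also has numLikes and numRetweets columns, so the eight placeholders match eight columns
    cursor.execute('INSERT INTO tempTweets(createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)', (createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets))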

Tags: python, heroku, twitter, psycopg2, tweepy

Solution


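@MauriceMeyer answered this in the comments, but for clarity, here is the working code.

I made two mistakes. Inside the class I referred to the cursor as cursor instead of self.cursor, and I did not pass the cursor as an argument when creating the instance of the class. Instead, I tried to pass the cursor after the instance had already been created, which is incorrect.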
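Each of these mistakes can be reproduced in isolation. The following is only a minimal sketch: `Listener`, `error_of`, and `fake_cursor` are hypothetical names that are not part of the original code, and a plain string stands in for the psycopg2 cursor so that nothing has to be installed.

```python
class Listener:
    """Hypothetical stand-in for MyStreamListener; no tweepy required."""

    def __init__(self, cursor):
        self.cursor = cursor

    def on_status_broken(self, status):
        # Bare `cursor` is not defined in this scope; only self.cursor is.
        return (cursor, status)

    def on_status_fixed(self, status):
        return (self.cursor, status)


def error_of(func):
    """Run func() and return the name of the exception it raises, if any."""
    try:
        func()
    except Exception as exc:
        return type(exc).__name__
    return None


fake_cursor = "fake cursor"  # placeholder for a real psycopg2 cursor

# 1. Creating the instance without the argument: the TypeError from the traceback.
missing_arg = error_of(lambda: Listener())

# 2. Passing the cursor after creation calls the instance, which is not callable.
listener = Listener(fake_cursor)
call_instance = error_of(lambda: listener(cursor=fake_cursor))

# 3. Using bare `cursor` inside a method instead of self.cursor.
bare_name = error_of(lambda: listener.on_status_broken("tweet"))

print(missing_arg, call_instance, bare_name)
print(listener.on_status_fixed("tweet"))
```

Only the first failure appears in the traceback from the question, because the program stopped there; the other two would have surfaced once that line was fixed.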

Correct code:

class MyStreamListener(tweepy.StreamListener):

    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at' : status.created_at.strftime('%y-%m-%d %H:%M'),
                           'source_stream' : 'general stream',
                           'status_id' : status.id_str,
                           'user_id' : status.user.id_str,
                           'screen_name' : status.user.name,
                           'tweet_text' : status.text,
                           'num_likes' : status.favorite_count,
                           'num_retweets' : status.retweet_count}

            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']

                                                   #▼ reference self.cursor here
            db_queries.insert_raw_tweets_table(self.cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)

        self.counter +=1
        if self.counter == self.max:
            return False




    #stream class is used here            ▼ reference cursor here
    myStreamListener = MyStreamListener(cursor)
                                                                 #▼ removed reference to cursor here
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
    stream.filter(languages=['en'], track=['the'])
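
The same pattern can be tried out without Twitter or PostgreSQL. The sketch below is an illustration under stated assumptions, not the code from the project: `FakeStatus`, `FakeStreamListener`, and `run_stream` are hypothetical stand-ins for tweepy, the two-column table is made up for the example, and an in-memory `sqlite3` cursor takes the place of the psycopg2 cursor (sqlite3 uses `?` placeholders where psycopg2 uses `%s`).

```python
import sqlite3
from collections import namedtuple

# Hypothetical stand-in for a tweepy status object.
FakeStatus = namedtuple("FakeStatus", ["id_str", "text", "retweeted"])


class FakeStreamListener:
    """Hypothetical stand-in for tweepy.StreamListener."""

    def on_status(self, status):
        raise NotImplementedError


class MyStreamListener(FakeStreamListener):
    def __init__(self, cursor, max_statuses=3):
        super().__init__()
        self.cursor = cursor  # stored once, reused on every callback
        self.counter = 0
        self.max = max_statuses

    def on_status(self, status):
        if not status.retweeted:
            self.cursor.execute(
                "INSERT INTO tempTweets(statusID, tweetText) VALUES (?, ?)",
                (status.id_str, status.text),
            )
        self.counter += 1
        if self.counter == self.max:
            return False  # signal the stream to stop


def run_stream(listener, statuses):
    """Hypothetical stand-in for stream.filter(): feed statuses until told to stop."""
    for status in statuses:
        if listener.on_status(status) is False:
            break


conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE tempTweets(statusID TEXT, tweetText TEXT)")

# Status 1 is marked as a retweet so that it is counted but not stored.
statuses = [FakeStatus(str(i), f"tweet {i}", retweeted=(i == 1)) for i in range(10)]
listener = MyStreamListener(cursor)  # cursor passed at construction time
run_stream(listener, statuses)
conn.commit()

rows = cursor.execute("SELECT statusID FROM tempTweets ORDER BY statusID").fetchall()
print(rows)
```

The listener receives the cursor once, in `__init__`, and every later call to `on_status()` reaches it through `self.cursor`, so the code that drives the stream never needs to know about the database.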
