python - Passing a psycopg2 cursor to the tweepy on_status() method
Problem description
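I'm trying to pass a psycopg2 cursor to a tweepy stream.
The connection pool and the cursors are configured in a separate file. Only the cursor is passed as an argument to another file, get_tweet_topic.py. I need the cursor in the on_status() method because I have a query there that needs it to run.
I can't figure out how to pass the cursor into the on_status() method of the MyStreamListener() class.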
The error I get is:
2020-03-05T22:16:24.856945+00:00 app[worker.1]: self._target(*self._args, **self._kwargs)
2020-03-05T22:16:24.856945+00:00 app[worker.1]: File "/app/get_tweet_topic.py", line 81, in guess_topic_pipeline
2020-03-05T22:16:24.856946+00:00 app[worker.1]: status_streams.streaming_pipeline(api, cursor)
2020-03-05T22:16:24.856947+00:00 app[worker.1]: File "/app/status_streams.py", line 100, in streaming_pipeline
2020-03-05T22:16:24.856947+00:00 app[worker.1]: general_stream(api, cursor)
2020-03-05T22:16:24.856948+00:00 app[worker.1]: File "/app/status_streams.py", line 86, in general_stream
2020-03-05T22:16:24.856948+00:00 app[worker.1]: myStreamListener = MyStreamListener()
2020-03-05T22:16:24.856948+00:00 app[worker.1]: TypeError: __init__() missing 1 required positional argument: 'cursor'
Code:
status_streams.py:
import tweepy
import os
import db_queries
import follow

#define class for the stream listener
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                           'source_stream': 'general stream',
                           'status_id': status.id_str,
                           'user_id': status.user.id_str,
                           'screen_name': status.user.name,
                           'tweet_text': status.text,
                           'num_likes': status.favorite_count,
                           'num_retweets': status.retweet_count}
            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']
            db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
            self.counter +=1
            if self.counter == self.max:
                return False

#get tweets from list of followers
def following_stream(api, cursor, user_name):
    try:
        for status in tweepy.Cursor(api.user_timeline, tweet_mode='extended', include_rts=False, screen_name=user_name).items(1):
            #ignore retweets
            if not status.retweeted:
                status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                               'source_stream': 'following stream',
                               'status_id': status.id_str,
                               'user_id': status.user.id_str,
                               'screen_name': status.user.name,
                               'tweet_text': status.full_text,
                               'num_likes': status.favorite_count,
                               'num_retweets': status.retweet_count}
                created_at = status_dict['created_at']
                source_stream = status_dict['source_stream']
                status_id = status_dict['status_id']
                user_id = status_dict['user_id']
                screen_name = status_dict['screen_name']
                tweet_text = status_dict['tweet_text']
                num_likes = status_dict['num_likes']
                num_retweets = status_dict['num_retweets']
                db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
    except tweepy.TweepError as e:
        #assumed handler: the posted snippet opened try: without an except clause
        print(e)

#function that controls both streams
def streaming_pipeline(api, cursor):
    #get list of all users that are currently followed
    #iterate through the following_list and grab the single latest tweet
    following_list = follow.get_following(api)
    for user in following_list:
        f_stream = following_stream(api, cursor, user)
    #stream class is used here
    myStreamListener = MyStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener(cursor=self.cursor))
    stream.filter(languages=['en'], track=['the'])
    cursor.close()
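on_status() and following_stream() build the same status_dict and then unpack it into eight local variables. A minimal sketch of a shared helper, assuming the tweepy Status attributes used in the code above (created_at, id_str, user.id_str, user.name, favorite_count, retweet_count); status_to_row is a hypothetical name, and a SimpleNamespace stands in for a real Status object. The sketch keeps the original mapping of screen_name to status.user.name; note that in the Twitter API user.name is the display name, while user.screen_name is the @handle.

```python
from datetime import datetime
from types import SimpleNamespace


def status_to_row(status, source_stream, tweet_text):
    """Build the column dict for one tweet (hypothetical helper).

    tweet_text is passed in because the streaming listener reads status.text,
    while user_timeline with tweet_mode='extended' exposes status.full_text.
    """
    return {
        'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
        'source_stream': source_stream,
        'status_id': status.id_str,
        'user_id': status.user.id_str,
        'screen_name': status.user.name,
        'tweet_text': tweet_text,
        'num_likes': status.favorite_count,
        'num_retweets': status.retweet_count,
    }


# Fake status standing in for a tweepy Status object (illustration only).
fake = SimpleNamespace(
    created_at=datetime(2020, 3, 5, 22, 16),
    id_str='123',
    user=SimpleNamespace(id_str='42', name='someone'),
    text='hello',
    favorite_count=3,
    retweet_count=1,
)
row = status_to_row(fake, 'general stream', fake.text)
print(row['created_at'])  # 20-03-05 22:16
```

Each caller then passes its own text attribute, for example status_to_row(status, 'following stream', status.full_text).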
Relevant part of get_tweet_topic.py:
def guess_topic_pipeline(api, conn, model, corpus, classifier):
    while True:
        cursor = conn.cursor()
        db_queries.create_temp_tweets_table(cursor)
        conn.commit()
        #use pipeline to grab tweets off twitter
        print('Retrieving statuses from streams...')
        status_streams.streaming_pipeline(api, cursor)
        print('Done retrieving...')
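In the code shown, the cursor is created in guess_topic_pipeline() but closed inside streaming_pipeline(), and conn.commit() is only called after the table is created, not after the inserts. One way to keep the cursor's lifetime in one place is a small context manager that opens a cursor, commits on success, rolls back on an exception and always closes the cursor. This is a sketch: cursor_scope is a hypothetical name, and it relies only on the DB-API methods cursor(), commit(), rollback() and close(), which psycopg2 connections and cursors also provide. sqlite3 (with its ? placeholders) is used below only so the example runs without a PostgreSQL server.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def cursor_scope(conn):
    """Yield a cursor; commit on success, roll back on error, always close it."""
    cur = conn.cursor()
    try:
        yield cur
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


conn = sqlite3.connect(':memory:')
with cursor_scope(conn) as cur:
    cur.execute('CREATE TABLE tempTweets (statusID TEXT, tweetText TEXT)')
    cur.execute('INSERT INTO tempTweets VALUES (?, ?)', ('1', 'hello'))

# A failure inside the block rolls back that block's insert.
try:
    with cursor_scope(conn) as cur:
        cur.execute('INSERT INTO tempTweets VALUES (?, ?)', ('2', 'oops'))
        raise RuntimeError('simulated failure')
except RuntimeError:
    pass

count = conn.execute('SELECT COUNT(*) FROM tempTweets').fetchone()[0]
print(count)  # 1
```

Inside the while loop, the body could then be written as `with cursor_scope(conn) as cursor:` around the calls that use the cursor, so that streaming_pipeline() no longer needs to close a cursor it did not create.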
Relevant part of the connection pool code:
#get connection from pool, pass cursor as an argument, start topic extraction thread
topic_conn = conn_pool.getconn()
topic_extraction_thread = Thread(target=get_tweet_topic.guess_topic_pipeline, kwargs={'api':api, 'conn': topic_conn, 'model': lda_model, 'corpus': lda_id2word, 'classifier': lda_huber_classifier})
topic_extraction_thread.start()
#return connection when done
conn_pool.putconn(topic_conn)
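As written, putconn() runs as soon as start() returns, while guess_topic_pipeline() is still using topic_conn in the other thread (its while True loop never exits), so the pool treats the connection as free while the worker is still using it. A sketch that keeps checkout and return together: the thread takes the connection itself and returns it in a finally block. run_with_pooled_conn is a hypothetical helper that only assumes a pool object with getconn() and putconn(), which psycopg2.pool.ThreadedConnectionPool provides (psycopg2 documents that class, not SimpleConnectionPool, as the one safe to share between threads). A fake pool and a fake pipeline are used so the example runs on its own.

```python
import threading


def run_with_pooled_conn(pool, target, **kwargs):
    """Take a connection from `pool`, call target(conn=conn, **kwargs), then give it back."""
    conn = pool.getconn()
    try:
        return target(conn=conn, **kwargs)
    finally:
        pool.putconn(conn)  # reached only after target returns or raises


class FakePool:
    """Stand-in with the getconn()/putconn() pair that psycopg2 pools expose."""

    def __init__(self):
        self.events = []

    def getconn(self):
        self.events.append('getconn')
        return 'conn-1'

    def putconn(self, conn):
        self.events.append('putconn')


def fake_pipeline(conn, api):
    # Placeholder for guess_topic_pipeline: it only records that it used conn.
    pool.events.append(f'work with {conn} for {api}')


pool = FakePool()
worker = threading.Thread(
    target=run_with_pooled_conn,
    args=(pool, fake_pipeline),
    kwargs={'api': 'api-object'},
)
worker.start()
worker.join()
print(pool.events)  # ['getconn', 'work with conn-1 for api-object', 'putconn']
```

With the real code, the thread target would be run_with_pooled_conn with get_tweet_topic.guess_topic_pipeline and the remaining keyword arguments (api, model, corpus, classifier), and the main thread would no longer call putconn() itself.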
The insert_raw_tweets_table() function with the actual query:
def insert_raw_tweets_table(cursor, createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets):
    cursor.execute('INSERT INTO tempTweets(createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)', (createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets))
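Keeping the column list, the %s placeholders and the value tuple in step by hand is error-prone with eight fields. A minimal sketch that derives all three from one dict, such as status_dict; build_insert is a hypothetical helper, not part of psycopg2. Column and table names are pasted into the SQL text, so the sketch only accepts simple identifiers (psycopg2.sql.Identifier is psycopg2's own way to quote them), while the values still travel as query parameters.

```python
import re


def build_insert(table, row):
    """Return (sql, params) for a psycopg2-style INSERT built from a dict.

    Keys become column names and values become %s parameters, so the
    number of columns and placeholders always match.
    """
    for name in [table, *row]:
        # Identifiers go into the SQL text itself, so allow only simple names.
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name):
            raise ValueError(f'unsafe identifier: {name!r}')
    columns = ', '.join(row)
    placeholders = ', '.join(['%s'] * len(row))
    sql = f'INSERT INTO {table}({columns}) VALUES({placeholders})'
    return sql, tuple(row.values())


sql, params = build_insert('tempTweets', {'statusID': '123', 'numLikes': 3})
print(sql)     # INSERT INTO tempTweets(statusID, numLikes) VALUES(%s, %s)
print(params)  # ('123', 3)
```

With a psycopg2 cursor, the pair is used as cursor.execute(sql, params).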
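Solution
@MauriceMeyer answered this in the comments, but for clarity, here is the working code.
I forgot to reference the cursor inside the class as self.cursor, and I forgot to pass the cursor as an argument when creating the instance of the class. Instead, I passed the cursor as an argument after the instance had been created, which was incorrect.
Correct code: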
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at' : status.created_at.strftime('%y-%m-%d %H:%M'),
                           'source_stream' : 'general stream',
                           'status_id' : status.id_str,
                           'user_id' : status.user.id_str,
                           'screen_name' : status.user.name,
                           'tweet_text' : status.text,
                           'num_likes' : status.favorite_count,
                           'num_retweets' : status.retweet_count}
            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']
            #▼ reference self.cursor here
            db_queries.insert_raw_tweets_table(self.cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
            self.counter +=1
            if self.counter == self.max:
                return False

#stream class is used here ▼ reference cursor here
myStreamListener = MyStreamListener(cursor)
#▼ removed reference to cursor here
stream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
stream.filter(languages=['en'], track=['the'])
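The fix is plain constructor injection: the cursor goes into __init__ when the listener is created, is stored on the instance, and is read back through self inside the callback. A minimal self-contained sketch of that pattern; BaseListener, run_stream and FakeCursor are hypothetical stand-ins for tweepy.StreamListener, tweepy.Stream and a psycopg2 cursor, so no Twitter or database connection is needed. run_stream mirrors the tweepy 3.x behavior of disconnecting when on_status() returns False. Calling MyStreamListener() without a cursor raises a TypeError like the one in the traceback.

```python
class BaseListener:
    """Stand-in for tweepy.StreamListener (tweepy 3.x); not the real class."""

    def on_status(self, status):
        return True


def run_stream(listener, statuses):
    """Simplified stand-in for tweepy.Stream: stop when on_status() returns False."""
    for status in statuses:
        if listener.on_status(status) is False:
            break


class FakeCursor:
    """Records execute() calls instead of talking to PostgreSQL."""

    def __init__(self):
        self.calls = []

    def execute(self, query, params):
        self.calls.append(params)


class MyStreamListener(BaseListener):
    def __init__(self, cursor, max_items=1200):
        super().__init__()
        self.cursor = cursor  # the dependency is stored on the instance...
        self.counter = 0
        self.max = max_items

    def on_status(self, status):
        # ...and read back through self; a bare `cursor` here raises NameError
        self.cursor.execute('INSERT INTO tempTweets(tweetText) VALUES(%s)', (status,))
        self.counter += 1
        if self.counter == self.max:
            return False


cur = FakeCursor()
listener = MyStreamListener(cur, max_items=2)  # pass the cursor when constructing
run_stream(listener, ['a', 'b', 'c'])
print(cur.calls)  # [('a',), ('b',)]
```

The listener instance, not the class, is what the stream receives, which is why the cursor belongs in the constructor call rather than in the listener= argument.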