首页 > 解决方案 > 如何使用 Tweepy 通过带有语言和计数过滤器的主题标签流式传输推文?

问题描述

所以我想做的是来自 Twitters API 的直播推文:仅针对主题标签“Brexit”,仅使用英语,以及特定数量的推文(1k - 2k)。

到目前为止,我的代码将实时流式传输推文,但无论我以何种方式修改它,我要么最终忽略计数而只是无限期地流式传输,要么我得到错误。如果我将其更改为仅流式传输特定用户的推文,则计数功能可以工作,但它会忽略主题标签。如果我为给定的主题标签流式传输所有内容,它会完全忽略计数。我在尝试修复它方面做得很好,但我很缺乏经验,并且真的碰到了一堵砖墙。

如果我能在如何同时勾选所有这些方框方面获得一些帮助,将不胜感激!到目前为止,下面的代码只会无限期地流式传输“Brexit”推文,因此忽略 count=10

由于我在玩它,代码的底部有点乱,道歉:

import numpy as np
import pandas as pd
import tweepy
from tweepy import API
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import Twitter_Credentials
import matplotlib.pyplot as plt

# Twitter client - hash out to stream all


class TwitterClient:
    def __init__(self, twitter_user=None):
        self.auth = TwitterAuthenticator().authenticate_twitter_app()
        self.twitter_client = API(self.auth)

        self.twitter_user = twitter_user

    def get_twitter_client_api(self):
        return self.twitter_client

# Twitter authenticator


class TwitterAuthenticator:
    def authenticate_twitter_app(self):
        auth = OAuthHandler(Twitter_Credentials.consumer_key, Twitter_Credentials.consumer_secret)
        auth.set_access_token(Twitter_Credentials.access_token, Twitter_Credentials.access_secret)
        return auth

class TwitterStreamer():
    # Class for streaming and processing live Tweets
    def __init__(self):
        self.twitter_authenticator = TwitterAuthenticator()

    def stream_tweets(self, fetched_tweets_filename, hash_tag_list):

        # this handles Twitter authentication and connection to Twitter API
        listener = TwitterListener(fetched_tweets_filename)
        auth = self.twitter_authenticator.authenticate_twitter_app()
        stream = Stream(auth, listener)
        # This line filters Twitter stream to capture data by keywords
        stream.filter(track=hash_tag_list)

# Twitter stream listener

class TwitterListener(StreamListener):
    # This is a listener class that prints incoming Tweets to stdout
    def __init__(self, fetched_tweets_filename):
        self.fetched_tweets_filename = fetched_tweets_filename

    def on_data(self, data):
        try:
            print(data)
            with open(self.fetched_tweets_filename, 'a') as tf:
                tf.write(data)
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        if status == 420:
            # Return false on data in case rate limit occurs
            return False
        print(status)

class TweetAnalyzer():
    # Functionality for analysing and categorising content from tweets

    def tweets_to_data_frame(self, tweets):
        df = pd.DataFrame(data=[tweet.text for tweet in tweets], columns=['tweets'])

        df['id'] = np.array([tweet.id for tweet in tweets])
        df['len'] = np.array([len(tweet.text) for tweet in tweets])
        df['date'] = np.array([tweet.created_at for tweet in tweets])
        df['source'] = np.array([tweet.source for tweet in tweets])
        df['likes'] = np.array([tweet.favorite_count for tweet in tweets])
        df['retweets'] = np.array([tweet.retweet_count for tweet in tweets])

        return df


if __name__ == "__main__":

    auth = OAuthHandler(Twitter_Credentials.consumer_key, Twitter_Credentials.consumer_secret)
    auth.set_access_token(Twitter_Credentials.access_token, Twitter_Credentials.access_secret)
    api = tweepy.API(auth)

    for tweet in Cursor(api.search, q="#brexit", count=10,
                               lang="en",
                               since="2019-04-03").items():
        fetched_tweets_filename = "tweets.json"
        twitter_streamer = TwitterStreamer()
        hash_tag_list = ["Brexit"]
        twitter_streamer.stream_tweets(fetched_tweets_filename, hash_tag_list)

标签: pythonpandasnumpytwittertweepy

解决方案


您正在尝试使用两种不同的方法来访问 Twitter API - 流是实时的,而搜索是一次性的 API 调用。

由于流是连续实时的,因此无法对其应用结果计数 - 代码只是打开一个连接,说“嘿,从现在开始向我发送所有包含hash_tag_list”的推文,然后坐下来听。然后,您进入StreamListener,对于收到的每条推文,您将它们写入文件。

可以在此处应用一个计数器,但您需要将它包装在您的StreamListener on_data处理程序中,并为收到的每条推文增加计数器。当你达到 1000 条推文时,请停止收听。

对于搜索选项,您有几个问题……第一个问题是您自 2019 年以来一直在请求推文,但标准搜索 API 只能返回 7 天。你显然只要求了 10 条推文。但是,按照您编写该方法的方式,实际发生的情况是,对于 API 返回的 10 个集合中的每个 Tweet,您创建一个实时流连接开始侦听和写入文件。所以这行不通。

您需要选择一个 - 要么搜索 1000 条推文并将它们写入文件(从不设置TwitterStreamer()),要么收听 1000 条推文并将它们写入文件(放下for Tweet in Cursor(api.search...并直接跳转到流媒体)。


推荐阅读