首页 > 解决方案 > 如何为 Twitter API v2 最近搜索循环我的 python 代码?

问题描述

我对python很陌生,所以我正在寻求解决这个问题的帮助。我的目标是收集大约 10,000 条包含图像的推文并将其保存到 csv 文件中。由于 Twitter 的速率限制是每 15 分钟 450 个请求,理想情况下我想自动化这个过程。我看到的指南只使用了 tweepy 模块,但由于我不太了解它,所以我使用了 Twitter 上给出的示例 python 代码:

import requests
import pandas as pd
import os
import json

# To set your enviornment variables in your terminal run the following line:
os.environ['BEARER_TOKEN']=''


def auth():
    return os.environ.get("BEARER_TOKEN")


def create_url():
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
        query, tweet_fields, expansions, media_fields, max_results
    )
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

def save_json(file_name, file_content):
    with open(file_name, 'w', encoding='utf-8') as write_file:
        json.dump(file_content, write_file, sort_keys=True, ensure_ascii=False, indent=4)

def main():
    bearer_token = auth()
    url = create_url()
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    
    #Save the data as a json file
    #save_json('collected_tweets.json', json_response)
    
    #save tweets as csv
    #df = pd.json_normalize(data=json_response)
    df1 = pd.DataFrame(json_response['data'])
    df1.to_csv('tweets_data.csv', mode="a")
    df2 = pd.DataFrame(json_response['includes'])
    df2.to_csv('tweets_includes_media.csv', mode="a")
    print(json.dumps(json_response['meta'], sort_keys=True, indent=4))

if __name__ == "__main__":
    main()

我应该如何更改此代码以使其在 Twitter 的 v2 速率限制内循环,或者使用 tweepy 会更好吗?

作为旁注,我确实意识到我的保存为 csv 的代码有问题,但这是我现在能做的最好的事情。

标签: pythontwitter

解决方案


这里有几件事要记住。

  • Tweepy 尚未更新以使用 Twitter 的 API (V2) 的新版本,因此您在 Twitter 文档中找到的大部分时间可能与 Tweepy 提供的内容不符。Tweepy 在 V1 中仍然可以很好地工作,但是,某些推文匹配功能可能会有所不同,您只需要小心。
  • 鉴于您提到的目标,尚不清楚您是否要使用“最近搜索”端点。例如,使用sample stream开始 1% 的流可能更容易。这是该端点的Twitter 示例代码。这样做的主要好处是,您可以在“后台”(参见下面的注释)中运行它,并带有一个条件,一旦您收集了 10k 条推文,就会终止该过程。这样,您就不必担心达到推文限制 - 默认情况下,Twitter 将您限制为查询量的约 1%(在您的情况下,"has:images lang:en -is:retweet"),并且只是实时收集这些推文。如果您想获取完整记录对于两个时间段之间的非转推英文推文,您需要将这些时间点添加到您的查询中,然后按照您上面的要求管理限制。查看start_time并查看 API参考文档end_time

注意:要在后台运行脚本,请编写程序,然后nohup nameofstreamingcode.py > logfile.log 2>&1 &从终端执行它。任何正常的终端输出(即打印行和/或错误)都将被写入一个名为 的新文件中logfile.log&命令末尾的之后)。

  • connect_to_endpoint(url, headers)要使用最近搜索端点,您需要向函数中添加大量。此外,您可以使用另一个函数pause_until,它是为我正在开发的 Twitter V2 API 包编写的(函数代码链接)。
def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)

    # Twitter returns (in the header of the request object) how many
    # requests you have left. Lets use this to our advantage
    remaining_requests = int(response.headers["x-rate-limit-remaining"])
    
    # If that number is one, we get the reset-time
    #   and wait until then, plus 15 seconds (your welcome Twitter).
    # The regular 429 exception is caught below as well,
    #   however, we want to program defensively, where possible.
    if remaining_requests == 1:
        buffer_wait_time = 15
        resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
        print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
        pause_until(resume_time)  ## Link to this code in above answer

    # We still may get some weird errors from Twitter.
    # We only care about the time dependent errors (i.e. errors
    #   that Twitter wants us to wait for).
    # Most of these errors can be solved simply by waiting
    #   a little while and pinging Twitter again - so that's what we do.
    if response.status_code != 200:

        # Too many requests error
        if response.status_code == 429:
            buffer_wait_time = 15
            resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # Twitter internal server error
        elif response.status_code == 500:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # Twitter service unavailable error
        elif response.status_code == 503:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # If we get this far, we've done something wrong and should exit
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )

    # Each time we get a 200 response, lets exit the function and return the response.json
    if response.ok:
        return response.json()

由于完整的查询结果将远大于您在每次查询时请求的 100 条推文,因此您需要在更大的查询中跟踪您的位置。这是通过next_token.

要获得next_token,其实很容易。只需从响应中的元字段中获取它。为了清楚起见,您可以像这样使用上述功能......

# Get response
response = connect_to_endpoint(url, headers)
# Get next_token
next_token = response["meta"]["next_token"]

然后需要在查询详细信息中传递此令牌,这些详细信息包含在您使用create_url()函数创建的 url 中。这意味着您还需要将您的create_url()功能更新为如下所示...

def create_url(pagination_token=None):
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    if pagination_token == None:
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
            query, tweet_fields, expansions, media_fields, max_results
        )
    else:
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}&{}".format(
            query, tweet_fields, expansions, media_fields, max_results, pagination_token
        )
    return url

更改上述功能后,您的代码应按以下方式流动。

  1. 发出请求
  2. next_token_response["meta"]["next_token"]
  3. 更新查询参数以next_token包含create_url()
  4. 冲洗并重复直到:
    1. 你会收到 10k 条推文
    2. 查询停止

最后一点:我不会尝试使用 pandas 数据框来编写您的文件。我会创建一个空列表,将每个新查询的结果附加到该列表,然后将字典对象的最终列表写入 json 文件(有关详细信息,请参阅此问题)。我已经了解到原始推文和熊猫数据框并不能很好地发挥作用。习惯 json 对象和字典的工作方式要好得多。


推荐阅读