python - 如何为 Twitter API v2 最近搜索循环我的 python 代码?
问题描述
我对python很陌生,所以我正在寻求解决这个问题的帮助。我的目标是收集大约 10,000 条包含图像的推文并将其保存到 csv 文件中。由于 Twitter 的速率限制是每 15 分钟 450 个请求,理想情况下我想自动化这个过程。我看到的指南只使用了 tweepy 模块,但由于我不太了解它,所以我使用了 Twitter 上给出的示例 python 代码:
import requests
import pandas as pd
import os
import json
# To set your enviornment variables in your terminal run the following line:
os.environ['BEARER_TOKEN']=''
def auth():
return os.environ.get("BEARER_TOKEN")
def create_url():
query = "has:images lang:en -is:retweet"
tweet_fields = "tweet.fields=attachments,created_at,author_id"
expansions = "expansions=attachments.media_keys"
media_fields = "media.fields=media_key,preview_image_url,type,url"
max_results = "max_results=100"
url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
query, tweet_fields, expansions, media_fields, max_results
)
return url
def create_headers(bearer_token):
headers = {"Authorization": "Bearer {}".format(bearer_token)}
return headers
def connect_to_endpoint(url, headers):
response = requests.request("GET", url, headers=headers)
print(response.status_code)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
return response.json()
def save_json(file_name, file_content):
with open(file_name, 'w', encoding='utf-8') as write_file:
json.dump(file_content, write_file, sort_keys=True, ensure_ascii=False, indent=4)
def main():
bearer_token = auth()
url = create_url()
headers = create_headers(bearer_token)
json_response = connect_to_endpoint(url, headers)
#Save the data as a json file
#save_json('collected_tweets.json', json_response)
#save tweets as csv
#df = pd.json_normalize(data=json_response)
df1 = pd.DataFrame(json_response['data'])
df1.to_csv('tweets_data.csv', mode="a")
df2 = pd.DataFrame(json_response['includes'])
df2.to_csv('tweets_includes_media.csv', mode="a")
print(json.dumps(json_response['meta'], sort_keys=True, indent=4))
if __name__ == "__main__":
main()
我应该如何更改此代码以使其在 Twitter 的 v2 速率限制内循环,或者使用 tweepy 会更好吗?
作为旁注,我确实意识到我的保存为 csv 的代码有问题,但这是我现在能做的最好的事情。
解决方案
这里有几件事要记住。
- Tweepy 尚未更新以使用 Twitter 的 API (V2) 的新版本,因此您在 Twitter 文档中找到的大部分时间可能与 Tweepy 提供的内容不符。Tweepy 在 V1 中仍然可以很好地工作,但是,某些推文匹配功能可能会有所不同,您只需要小心。
- 鉴于您提到的目标,尚不清楚您是否要使用“最近搜索”端点。例如,使用sample stream开始 1% 的流可能更容易。这是该端点的Twitter 示例代码。这样做的主要好处是,您可以在“后台”(参见下面的注释)中运行它,并带有一个条件,一旦您收集了 10k 条推文,就会终止该过程。这样,您就不必担心达到推文限制 - 默认情况下,Twitter 将您限制为查询量的约 1%(在您的情况下,
"has:images lang:en -is:retweet"
),并且只是实时收集这些推文。如果您想获取完整记录对于两个时间段之间的非转推英文推文,您需要将这些时间点添加到您的查询中,然后按照您上面的要求管理限制。查看start_time
并查看 API参考文档end_time
。
注意:要在后台运行脚本,请编写程序,然后
nohup nameofstreamingcode.py > logfile.log 2>&1 &
从终端执行它。任何正常的终端输出(即打印行和/或错误)都将被写入一个名为 的新文件中logfile.log
,&
命令末尾的之后)。
connect_to_endpoint(url, headers)
要使用最近搜索端点,您需要向函数中添加大量。此外,您可以使用另一个函数pause_until
,它是为我正在开发的 Twitter V2 API 包编写的(函数代码链接)。
def connect_to_endpoint(url, headers):
response = requests.request("GET", url, headers=headers)
# Twitter returns (in the header of the request object) how many
# requests you have left. Lets use this to our advantage
remaining_requests = int(response.headers["x-rate-limit-remaining"])
# If that number is one, we get the reset-time
# and wait until then, plus 15 seconds (your welcome Twitter).
# The regular 429 exception is caught below as well,
# however, we want to program defensively, where possible.
if remaining_requests == 1:
buffer_wait_time = 15
resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
pause_until(resume_time) ## Link to this code in above answer
# We still may get some weird errors from Twitter.
# We only care about the time dependent errors (i.e. errors
# that Twitter wants us to wait for).
# Most of these errors can be solved simply by waiting
# a little while and pinging Twitter again - so that's what we do.
if response.status_code != 200:
# Too many requests error
if response.status_code == 429:
buffer_wait_time = 15
resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
pause_until(resume_time) ## Link to this code in above answer
# Twitter internal server error
elif response.status_code == 500:
# Twitter needs a break, so we wait 30 seconds
resume_time = datetime.now().timestamp() + 30
print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
pause_until(resume_time) ## Link to this code in above answer
# Twitter service unavailable error
elif response.status_code == 503:
# Twitter needs a break, so we wait 30 seconds
resume_time = datetime.now().timestamp() + 30
print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
pause_until(resume_time) ## Link to this code in above answer
# If we get this far, we've done something wrong and should exit
raise Exception(
"Request returned an error: {} {}".format(
response.status_code, response.text
)
)
# Each time we get a 200 response, lets exit the function and return the response.json
if response.ok:
return response.json()
由于完整的查询结果将远大于您在每次查询时请求的 100 条推文,因此您需要在更大的查询中跟踪您的位置。这是通过next_token
.
要获得next_token
,其实很容易。只需从响应中的元字段中获取它。为了清楚起见,您可以像这样使用上述功能......
# Get response
response = connect_to_endpoint(url, headers)
# Get next_token
next_token = response["meta"]["next_token"]
然后需要在查询详细信息中传递此令牌,这些详细信息包含在您使用create_url()
函数创建的 url 中。这意味着您还需要将您的create_url()
功能更新为如下所示...
def create_url(pagination_token=None):
query = "has:images lang:en -is:retweet"
tweet_fields = "tweet.fields=attachments,created_at,author_id"
expansions = "expansions=attachments.media_keys"
media_fields = "media.fields=media_key,preview_image_url,type,url"
max_results = "max_results=100"
if pagination_token == None:
url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
query, tweet_fields, expansions, media_fields, max_results
)
else:
url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}&{}".format(
query, tweet_fields, expansions, media_fields, max_results, pagination_token
)
return url
更改上述功能后,您的代码应按以下方式流动。
- 发出请求
- 从
next_token
_response["meta"]["next_token"]
- 更新查询参数以
next_token
包含create_url()
- 冲洗并重复直到:
- 你会收到 10k 条推文
- 查询停止
最后一点:我不会尝试使用 pandas 数据框来编写您的文件。我会创建一个空列表,将每个新查询的结果附加到该列表,然后将字典对象的最终列表写入 json 文件(有关详细信息,请参阅此问题)。我已经了解到原始推文和熊猫数据框并不能很好地发挥作用。习惯 json 对象和字典的工作方式要好得多。
推荐阅读
- reactjs - 动态添加表单时,下拉选择器选择应为空
- html - Integromat 匹配模式 - 查找单词 Fandoms 并提供相关字符串
- javascript - 如何在本地存储中追加购物车项目,Jquery
- r - R validate package var_group - 定义一个变量输入列表?
- android - 在反应原生android中隐藏和显示时,TouchableOpacity 在绝对视图中不起作用
- javascript - 为什么我的javascript函数在从不同文件调用时返回未定义
- c# - 如何在 .NET Core 中使用 Azure SignalR 向特定用户发送消息
- c# - 在 Visual Studio 中使用现有项目
- swift - 我要通过。通过结构从 2 vc 到第一个 vc 的文本字段数据
- leaflet - 如何在我的 Leaflet 地图中添加更改数据变量可视化的按钮?