首页 > 解决方案 > AWS Lambda 中的 Tweepy search_tweets

问题描述

我想每 10 分钟获取最后 1000 条带有“比特币、比特币、卡尔达诺和 ADA”标签的推文。为此,我将在 AWS Lambda 函数中使用 Twitter API 和包“Tweepy”。

在这里你可以看到我的代码:

import pandas as pd
import sqlalchemy
import tweepy
import psycopg2
import os

# Credentials for Database connection
ENDPOINT = os.environ['ENDPOINT']
DB_NAME = os.environ['DBNAME']
USERNAME = os.environ['USERNAME']
PASSWORD = os.environ['PASSWORD']

# Credentials for Twitter-API
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCESS_TOKEN_SECRET = os.environ['ACCESS_TOKEN_SECRET']
CONSUMER_KEY = os.environ['CONSUMER_KEY']
CONSUMER_SECRET = os.environ['CONSUMER_KEY_SECRET']
BEARER_TOKEN = os.environ['BEARER_TOKEN']

def lambda_handler(event, context):
    
    # Build Twitter API
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN,ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    
    # Get Twitter Data and prepare for Upload
    lst_time = []
    lst_text = []
    lst_like = []
    lst_retweet = []


    for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(10):
        lst_text.append(i.text)
        lst_like.append(i.favorite_count)
        lst_retweet.append(i.retweet_count)
        lst_time.append(i.created_at)

    
    # Prepare DataFrame for Upload to DataBase
    df = pd.DataFrame(list(zip(lst_time, lst_text, lst_like,lst_retweet)), columns = ['time', 'text', 'like', 'retweet'])
    
    df['date_new'] = pd.to_datetime(df['time']).dt.date
    df['time_new'] = pd.to_datetime(df['time']).dt.time
    
    df = df.drop(['time'], axis=1)
    
    
    
    # Build DB connection
    try:
        conn = psycopg2.connect("host={} dbname={} user={} password={}".format(ENDPOINT,DBNAME,USERNAME,PASSWORD))
    
    except psycopg2.Error as e:
        print("Error: Could not get Connection to DB")
        print(e)
    
    # Create DB Cursor
    try:
        cur = conn.cursor()
    except psycopg2.Error as e:
        print("Error: Could not get a cursor")
        print(e)
    
    # Set Autocommit
    conn.set_session(autocommit=True)
    
    # Create sqlalchemy engine
    engine = sqlalchemy.create_engine("postgresql://{}:{}@{}:5432/{}".format(USERNAME,PASSWORD,ENDPOINT,DBNAME))
    
    # Create Table in DB
    df.to_sql('Twitter_Keyword_search', engine, if_exists='append', index = False)
    
    # Close DB Connection
    cur.close()
    conn.close()
    
    print("success")
    

但是每次我想测试时,执行结果都是

Test Event Name
test

Response
{
  "errorMessage": "Failed to send request: HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/search/tweets.json?q=Bitcoin+-filter%3Aretweets+OR+Cardano+-filter%3Aretweets+OR+ADA+-filter%3Aretweets+OR+BTC+-filter%3Aretweets (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f0e0fa0da90>, 'Connection to api.twitter.com timed out. (connect timeout=60)'))",
  "errorType": "TweepyException",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 34, in lambda_handler\n    for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):\n",
    "  File \"/opt/python/tweepy/cursor.py\", line 86, in __next__\n    return self.next()\n",
    "  File \"/opt/python/tweepy/cursor.py\", line 286, in next\n    self.current_page = next(self.page_iterator)\n",
    "  File \"/opt/python/tweepy/cursor.py\", line 86, in __next__\n    return self.next()\n",
    "  File \"/opt/python/tweepy/cursor.py\", line 167, in next\n    data = self.method(max_id=self.max_id, parser=RawParser(), *self.args, **self.kwargs)\n",
    "  File \"/opt/python/tweepy/api.py\", line 33, in wrapper\n    return method(*args, **kwargs)\n",
    "  File \"/opt/python/tweepy/api.py\", line 46, in wrapper\n    return method(*args, **kwargs)\n",
    "  File \"/opt/python/tweepy/api.py\", line 1269, in search_tweets\n    ), q=q, **kwargs\n",
    "  File \"/opt/python/tweepy/api.py\", line 222, in request\n    raise TweepyException(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])\n",
    "  File \"/opt/python/tweepy/api.py\", line 219, in request\n    timeout=self.timeout, auth=auth, proxies=self.proxy\n",
    "  File \"/opt/python/requests/sessions.py\", line 542, in request\n    resp = self.send(prep, **send_kwargs)\n",
    "  File \"/opt/python/requests/sessions.py\", line 655, in send\n    r = adapter.send(request, **kwargs)\n",
    "  File \"/opt/python/requests/adapters.py\", line 504, in send\n    raise ConnectTimeout(e, request=request)\n"
  ]
}

Function Logs
START RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0 Version: $LATEST
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
/opt/python/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
""")
[ERROR] TweepyException: Failed to send request: HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/search/tweets.json?q=Bitcoin+-filter%3Aretweets+OR+Cardano+-filter%3Aretweets+OR+ADA+-filter%3Aretweets+OR+BTC+-filter%3Aretweets (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f0e0fa0da90>, 'Connection to api.twitter.com timed out. (connect timeout=60)'))
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 34, in lambda_handler
    for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):
  File "/opt/python/tweepy/cursor.py", line 86, in __next__
    return self.next()
  File "/opt/python/tweepy/cursor.py", line 286, in next
    self.current_page = next(self.page_iterator)
  File "/opt/python/tweepy/cursor.py", line 86, in __next__
    return self.next()
  File "/opt/python/tweepy/cursor.py", line 167, in next
    data = self.method(max_id=self.max_id, parser=RawParser(), *self.args, **self.kwargs)
  File "/opt/python/tweepy/api.py", line 33, in wrapper
    return method(*args, **kwargs)
  File "/opt/python/tweepy/api.py", line 46, in wrapper
    return method(*args, **kwargs)
  File "/opt/python/tweepy/api.py", line 1269, in search_tweets
    ), q=q, **kwargs
  File "/opt/python/tweepy/api.py", line 222, in request
    raise TweepyException(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])
  File "/opt/python/tweepy/api.py", line 219, in request
    timeout=self.timeout, auth=auth, proxies=self.proxy
  File "/opt/python/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/python/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/opt/python/requests/adapters.py", line 504, in send
    raise ConnectTimeout(e, request=request)
END RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0
REPORT RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0  Duration: 240254.49 ms  Billed Duration: 240255 ms  Memory Size: 1024 MB    Max Memory Used: 160 MB Init Duration: 2818.23 ms

Request ID
e434987f-5204-42bf-8d54-8fd217aff1e0

在互联网上搜索了几个小时后,我希望你能提供帮助。PS:对于那些通过阅读我的代码没有想到的人=>我是一个血腥的初学者

谢谢

标签: pythonpostgresqlamazon-web-servicesaws-lambdatweepy

解决方案


File "/var/task/lambda_function.py", line 34, in lambda_handler for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):

....
raise ConnectTimeout(e, request=request)

因此,唯一重要的是脚本无法连接到 API 端点的原因。假设 Twitter 没有发生秘密中断,我首先要看的是脚本是否可以访问公共互联网上的任何内容。因此,我将首先部署一个简单的脚本,而不需要所有其他细节,它只是尝试GETexample.com. 如果这也出现超时错误(我怀疑它会发生),这意味着您没有为您的 lambdas 设置互联网连接以允许它们伸出援手。

在这种情况下,这不再是关于 Python 的问题。AWS 文档可能会有所帮助:“我如何为连接到 Amazon VPC 的 Lambda 函数提供 Internet 访问权限? ”,但确切的解决方案将取决于您组织的结构。


推荐阅读