python - 如何动态更改正在流式传输的推文类型并确定要发送给谁的消息?
问题描述
应用说明
因此,我正在尝试创建一个对推文进行实时情绪分析的应用程序(尽可能接近实时),并且这些推文必须基于用户输入。所以在我的应用程序的主页中,我有一个简单的搜索栏,用户可以在其中输入他们想要执行情绪分析的主题,当他们按下回车键时,它会将他们带到另一个页面,在那里他们会看到一个折线图显示所有数据都是实时的。
问题 1
我目前面临的第一个问题是,当两个或更多人提出请求时,我不知道如何让 tweepy 更改它正在跟踪的内容。如果我要在每次用户进行新查询时简单地断开并重新连接的全局流,那么它也会断开其他用户的连接,这是我不想要的。另一方面,如果我要为每个连接的用户分配一个流对象,那么这个策略应该有效。这仍然存在问题。鉴于此 StackOverflow 帖子,Twitter 似乎不允许您一次保持多个连接。
Tweepy 是否支持运行多个 Streams 来收集数据?
如果我仍然同意这一点,我就有可能被禁止我的 IP。所以这两种解决方案都不好。
问题 2
我遇到的最后一个问题是弄清楚消息属于谁。目前,我正在使用 RabbitMQ 将所有传入消息存储在一个名为twitter_topic_feed
. 对于我从 tweepy 收到的每条推文,我都会在该队列中发布它。然后 RabbiMQ 使用该消息并将其发送到每个可用的连接。显然,这种行为不是我想要的。考虑两个搜索披萨和运动的用户。当一个用户请求体育推文而另一个用户请求比萨推文时,两个用户都将收到与足球和比萨饼有关的推文。
一种想法是为每个可用连接创建一个具有唯一标识符的队列。标识符的形式为{Search Term}_{Hash ID}。为了生成哈希 ID,我可以使用 python 中可用的 UUID 包,并在连接打开时创建 ID,并在连接关闭时将其删除。当然,当他们关闭连接时,我还需要删除队列。我不确定这个解决方案的扩展性如何。如果我们有 10,000 个连接,我们将有 10,000 个队列,每个队列中可能存储大量消息。似乎它会非常占用内存。
设计
tornado Framework
对于 WebSocket,tweepy API
用于流式推文RabbitMQ
用于在 tweepy 收到新推文时将消息发布到队列。RabbitMQ 然后将使用该消息并将其发送到 WebSocket。
尝试(我目前所拥有的)
TweetStreamListener
使用 tweepy API 根据用户的输入来监听推文。无论收到什么推文,它都会计算该推文的极性并将其发布到 rabbitMQtwitter_topic_feed
队列。
import logging
from tweepy import StreamListener, OAuthHandler, Stream, API
from sentiment_analyzer import calculate_polarity_score
from constants import SETTINGS
auth = OAuthHandler(
SETTINGS["TWITTER_CONSUMER_API_KEY"], SETTINGS["TWITTER_CONSUMER_API_SECRET_KEY"])
auth.set_access_token(
SETTINGS["TWITTER_ACCESS_KEY"], SETTINGS["TWITTER_ACCESS_SECRET_KEY"])
api = API(auth, wait_on_rate_limit=True)
class TweetStreamListener(StreamListener):
def __init__(self):
self.api = api
self.stream = Stream(auth=self.api.auth, listener=self)
def start_listening(self):
pass
def on_status(self, status):
if not hasattr(status, 'retweeted_status'):
polarity = calculate_polarity_score(status.text)
message = {
'polarity': polarity,
'timestamp': status.created_at
}
# TODO(Luis) Need to figure who to send this message to.
logging.debug("Message received from Twitter: {0}".format(message))
# limit handling
def on_limit(self, status):
logging.info(
'Limit threshold exceeded. Status code: {0}'.format(status))
def on_timeout(self, status):
logging.error('Stream disconnected. continuing...')
return True # Don't kill the stream
"""
Summary: Callback that executes for any error that may occur. Whenever we get a 420 Error code, we simply
stop streaming tweets as we have reached our rate limit. This is due to making too many requests.
Returns: False if we are sending too many tweets, otherwise return true to keep the stream going.
"""
def on_error(self, status_code):
if status_code == 420:
logging.error(
'Encountered error code 420. Disconnecting the stream')
# returning False in on_data disconnects the stream
return False
else:
logging.error('Encountered error with status code: {}'.format(
status_code))
return True # Don't kill the stream
WS_Handler
负责维护一个打开的连接列表并将它收到的任何消息发送回每个客户端(这种行为是我不想要的)。
import logging
import json
from uuid import uuid4
from tornado.web import RequestHandler
from tornado.websocket import WebSocketHandler
class WSHandler(WebSocketHandler):
def check_origin(self, origin):
return True
@property
def sess_id(self):
return self._sess_id
def open(self):
self._sess_id = uuid4().hex
logging.debug('Connection established.')
self.application.pc.register_websocket(self._sess_id, self)
# When messages arrives via RabbitMQ, write it to websocket
def on_message(self, message):
logging.debug('Message received: {0}'.format(message))
self.application.pc.redirect_incoming_message(
self._sess_id, json.dumps(message))
def on_close(self):
logging.debug('Connection closed.')
self.application.pc.unregister_websocket(self._sess_id)
该PikaClient
模块包含 PikaClient,它将允许跟踪入站和出站通道以及跟踪当前运行的 websocket。
import logging
import pika
from constants import SETTINGS
from pika import PlainCredentials, ConnectionParameters
from pika.adapters.tornado_connection import TornadoConnection
pika.log = logging.getLogger(__name__)
class PikaClient(object):
INPUT_QUEUE_NAME = 'in_queue'
def __init__(self):
self.connected = False
self.connecting = False
self.connection = None
self.in_channel = None
self.out_channels = {}
self.websockets = {}
def connect(self):
if self.connecting:
return
self.connecting = True
# Setup rabbitMQ connection
credentials = PlainCredentials(
SETTINGS['RABBITMQ_USERNAME'], SETTINGS['RABBITMQ_PASSWORD'])
param = ConnectionParameters(
host=SETTINGS['RABBITMQ_HOST'], port=SETTINGS['RABBITMQ_PORT'], virtual_host='/', credentials=credentials)
return TornadoConnection(param, on_open_callback=self.on_connected)
def run(self):
self.connection = self.connect()
self.connection.ioloop.start()
def stop(self):
self.connected = False
self.connecting = False
self.connection.ioloop.stop()
def on_connected(self, unused_Connection):
self.connected = True
self.in_channel = self.connection.channel(self.on_conn_open)
def on_conn_open(self, channel):
self.in_channel.exchange_declare(
exchange='tornado_input', exchange_type='topic')
channel.queue_declare(
callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME)
def on_input_queue_declare(self, queue):
self.in_channel.queue_bind(
callback=None, exchange='tornado_input', queue=self.INPUT_QUEUE_NAME, routing_key="#")
def register_websocket(self, sess_id, ws):
self.websockets[sess_id] = ws
self.create_out_channel(sess_id)
def unregister_websocket(self, sess_id):
self.websockets.pop(sess_id)
if sess_id in self.out_channels:
self.out_channels[sess_id].close()
def create_out_channel(self, sess_id):
def on_output_channel_creation(channel):
def on_output_queue_declaration(queue):
channel.basic_consume(self.on_message, queue=sess_id)
self.out_channels[sess_id] = channel
channel.queue_declare(callback=on_output_queue_declaration,
queue=sess_id, auto_delete=True, exclusive=True)
self.connection.channel(on_output_channel_creation)
def redirect_incoming_message(self, sess_id, message):
self.in_channel.basic_publish(
exchange='tornado_input', routing_key=sess_id, body=message)
def on_message(self, channel, method, header, body):
sess_id = method.routing_key
if sess_id in self.websockets:
self.websockets[sess_id].write_message(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
else:
channel.basic_reject(delivery_tag=method.delivery_tag)
Server.py
是应用程序的主要入口点。
import logging
import os
from tornado import web, ioloop
from tornado.options import define, options, parse_command_line
from client import PikaClient
from handlers import WSHandler, MainHandler
define("port", default=3000, help="run on the given port.", type=int)
define("debug", default=True, help="run in debug mode.", type=bool)
def main():
parse_command_line()
settings = {
"debug": options.debug,
"static_path": os.path.join(os.path.dirname(__file__), "web/static")
}
app = web.Application(
[
(r"/", MainHandler),
(r"/stream", WSHandler),
],
**settings
)
# Setup PikaClient
app.pc = PikaClient()
app.listen(options.port)
logging.info("Server running on http://localhost:3000")
try:
app.pc.run()
except KeyboardInterrupt:
app.pc.stop()
if __name__ == "__main__":
main()
解决方案
推荐阅读
- c - 在 RichEdit Winapi 中实现“选项卡完成”
- python - Django 时间戳
- asp.net-mvc - 如何将查询字符串中的对象传递给控制器?
- python - Canopy 环境中的 Python 错误“'float' 对象不能被解释为索引”
- rust - 如何检查远程 UDP 端口是否打开?
- coldfusion - 分割后的ColdFusion地板功能
- gmail - 如何以编程方式检查 Gmail 电子邮件地址是否存在
- http - Why is my port forwarding not working?
- android - Not able to access localhost on android device on the same network
- drupal-8 - Day and month name translation in Drupal8