google-api - google.cloud.pubsub - 流式拉取 PubSub 消息
问题描述
我目前正在对最新的 google-cloud-pubsub==0.35.4
pubsub 版本进行一些测试。我的意图是使用动态数量的订阅者客户端处理永无止境的流(负载变化)。
但是,当我有一个队列时说.. 600 条消息和 1 个客户端正在运行,然后添加其他客户端:
- 预期:所有剩余消息均匀分布在所有客户端
- Observed:只有新消息在客户端之间分发,任何旧消息都发送到预先存在的客户端
下面是我用于客户的简化版本(作为参考,我们将只运行低优先级的主题)。我不会包括发布者,因为它没有关系。
PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3
MESSAGE_LIMIT = 10
ACKS_PER_MIN = 100.00
ACKS_RATIO = {
PRIORITY_LOW: 100,
}
PRIORITY_TOPICS = {
PRIORITY_LOW: 'test_low',
}
PRIORITY_SEQUENCES = {
PRIORITY_LOW: [PRIORITY_LOW, PRIORITY_MEDIUM, PRIORITY_HIGH],
}
class Subscriber:
subscriber_client = None
subscriptions = {}
priority_queue = defaultdict(Queue.Queue)
priorities = []
def __init__(self):
logging.basicConfig()
self.subscriber_client = pubsub_v1.SubscriberClient()
for option, percentage in ACKS_RATIO.iteritems():
self.priorities += [option] * percentage
def subscribe_to_topic(self, topic, max_messages=10):
self.subscriptions[topic] = self.subscriber_client.subscribe(
BASE_TOPIC_PATH.format(project=PROJECT, topic=topic,),
self.process_message,
flow_control=pubsub_v1.types.FlowControl(
max_messages=max_messages,
),
)
def un_subscribe_from_topic(self, topic):
subscription = self.subscriptions.get(topic)
if subscription:
subscription.cancel()
del self.subscriptions[topic]
def process_message(self, message):
json_message = json.loads(message.data.decode('utf8'))
self.priority_queue[json_message['priority']].put(message)
def retrieve_message(self):
message = None
priority = random.choice(self.priorities)
ack_priorities = PRIORITY_SEQUENCES[priority]
for ack_priority in ack_priorities:
try:
message = self.priority_queue[ack_priority].get(block=False)
break
except Queue.Empty:
pass
return message
if __name__ == '__main__':
messages_acked = 0
pub_sub = Subscriber()
pub_sub.subscribe_to_topic(PRIORITY_TOPICS[PRIORITY_LOW], MESSAGE_LIMIT * 3)
while True:
msg = pub_sub.retrieve_message()
if msg:
json_msg = json.loads(msg.data.decode('utf8'))
msg.ack()
print ("%s - Akked Priority %s , High %s, Medium %s, Low %s" % (
datetime.datetime.now().strftime('%H:%M:%S'),
json_msg['priority'],
pub_sub.priority_queue[PRIORITY_HIGH].qsize(),
pub_sub.priority_queue[PRIORITY_MEDIUM].qsize(),
pub_sub.priority_queue[PRIORITY_LOW].qsize(),
))
time.sleep(60.0 / ACKS_PER_MIN)
我想知道这种行为是否是流拉动功能所固有的,或者是否存在可以改变这种行为的配置。
干杯!
解决方案
考虑到Cloud Pub/Sub 文档,Cloud Pub/sub 为每个订阅至少交付每个发布的消息一次,但是这种行为有一些例外:
- 无法在 7 天的最长保留时间内送达的邮件将被删除。
- 在创建给定订阅之前发布的消息将不会被传递。
换句话说,服务会将消息传递给在消息发布之前创建的订阅,因此旧消息将无法用于新订阅。据我所知,Cloud Pub/Sub 不提供改变这种行为的功能。
推荐阅读
- javascript - 简单的 JS 在 Tab 键跨浏览器中转换 Enter 键
- reactjs - ReactJS--我的数据用axios返回空
- python - 如何将 JSON 元素从字符串转换为数组
- sparql - 如何使用 SPARQL 返回完整的树?
- python - 在 python 中使用 enumerate() 访问列表中的“Actions”对象
- scala - Play: JSON Writes is not respected when class extends case class
- azure - creating multiple resources at the same time in terraform
- google-sheets - Google 电子表格 API:单元格中的注释
- python - BigTable:使用 Python 客户端提高 BigTable 查询性能
- beautifulsoup - 美丽的汤。如何获取包含特定单词的链接?