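python - How to send data to EventHub by setting a PartitionId instead of a PartitionKey (Python)
Problem description
I saw in the Microsoft Docs that there is a way (in C#) to send data to the partition I want by setting a PartitionId instead of a PartitionKey.
CreatePartitionSender(String): creates a PartitionSender that can publish EventData directly to a specific EventHub partition.
However, I could not find an equivalent in Python.
Is there any way to do this?
Solution
I am not entirely sure, but in the Python SDK this is the method that opens the connection: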
def open(self):
    """
    Open the Sender using the supplied connection.
    If the handler has previously been redirected, the redirect
    context will be used to create a new handler before opening it.
    :param connection: The underlying client shared connection.
    :type: connection: ~uamqp.connection.Connection
    """
    self.running = True
    if self.redirected:
        self.target = self.redirected.address
        self._handler = SendClient(
            self.target,
            auth=self.client.get_auth(),
            debug=self.client.debug,
            msg_timeout=self.timeout,
            error_policy=self.retry_policy,
            keep_alive_interval=self.keep_alive,
            client_name=self.name,
            properties=self.client.create_properties())
    self._handler.open()
    while not self._handler.client_ready():
        time.sleep(0.05)
And this is the initializer:
def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
    """
    Instantiate an EventHub event Sender handler.
    :param client: The parent EventHubClient.
    :type client: ~azure.eventhub.client.EventHubClient.
    :param target: The URI of the EventHub to send to.
    :type target: str
    :param partition: The specific partition ID to send to. Default is None, in which case the service
     will assign to all partitions using round-robin.
    :type partition: str
    :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
     queued. Default value is 60 seconds. If set to 0, there will be no timeout.
    :type send_timeout: int
    :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
     periods of inactivity. The default value is None, i.e. no keep alive pings.
    :type keep_alive: int
    :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
     Default value is `True`.
    :type auto_reconnect: bool
    """
    self.running = False
    self.client = client
    self.target = target
    self.partition = partition
    self.timeout = send_timeout
    self.redirected = None
    self.error = None
    self.keep_alive = keep_alive
    self.auto_reconnect = auto_reconnect
    self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
    self.reconnect_backoff = 1
    self.name = "EHSender-{}".format(uuid.uuid4())
    if partition:
        self.target += "/Partitions/" + partition
        self.name += "-partition{}".format(partition)
    self._handler = SendClient(
        self.target,
        auth=self.client.get_auth(),
        debug=self.client.debug,
        msg_timeout=self.timeout,
        error_policy=self.retry_policy,
        keep_alive_interval=self.keep_alive,
        client_name=self.name,
        properties=self.client.create_properties())
    self._outcome = None
    self._condition = None
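In this version of the package (azure-eventhub 1.x), a Sender is normally not constructed by hand; as far as I can tell, `EventHubClient.add_sender` builds it and passes `partition` through to the constructor above. The following is only a minimal sketch under that assumption. The function name `send_to_partition` and its `address`, `user` and `key` arguments are hypothetical placeholders for your own namespace address and shared access policy, not part of the library.

```python
def send_to_partition(address, user, key, partition_id, payloads):
    """Send every payload to one specific partition (azure-eventhub 1.x API).

    address is expected to look like
    "amqps://<namespace>.servicebus.windows.net/<eventhub>" (placeholder).
    """
    # Imported lazily so this sketch can be loaded even when the
    # azure-eventhub 1.x package is not installed.
    from azure.eventhub import EventHubClient, EventData

    client = EventHubClient(address, debug=False, username=user, password=key)
    # The partition ID is passed as a string, e.g. "0".
    sender = client.add_sender(partition=str(partition_id))
    client.run()  # opens the connection and the registered sender
    try:
        for payload in payloads:
            sender.send(EventData(payload))
    finally:
        client.stop()  # always close the connection, even on errors
```

A call might look like `send_to_partition(ADDRESS, USER, KEY, "0", ["first", "second"])`. Newer releases of the SDK (azure-eventhub 5.x) no longer ship this client; there, the closest equivalent appears to be `EventHubProducerClient.create_batch(partition_id="0")` followed by `send_batch`.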
I believe the following lines of that function are what create a sender bound to a single partition:
if partition:
    self.target += "/Partitions/" + partition
    self.name += "-partition{}".format(partition)
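To make the effect of those lines concrete, here is a small standalone sketch that reproduces only the address-building step. The helper name `partition_target` and the example namespace and hub names are made up for illustration; the real logic lives inside the Sender constructor.

```python
def partition_target(target, partition=None):
    """Return the send target, optionally scoped to one partition ID."""
    if partition:
        # Same suffix the Sender constructor appends for a partition sender.
        target += "/Partitions/" + partition
    return target


# Hypothetical Event Hub address, used only for illustration.
hub = "amqps://example-ns.servicebus.windows.net/example-hub"

print(partition_target(hub))       # no partition: the service picks one
print(partition_target(hub, "0"))  # pinned to partition "0"
```

Note that the partition ID has to be a string: an integer would fail on the string concatenation, and `0` would also be treated as "no partition" by the truthiness check.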
Reference
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py
Hope this helps.