php - 使用 php-enqueue 生成 avro 消息
问题描述
我正在研究一种使用php-enqueue生成从 php 到 kafka 的 avro 消息的方法。
他们的文档 指出您可以使用其他格式,包括 Apache Avro。
默认情况下,传输将消息序列化为 json 格式,但您可能希望使用其他格式,例如 Apache Avro。为此,您必须实现 Serializer 接口并将其设置为上下文、生产者或消费者。如果将序列化程序设置为上下文,它将被注入到上下文创建的所有消费者和生产者中。
<?php use Enqueue\RdKafka\Serializer; use Enqueue\RdKafka\RdKafkaMessage; class FooSerializer implements Serializer { public function toMessage($string) {} public function toString(RdKafkaMessage $message) {} } /** @var \Enqueue\RdKafka\RdKafkaContext $context */ $context->setSerializer(new FooSerializer());
示例中的序列化程序正在与字符串相互转换。据我了解,Avro 格式是二进制的,那么在这种情况下自定义序列化程序应该如何工作?
解决方案
PHP 字符串可以包含二进制数据。这是使用已在模式注册表中注册的模式 id 生成 avro 消息的部分实现。使用jaumo/avro实现对 avro 的序列化。
public function toString(RdKafkaMessage $message): string
{
...
$message = json_decode($message->getBody(), true);
$encodedHeader = $this->createAvroHeader($schemaId);
$encodedMessage = Serde::encodeMessage($parsedSchema, $message);
return $encodedHeader . $encodedMessage;
}
private function createAvroHeader(int $schemaId): string
{
$binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
return pack("C", 0) . $binarySchemaId;
}
推荐阅读
- python - 不和谐机器人嵌入
- javascript - 为什么在 map 异步循环中更改参数仅适用于第一个异步函数调用?
- python - 如何使用 groupy 创建数据框,以便分组标准包含在数据中
- python - 当我尝试获取所有盒子分数时出现回溯错误
- java - Textview 中的大十进制值支持
- javascript - 无法使用客户端签名调用将图像上传到 Cloudinary - 签名无效
- angular - 在转换管道中使用链式可观察对象
- javascript - 从嵌入消息 Discord.JS 中读取信息
- c - 为什么我的字符串在离开它所包含的 for 循环后会改变内容
- c - 用C语言绘制表格