首页 > 解决方案 > 使用 php-enqueue 生成 avro 消息

问题描述

我正在研究一种使用php-enqueue生成从 php 到 kafka 的 avro 消息的方法。

他们的文档 指出您可以使用其他格式,包括 Apache Avro。

默认情况下,传输将消息序列化为 json 格式,但您可能希望使用其他格式,例如 Apache Avro。为此,您必须实现 Serializer 接口并将其设置为上下文、生产者或消费者。如果将序列化程序设置为上下文,它将被注入到上下文创建的所有消费者和生产者中。

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

示例中的序列化程序正在与字符串相互转换。据我了解,Avro 格式是二进制的,那么在这种情况下自定义序列化程序应该如何工作?

标签: phpapache-kafkaavrokafka-producer-apiphp-enqueue

解决方案


PHP 字符串可以包含二进制数据。这是使用已在模式注册表中注册的模式 id 生成 avro 消息的部分实现。使用jaumo/avro实现对 avro 的序列化。

public function toString(RdKafkaMessage $message): string
{
    ...

    $message = json_decode($message->getBody(), true);

    $encodedHeader = $this->createAvroHeader($schemaId);
    $encodedMessage = Serde::encodeMessage($parsedSchema, $message);

    return $encodedHeader . $encodedMessage;
}

private function createAvroHeader(int $schemaId): string
{
    $binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
    return pack("C", 0) . $binarySchemaId;
}

推荐阅读