c# - C# Confluent.Kafka SetValueDeserializer 对象反序列化
问题描述
在我的消费者中,我想反序列化 Kafka protobuf 消息。键是字符串类型,但消息值是 protobuf 对象。我知道我必须为消息值创建自己的自定义反序列化器,但不知道如何创建一个。这是我的消费者实现,我需要替换标记的行:
using Confluent.Kafka;
using System;
using System.Threading;
namespace EventHubsForKafkaSample
{
class Worker1
{
public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SocketTimeoutMs = 60000, //this corresponds to the Consumer config `request.timeout.ms`
SessionTimeoutMs = 30000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
SslCaLocation = cacertlocation,
GroupId = consumergroup,
AutoOffsetReset = AutoOffsetReset.Earliest,
BrokerVersionFallback = "1.0.0", //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
//Debug = "security,broker,protocol" //Uncomment for librdkafka debugging information
};
using (var consumer = new ConsumerBuilder<string, ProtobufMessage>(config)
.SetKeyDeserializer(Deserializers.Utf8)
.SetValueDeserializer(Deserializers.Utf8) //<<-----
.Build())
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
consumer.Subscribe(topic);
Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);
while (true)
{
try
{
var msg = consumer.Consume(cts.Token);
Console.WriteLine($"Received: '{msg.Value}'");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
catch (Exception e)
{
Console.WriteLine($"Error: {e.Message}");
}
}
}
}
}
public class ProtobufMessage
{
public DateTime timestamp { get; set; }
public int inputId { get; set; }
public double? value { get; set; }
public int sourceId { get; set; }
public string inputGuid { get; set; }
}
}
Protobuf 消息格式:
syntax = "proto3";
package ileco.chimp.proto;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";
message FinalValue {
google.protobuf.Timestamp timestamp = 1;
uint32 inputId = 2;
google.protobuf.DoubleValue value = 3;
uint32 sourceId = 4;
string inputGuid = 5;
}
解决方案
您需要使用
protoc
从架构生成 C# 类如果您使用 Schema Registry,则不需要自己的反序列化器。查看示例代码
using (var consumer = new ConsumerBuilder<string, YourProtoMessage>(consumerConfig) .SetValueDeserializer(new ProtobufDeserializer<YourProtoMessage>().AsSyncOverAsync())
如果您不使用模式注册表,那么您将需要通过实现来定义自己的反序列化器,IDeserializer
如另一个答案中所述
推荐阅读
- c - 在一个甚至不编辑它的函数调用之后取一个随机值的整数
- python - 使用补丁模拟 getpass
- php - 带有 php 添加“?”的 HTML 表单 在 url 而不是做 php 脚本
- spring-boot - 使用 spring boot 1.4.7 的 Spring session redis SSL 支持
- excel - 如何根据 Microsoft Excel 中的其他匹配列条件将一列数据拆分为两列?
- windows - 我是否需要在不同域上拥有相同的用户帐户才能通过批处理脚本运行 net use 命令?
- groovy - 防止 GroovyDefaultMethods join() 遮蔽 Java 类的 join() 方法?
- vba - 将多个 CSV 文件导入 1 个表 - 第一行不匹配
- sql - Oracle - 即时加入内联表或表
- c - 是否可以挂载远程文件系统并打开/mmap /dev/mem?