c# - avro序列化的C# confluent kafka问题
问题描述
我正在使用 docker 在我的测试项目中运行来自https://github.com/confluentinc/cp-all-in-one的 kafka 和其他服务, 以及用于 kafka、avro 和 schemaRegistry 的融合 nuget 包。
如果要发送 json 消息,到目前为止我没有问题,但我正在努力发送 avro 序列化消息。
我看到了https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific示例,我尝试以相同的方式进行操作,但最终出现如下异常:
本地: C:\Users\lu95eb\source\repos\Kafka_playground\Kafka producer\KafkaService.cs:line 126 中 Kafka_producer.KafkaService.d__10.MoveNext()
处 Confluent.Kafka.Producer2.<ProduceAsync>d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult() 的值序列化错误
除了内部异常
你调用的对象是空的。
在 Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.d__6.MoveNext() 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 System.Runtime.CompilerServices。 Confluent.Kafka.Producer`2.d__52.MoveNext() 处的 TaskAwaiter.ValidateEnd(Task 任务)
这是我的 SpecificRecord 类
public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }
public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue; break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
以及用于发送消息的方法
private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
.Build())
{
var message = new Message<string, UserInfo>
{
Key = userInfo.Name,
Value = userInfo
};
await producer.ProduceAsync(SpecificTopic, message);
}
}
KafkaService.cs:第 126 行是await producer.ProduceAsync(SpecificTopic, message);
就像我在开始时写的那样,我对 schemaRegistry 没有任何问题——我已经注册了模式并且它们对 json 工作正常,我对主题、代理、消费者或其他任何东西都没有问题。
如果有人能指出我做错了什么,我将不胜感激。先感谢您。
解决方案
我遇到了同样的问题,并且在查看了 github 上的库代码后能够解决它。似乎架构注册表需要在您的类中实现名为 _SCHEMA 的 ISpecificRecord 的静态字段。
因此,如果您添加一个公共静态 _SCHEMA = Schema.Parse(....
并更改您的公共架构 => UserInfo._SCHEMA;
它可以在没有您的解决方法的情况下工作,它只是忽略架构注册表。
推荐阅读
- ruby-on-rails - 如何将 puma 配置为仅允许强 TLS 密码?
- javascript - 在使用 Jest 测试 React 应用程序时处理发布请求和令牌
- amazon-web-services - Metricbeat:退出:1 错误:创建 aws 指标集时出错:失败 DescribeRegions:UnauthorizedOperation:您未获得授权
- datepicker - Vue 3 日期选择器 Element-Plus
- reactjs - 如何在不使用固定 CSS 的情况下更改 React JS 中的 Swiper 高度或滑动宽度
- python - 分配 uint32_t 的 cython memoryview 的最佳方法
- javascript - 屏幕方向更改后如何在 iOS Safari 上保持应用全屏显示
- flutter - 在模拟器上运行 Flutter App 时出现 Java 错误
- haskell - 当在 Eithers 列表上运行 'sequence' 时,ghc 如何知道要列出哪个参数?
- c# - Regex - 匹配某个字符串并获取该字符串的整数值