c# - 更改模式注册表格式后,Kafka Consumer 消息未收到
问题描述
我是 Kafka 的新手,并且能够以某种方式运行 kafka Avro 消费者和生产者。生产者正在生成消息,我已成功将其传递给消费者。这是我的生产者代码片段:
static async void AvroProducer()
{
string bootstrapServers = "localhost:9092";
string schemaRegistryUrl = "Production163:8081";
string topicName = "player";
string groupName = "avro-generic-example-group";
var s = (RecordSchema)RecordSchema.Parse(
@"{
""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
""type"": ""record"",
""name"": ""User"",
""fields"": [
{""name"": ""name"", ""type"": ""string""},
{""name"": ""favorite_number"", ""type"": [""int"", ""null""]},
{""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
]
}"
);
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
.SetKeySerializer(new AsyncAvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AsyncAvroSerializer<GenericRecord>(schemaRegistry))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
int i = 0;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");
producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
}
Console.ReadLine();
}
正如您在上面的代码中看到的,我正在使用记录方案,但我正在尝试这个方案:
//this is the new schema try
var s = (RecordSchema)RecordSchema.Parse(
@"{
""type"": ""record"",
""name"": ""TestingMsg"",
""doc"": ""Sample"",
""fields"": [
{
""name"": ""key"",
""type"": ""string""
},
{
""name"": ""Time"",
""type"": ""long""
},
{
""name"": ""sourceSeconds"",
""type"": ""long""
},
{
""name"": ""serverT"",
""type"": ""long""
},
{
""name"": ""statusCode"",
""type"": ""int""
}
]
}"
);
我正在尝试使用的新版本,但由于我没有在消费者中收到消息,因此无法正常工作。这是消费者:
void KafkaReader(CancellationToken cancellationToken)
{
Debug.Log("kafka reader started. . .");
// Set up your Kafka connection here.
while (_keepThreadRunning)
{
using (CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
//using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers})
.SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}"))
.Build())
{
Debug.Log("subscribe" );
consumer.Subscribe(topicName);
while (true)
{
ConsumeResult<string, GenericRecord> consumeResult = consumer.Consume(cancellationToken);//TimeSpan.FromMilliseconds(50000)//new TimeSpan(0,0,1)
_stringsReceived.Enqueue(consumeResult.Value.ToString());
if (consumeResult != null)
{
Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
}
else
{
Debug.Log("consumer Result is null");
}
//yield return new WaitForSeconds(1);
}
}
}
GetComponent<KafkaServerConfigUI>().KafkaDisconnected();
// Disconnect and clean up your connection here.
}
请记住,我只是使用批处理文件运行默认的 apache Kafka 注册表。
D:\ApachKafka\confluent\confluent-5.2.1\bin\windows\schema-registry-start.bat D:\ApachKafka\confluent\confluent-5.2.1\etc\schema-registry\schema-registry.properties
我做错了什么?我需要将架构注册到任何地方吗?
解决方案
要进行任何更改或使用新架构,您必须注册架构。我错过了这个东西,因此我没有收到消费者的消息。这是帮助您注册模式的简短 python 脚本。
使用脚本,您必须提供模式注册表的 URL(以 http:// 开头,而不仅仅是主机名和端口)、模式应注册的主题以及模式的路径。
这是我注册我的架构的方式
感谢 Ref:Avro 和 Schema 注册表
推荐阅读
- laravel - 当我在 Drupal 中发布新内容时,如何向所有移动设备发送通知?Laravel 也是我的身份验证后端
- javascript - 选择具有不同页面的网格
- c++ - 创建一个结构,其属性类型是具有第一个结构类型属性的结构
- java - 如何在 Firebase 实时数据库(Android 开发)中获取特定子项的键值?
- python - 需要一些帮助:_tkinter.TclError: 无法识别图像文件中的数据
- html - 浏览器渲染引擎使用盒子模型来表示html元素?a)如果是,那么浏览器可以在没有盒子模型的情况下呈现 html 元素吗?乙)?
- javascript - js循环和对象数组的问题
- c++ - C ++算法对连续的整数块求和
- javascript - Vuejs如何为表格中的每一行添加一个类
- android - 在没有单选按钮的 Jetpack Compose 中创建切换按钮组