apache-kafka - 使用消费者 api 订阅 Ktable 主题
问题描述
对于 Kafka 主题,我可以使用 confluent consumer api 订阅和接收消息。对于 Kafka Ktables,我可以使用 REST API 和 http 客户端进行订阅。所以我的问题是,也许可以不通过rest api而是通过confluent consumer api订阅Kafka表?
这就是我订阅主题的方式:
using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
{
try
{
consumer.Subscribe(this.TopicLookup);
while (true)
{
try
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
这就是我查询 KSQL 的方式:
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
var request = new HttpRequestMessage(HttpMethod.Post, url);
request.Method = HttpMethod.Post;
request.Content = new System.Net.Http.StringContent("{ \"ksql\": \"select * from userstream7table;\",\"streamsProperties\": { \"ksql.streams.auto.offset.reset\": \"earliest\"}}", Encoding.UTF8, "application/vnd.ksql.v1+json");
//request.Content.Headers.Add("Accept", "application/vnd.ksql.v1+json");
using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
while (!reader.EndOfStream)
Console.WriteLine(reader.ReadLine());
}
}
当我创建一个带有 REST/Ksql 主题的 Ktable 时,也会创建一个。如果我尝试订阅它,我不会收到任何消息。如果我使用 RESP API 查询这个 Ktable,我会得到所有消息。也许不可能以标准的方式使用这些主题?
解决方案
该表的主题只是一个 Kafka 主题,就像其他任何主题一样。您可以随意使用这些数据,包括使用消费者 API。
您可能遇到错误的原因可能是您没有设置正确的消费者配置。最可能的罪魁祸首是:
- 不寻求话题的开始,即不设置
auto.offset.reset
为earliest
。 - 配置了错误的解串器。我看到您使用 构建消费者
<Ignore, string>
,即忽略键和字符串值。但是值中的数据是字符串吗?这将取决于您的 value_format。
推荐阅读
- ios - 在双面(正面和背面)打印机上打印 WebView 内容,正面和背面具有不同的边距
- sql-server-2008 - Crystal Reports 无法识别 varchar(max) 列或数据类型
- arrays - 从嵌套的 mongo 对象中获取数组元素
- screenshot - 交付到 App Store 时,Fastlane 框架不支持的屏幕尺寸
- azure-aks - AKS create a Load Balancer itself. Purpose and usage of the sma ein not clear
- html - 电子邮件签名 - 标题横幅
- android - 我的应用程序在分析模式下运行时停止响应
- android - 用户如何在 TableLayout 中的特定单元格内写入,以便显示单元格中的用户文本?
- android - PackageName in DevicePolicyManager.clearPackagePersistentPreferredActivities(ComponentName admin,String packageName)
- objective-c - Error "request too large (413)" when trying to upload a PDF file to OneNote