c++ - librdkafka 线程不清理失败
问题描述
在我的应用程序中,我正在创建一个生产者和一个消费者。
conf->set("dr_cb", delivery_cb, errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::Producer::create(conf.get(), errstr)
conf->set("log_level", "0", errstr);
conf->set("group.id", group_id, errstr);
conf->set("client.id", m_kafka_client_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("rebalance_cb", rebalance_cb, errstr);
conf->set("statistics.interval.ms", "3000", errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::KafkaConsumer::create(conf.get(), errstr)
然后我尝试按如下方式获取元数据
err = _consumer->metadata(false, nullptr, &metadata, METADATA_TIMEOUT);
std::unique_ptr<RdKafka::Metadata> metadata_uptr(metadata); // Handover the raw pointer to the unique_ptr now
如果由于某种原因代理通信无法正常工作,我会收到错误消息,那么我的生产者和消费者会通过 unique_ptr -> 析构函数被删除。然后循环进行,直到应用程序成功连接到代理。
我观察到的是我看到很多线程被创建并留在那里。在某个时间点,计数达到 2000 个线程。
清理Kafka的正确方法是什么?
线程卡在这里
#0 0x00007f6acc031cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000004b8e65 in cnd_timedwait_ms (cnd=cnd@entry=0xab41b8, mtx=mtx@entry=0xab4190, timeout_ms=<optimized out>) at tinycthread.c:501
#2 0x00000000004853aa in rd_kafka_q_serve (rkq=0xab4190, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:440
#3 0x000000000045a43c in rd_kafka_thread_main (arg=arg@entry=0xabf760) at rdkafka.c:1227
#4 0x00000000004b8c07 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5 0x00007f6acc02de25 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f6acaa7834d in clone () from /lib64/libc.so.6
我也尝试在失败时调用以下例程但没有帮助......
RdKafka::wait_destroyed(5000);
解决方案
RdKafka::KafkaConsumer 被包裹在另一个容器中,并且“myobject_wrapper->close()”没有被重定向到“RdKafka::KafkaConsumer::close”
在我的应用程序中解决了这个问题后,现在一切正常。
推荐阅读
- excel - 为什么不是所有的记录都从数据中返回
- regex - 如何为 Safari 创建正则表达式
- javascript - 有没有办法在 JSON 中转换 JS 数组或在 Android 中转换数组?
- c# - DataTrigger 在 RadioButton IsChecked 时更新 DatePicker
- node.js - 如何使用 aws-sdk 将文件同步上传到 S3?
- azure-cosmosdb - 我应该为子文档使用 _self SelfLink
- r - 根据另一个表中的并发日期范围划分列值?
- c++11 - 引用折叠规则是否适用于返回类型?
- python - 有没有办法在不定义布局的情况下将 QPushButton 对象动态添加到 QWidget?
- sql - 爆炸后如何从数组中删除一个元素