java - 重新连接后卡夫卡生产者阻塞
问题描述
我正在运行 kafka 的 bitnami docker 映像。操作系统是centos,java版本是11,bitnami版本是2.7.0,Kafka API是2.4.1 for java off maven。
我正在流式传输市场数据,因此每隔 100 毫秒就会更新一次。不使用acks,它运行良好。每天在固定的时间都有仪器的翻转,所以我断开了连接。我在 30 秒内关闭了制作人。我可能会丢失最后一条消息,但通常似乎完全关闭。然后我去获取新工具并重新启动流式市场数据,这个过程大约需要 30 秒。问题是现在生产者阻塞了 60 秒,超时。这种情况发生了几次,然后我们关闭了,因为我们现在远远落后于市场数据馈送。在测试系统上,主题有一个分区,另一端有一个消费者,通常跟上生产者的步伐。
我无法在调试时复制它,因为它似乎只在容器中运行市场数据馈送时发生 - Kafka 和 zookeeper 始终在容器中运行。如果我为请求释放了调试,那么我最终会得到一个多 GB 的日志文件,并且似乎导致错误不会发生,这表明存在时间问题。
提前感谢您的任何建议
要输出的类是:
public class QPOutput implements AutoCloseable
{
public QPOutput(String writerTopic, String writerSchema, Properties writerProperties)
{
producerProperties.putAll(writerProperties);
// Add default properties
// Add it with bootstrap.servers
String servers = writerProperties.getProperty("bootstrap.servers");
producerProperties.put("bootstrap.servers", servers);
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProperties.put("max.in.flight.requests.per.connection", 1);
topic = writerTopic;
// set up avro reader
schema = new Schema.Parser().parse(writerSchema);
datumReader = new GenericDatumReader<GenericRecord>(schema);
datumWriter = new GenericDatumWriter<GenericRecord>(schema);
}
public void startWriter(QPModel model, String modelRunID)
{
modelCategoryName = model.getCategory() + "/" + model.getName();
modelVersion = model.getVersion();
runID = modelRunID;
// producer_properties['security_protocol'] = 'PLAINTEXT'
producer = new KafkaProducer<>(producerProperties);
}
public void stopWriter(long timeoutInMilliseconds)
{
// flush any writing
//producer.flush();
// Set timeout
timeout = timeoutInMilliseconds;
close();
}
public void write(String message) throws IOException
{
byte[] translatedMessage = this.translateToAvro(message);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, translatedMessage);
// add model category and name
record.headers().add("model_category_name", modelCategoryName.getBytes());
// add model version
record.headers().add("model_version", modelVersion.getBytes());
// Add run id
record.headers().add("run_id", runID.getBytes());
// Send and force write
producer.send(record);
}
@Override
public void close()
{
// if producer isnt null
if( null != producer)
{
// close producer
producer.close(Duration.ofMillis(timeout));
// set to null
producer = null;
}
}
protected byte[] translateToAvro(String messageAsJson) throws IOException
{
InputStream input = new ByteArrayInputStream(messageAsJson.getBytes());
DataInputStream din = new DataInputStream(input);
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
GenericRecord datum = datumReader.read(null, decoder);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
datumWriter.write(datum, e);
e.flush();
return outputStream.toByteArray();
}
protected long timeout = 30000;
protected String topic;
protected Schema schema;
protected String runID;
protected GenericDatumReader<GenericRecord> datumReader;
protected GenericDatumWriter<GenericRecord> datumWriter;
protected String groupID;
protected Properties producerProperties = new Properties();
protected Object callbackHandler;
protected String modelCategoryName;
protected String modelVersion;
KafkaProducer<byte[], byte[]> producer = null;
}```
解决方案
推荐阅读
- azure - Terraform CLI 出现错误:ID 缺少 `slots` 元素
- python - 创建一个新的 python 列表,显示另一个列表中集合数字的累积总数,而不使用 for 循环
- python - 使用魔术方法比在 python 中使用运算符更快吗?
- python - 如何仅在 python 中部分读取 json 文件?
- service-worker - 如何收听来自 webhook 的响应?
- flutter - 为什么这个抖动功能在颤动中不起作用?
- python - 在 Scapy 参数中分配的值
- javascript - JQuery Dialog - 未保存更改时离开页面
- javascript - PhantomJS 获取特定元素的位置数组
- unit-testing - EJS:测试包含的 ejs 文件