Kafka producer blocks after reconnect

Problem description

I am running the Bitnami Docker image for Kafka. The OS is CentOS, Java is version 11, the Bitnami image is 2.7.0, and the Kafka Java client API is 2.4.1, pulled from Maven.

I am streaming market data, so an update arrives roughly every 100 ms. No acks are used, and it runs fine. At a fixed time each day there is an instrument rollover, so I disconnect: I close the producer with a 30-second timeout. I may lose the last message, but it generally appears to close cleanly. I then fetch the new instruments and restart the streaming market data, which takes about 30 seconds. The problem is that the producer now blocks for 60 seconds and times out. This happens a few times, and then we shut down, because by that point we are far behind the market data feed. On the test system the topic has a single partition, with one consumer on the other end that generally keeps pace with the producer.
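For what it's worth, 60 seconds is exactly the Kafka producer's default max.block.ms, the cap on how long send() may block waiting for topic metadata or free buffer space. Below is a minimal sketch of making these limits explicit; the property keys are real ProducerConfig constants, but the values are illustrative only, not a tuned recommendation:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeouts
{
    // Sketch: spell out the producer's blocking and delivery limits
    // instead of relying on the defaults
    public static Properties timeoutProperties(String bootstrapServers)
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // How long send() may block on metadata or buffer space;
        // defaults to 60000 ms, matching the 60-second stall above
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        // Upper bound on end-to-end delivery of a record (example value)
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);
        // Per-request broker timeout (example value)
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        return props;
    }
}
```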

I cannot reproduce it under a debugger, because it only seems to happen when the market data feed is running in the container; Kafka and ZooKeeper always run in containers. If I enable debug logging for the requests, I end up with a multi-gigabyte log file and the error then seems not to occur, which suggests a timing issue.
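A lighter-weight way to see what the stalled sends are doing, without client-wide debug logging, is to pass a callback to send() and log only the failures. A minimal sketch follows; the class and the logging are mine, not part of QPOutput:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LoggingSend
{
    // Sketch: report per-record failures (e.g. TimeoutException)
    // asynchronously instead of enabling DEBUG for the whole client
    public static void send(KafkaProducer<byte[], byte[]> producer,
                            ProducerRecord<byte[], byte[]> record)
    {
        producer.send(record, new Callback()
        {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception)
            {
                if (exception != null)
                {
                    System.err.println("send failed: " + exception);
                }
            }
        });
    }
}
```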

Thanks in advance for any suggestions.

The class doing the output is:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QPOutput implements AutoCloseable
{
    public QPOutput(String writerTopic, String writerSchema, Properties writerProperties)
    {
        // Copy the caller-supplied properties; these must include bootstrap.servers
        producerProperties.putAll(writerProperties);

        // Add default properties: byte-array serializers, and a single in-flight
        // request per connection to preserve message ordering
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProperties.put("max.in.flight.requests.per.connection", 1);

        topic = writerTopic;
        // Set up the Avro schema, plus a JSON reader and binary writer for it
        schema = new Schema.Parser().parse(writerSchema);
        datumReader = new GenericDatumReader<GenericRecord>(schema);
        datumWriter = new GenericDatumWriter<GenericRecord>(schema);
    }

    public void startWriter(QPModel model, String modelRunID)
    {
        modelCategoryName = model.getCategory() + "/" + model.getName();
        modelVersion = model.getVersion();
        runID = modelRunID;

        // Create the producer; it connects lazily when the first send happens
        producer = new KafkaProducer<>(producerProperties);
    }
    
    public void stopWriter(long timeoutInMilliseconds)
    {
        // No explicit producer.flush() needed: close() flushes any buffered
        // records itself, within the timeout set here
        timeout = timeoutInMilliseconds;

        close();
    }
        
    public void write(String message) throws IOException
    {
        byte[] translatedMessage = this.translateToAvro(message);
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, translatedMessage);
        // add model category and name
        record.headers().add("model_category_name", modelCategoryName.getBytes());
        // add model version
        record.headers().add("model_version", modelVersion.getBytes());
        // Add run id
        record.headers().add("run_id", runID.getBytes());
        // Send asynchronously (fire-and-forget); send() only blocks while
        // waiting for metadata or buffer space, up to max.block.ms
        producer.send(record);
    }
    
    @Override
    public void close()
    {
        // close and release the producer if one was created
        if (producer != null)
        {
            // close producer
            producer.close(Duration.ofMillis(timeout));
            
            // set to null
            producer = null;
        }        
    }
    
    // Convert a JSON message into Avro binary using the configured schema
    protected byte[] translateToAvro(String messageAsJson) throws IOException
    {
        // Decode the JSON text against the schema
        InputStream input = new ByteArrayInputStream(messageAsJson.getBytes());
        DataInputStream din = new DataInputStream(input);

        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

        GenericRecord datum = datumReader.read(null, decoder);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        BinaryEncoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        datumWriter.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }

    protected long timeout = 30000;
    protected String topic;
    protected Schema schema;
    protected String runID;
    protected GenericDatumReader<GenericRecord> datumReader;
    protected GenericDatumWriter<GenericRecord> datumWriter;
    protected String groupID;
    protected Properties producerProperties = new Properties();
    protected Object callbackHandler;
    protected String modelCategoryName;
    protected String modelVersion;
    KafkaProducer<byte[], byte[]> producer = null;
}
```
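For context, the rollover sequence that triggers the blocking looks roughly like the sketch below; QPModel, the topic name "market-data", the schema JSON, and the tick strings are placeholders, not names from the real system:

```java
import java.io.IOException;
import java.util.Properties;

public class RolloverExample
{
    // Hypothetical driver for the daily instrument rollover described above
    public static void run(QPModel model, String schemaJson,
                           String runID, String newRunID,
                           String jsonTick) throws IOException
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (QPOutput output = new QPOutput("market-data", schemaJson, props))
        {
            output.startWriter(model, runID);
            output.write(jsonTick);      // streaming: one message every ~100 ms

            output.stopWriter(30000);    // rollover: close within 30 seconds
            // ... fetch the new instruments here (takes about 30 seconds) ...

            output.startWriter(model, newRunID);
            output.write(jsonTick);      // after the restart this blocks ~60 s
        }
    }
}
```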

Tags: java, apache-kafka, apache-kafka-connect, confluent-platform, bitnami

Solution

