docker - Kafka client: Timeout of 60000ms expired before the position for partition could be determined
Problem Description
I am trying to connect Flink to a Kafka consumer.
I am using Docker Compose to build four containers: zookeeper, kafka, a Flink JobManager, and a Flink TaskManager.
For zookeeper and kafka I am using the wurstmeister images; for Flink I am using the official image.
docker-compose.yml
version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'
  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    # links:
    #   - "jobmanager:jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
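For reference, this exact timeout is often caused by the consumer being unable to reach the broker at its advertised address. A hypothetical sketch of an explicit listener setup for the kafka service in the compose file above, using the wurstmeister image's listener variables in place of KAFKA_ADVERTISED_HOST_NAME/KAFKA_ADVERTISED_PORT (the INSIDE/OUTSIDE listener names are arbitrary labels, and 9094 is an assumed host-facing port):

```yaml
# Sketch only: clients inside the compose network connect via kafka:9092,
# while clients on the host machine connect via localhost:9094.
environment:
  KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
  KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9094
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
  KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```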
When I submit a simple job to the Dispatcher, the job fails with the following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined
My job code is:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Main {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read connection settings from the container environment
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        if (bootstrapServer == null) {
            System.exit(1);
        }

        properties.setProperty("zookeeper", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer<String> kafkaConsumer =
            new FlinkKafkaConsumer<String>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}
Solution
I know I'm late to the party, but I hit exactly the same error. In my case, the TopicPartitions were not set up correctly. My topic had 2 partitions, and my producer was producing messages fine, but my consumer (a Spark streaming application) never really started; it gave up after 60 seconds complaining with the same error.
Wrong code I had:
List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition(topicName, Integer.parseInt(numPartitions)));
Correct code:
List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
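The corrected loop above can be sketched as a self-contained example: build one TopicPartition per index from 0 to numPartitions - 1, rather than a single entry for the partition count. The TopicPartition record below is a stand-in with the same constructor shape as Kafka's org.apache.kafka.common.TopicPartition, so the sketch runs without the Kafka client on the classpath:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionListSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (same constructor shape).
    record TopicPartition(String topic, int partition) {}

    // One TopicPartition per partition index, as in the corrected answer.
    static List<TopicPartition> partitionsFor(String topicName, int numPartitions) {
        List<TopicPartition> topicPartitionList = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            topicPartitionList.add(new TopicPartition(topicName, i));
        }
        return topicPartitionList;
    }

    public static void main(String[] args) {
        // A topic with 2 partitions must yield entries for indices 0 and 1.
        List<TopicPartition> parts = partitionsFor("pipeline", 2);
        System.out.println(parts.size());            // 2
        System.out.println(parts.get(1).partition()); // 1
    }
}
```

With only Arrays.asList(new TopicPartition(topicName, numPartitions)), the consumer is assigned a single partition index that may not even exist, so it never establishes a position and times out.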