首页 > 解决方案 > Druid 中的 Protobuf 摄取任务一直处于运行状态,但未创建数据源

问题描述

我使用 quickstart 的 compose 文件完成了一个简单的 Druid 部署,想把 Kafka 中的 protobuf 消息摄取到 Druid。我按照这个链接 操作,但无论我在描述符文件 URL 中填写什么路径,它都不会被读取,而任务列表中却显示任务正在运行。

这是我的 compose 文件

# Compose file format version; quoted so it is read as a string, not a float.
version: "2.2"

# Named volumes, each declared with an empty option mapping.
# metadata_data is mounted by postgres; middle_var, broker_var,
# coordinator_var and router_var are mounted at /opt/druid/var by the
# matching Druid service. historical_var is declared here but no service in
# this file mounts it (historical bind-mounts a host directory instead).
volumes:
  metadata_data: {}
  middle_var: {}
  historical_var: {}
  broker_var: {}
  coordinator_var: {}
  router_var: {}


services:
  # PostgreSQL container that every Druid service below lists in depends_on.
  # NOTE(review): presumably Druid's metadata store; the connection settings
  # would live in the `environment` env_file, which is not shown here.
  # The image tag is the floating `latest`, so the server version is not pinned.
  postgres:
    container_name: postgres
    image: postgres:latest
    volumes:
      # Persist the database directory in the named volume metadata_data.
      - metadata_data:/var/lib/postgresql/data
    environment:
      # Credentials and database name are set inline in plain text.
      - POSTGRES_PASSWORD=FoolishPassword
      - POSTGRES_USER=druid
      - POSTGRES_DB=druid
      
  # Single ZooKeeper container (image tag 3.5) that every Druid service below
  # lists in depends_on. No ports are published and no volume is mounted.
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.5
    environment:
      # Passed to the container as the ZooKeeper server id.
      - ZOO_MY_ID=1

  # Druid coordinator (apache/druid 0.20.0), published on host port 8081.
  coordinator:
    image: apache/druid:0.20.0
    container_name: coordinator
    volumes:
      # Host directory /home/druid-data is bind-mounted at /opt/data.
      - /home/druid-data:/opt/data
      - coordinator_var:/opt/druid/var
    depends_on: 
      - zookeeper
      - postgres
    ports:
      - "8081:8081"
    command:
      - coordinator
    # Druid settings come from a file named `environment` next to this
    # compose file; its contents are not shown here.
    env_file:
      - environment
    # Adds /etc/hosts entries resolving kafka-1..3 to 127.0.0.1.
    # NOTE(review): inside a container 127.0.0.1 is the container's own
    # loopback unless host networking is used - confirm the brokers are
    # actually reachable at this address.
    extra_hosts: 
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  # Druid broker (apache/druid 0.20.0), published on host port 8082.
  # Unlike coordinator and middlemanager it has no /opt/data bind mount.
  broker:
    image: apache/druid:0.20.0
    container_name: broker
    volumes:
      - broker_var:/opt/druid/var
    depends_on: 
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8082:8082"
    command:
      - broker
    # Shared settings file; its contents are not shown here.
    env_file:
      - environment
    # Adds /etc/hosts entries resolving kafka-1..3 to 127.0.0.1.
    # NOTE(review): 127.0.0.1 is the container's own loopback unless host
    # networking is used - confirm this is intended.
    extra_hosts: 
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  # Druid historical (apache/druid 0.20.0), published on host port 8083.
  historical:
    image: apache/druid:0.20.0
    container_name: historical
    volumes:
      # Both mounts are host bind mounts under /mnt/druid-data, whereas
      # coordinator and middlemanager mount /home/druid-data at /opt/data.
      # NOTE(review): confirm the differing host path is intentional; the
      # declared named volume historical_var is not used here.
      - /mnt/druid-data:/opt/data
      - /mnt/druid-data/historical:/opt/druid/var
    
    depends_on: 
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8083:8083"
    command:
      - historical
    # Shared settings file; its contents are not shown here.
    env_file:
      - environment
    # Adds /etc/hosts entries resolving kafka-1..3 to 127.0.0.1.
    # NOTE(review): 127.0.0.1 is the container's own loopback unless host
    # networking is used - confirm this is intended.
    extra_hosts: 
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1
  # Druid middleManager (apache/druid 0.20.0), published on host port 8091.
  middlemanager:
    image: apache/druid:0.20.0
    container_name: middlemanager
    volumes:
      - /home/druid-data:/opt/data
      - middle_var:/opt/druid/var
      # Host directory /home/druid-data/proto appears inside the container
      # as /home.
      # NOTE(review): presumably this holds the protobuf descriptor file; if
      # so, the path the supervisor spec points at must be the in-container
      # path under /home, not the host path - confirm.
      - /home/druid-data/proto:/home
    depends_on: 
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8091:8091"
    command:
      - middleManager
    # Shared settings file; its contents are not shown here.
    env_file:
      - environment
    # Adds /etc/hosts entries resolving kafka-1..3 to 127.0.0.1.
    # NOTE(review): 127.0.0.1 is the container's own loopback unless host
    # networking is used - confirm the Kafka brokers are reachable from here.
    extra_hosts: 
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  # Druid router (apache/druid 0.20.0), published on host port 8888.
  router:
    image: apache/druid:0.20.0
    container_name: router
    volumes:
      - router_var:/opt/druid/var
      # Same proto directory mount as middlemanager: the host directory
      # /home/druid-data/proto appears inside the container as /home.
      - /home/druid-data/proto:/home
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8888:8888"
    command:
      - router
    # Shared settings file; its contents are not shown here.
    env_file:
      - environment
    # Adds /etc/hosts entries resolving kafka-1..3 to 127.0.0.1.
    # NOTE(review): 127.0.0.1 is the container's own loopback unless host
    # networking is used - confirm this is intended.
    extra_hosts: 
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

我真的不明白为什么它没有读取 Kafka supervisor 规范中指定的描述符文件。我使用的是这个链接中提到的同一个 Kafka supervisor 规范和同一个 proto 文件。

任务日志

    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1608032037318
2020-12-15T11:33:57,322 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer - Announcing self[DruidServerMetadata{name='192.168.16.7:8100', hostAndPort='192.168.16.7:8100', hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor, priority=0}] at [/druid/announcements<ip>:8100]
2020-12-15T11:33:57,329 INFO [task-runner-0-priority-0] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Announced self [{"druidNode":{"service":"druid/middleManager","host":"192.168.16.7","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}}}].
2020-12-15T11:33:57,383 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Initialized sequences: SequenceMetadata{sequenceId=0, sequenceName='index_kafka_metrics-protobuf_8ea4c14da35ee9d_0', assignments=[0], startOffsets={0=4}, exclusiveStartPartitions=[], endOffsets={0=9223372036854775807}, sentinel=false, checkpointed=false}
2020-12-15T11:33:57,385 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Adding partition[0], start[4] -> end[9223372036854775807] to assignment.
2020-12-15T11:33:57,387 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Subscribed to partition(s): protobuff-druid-test-py-0
2020-12-15T11:33:57,393 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[0] to[4].
2020-12-15T11:33:57,393 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Seeking to offset 4 for partition protobuff-druid-test-py-0
2020-12-15T11:33:57,685 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.BasicSecurityResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,740 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.authentication.endpoint.BasicAuthenticatorResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,768 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.authorization.endpoint.BasicAuthorizerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,770 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.security.StateResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,778 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.SegmentListerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,781 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.HistoricalResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,782 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.QueryResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,785 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,787 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.security.ConfigResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,792 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.query.lookup.LookupListeningResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,795 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.query.lookup.LookupIntrospectionResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,797 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2020-12-15T11:33:57,834 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@6248cfab{/,null,AVAILABLE}
2020-12-15T11:33:57,848 INFO [main] org.eclipse.jetty.server.AbstractConnector - Started ServerConnector@4a833595{HTTP/1.1, (http/1.1)}{0.0.0.0:8100}
2020-12-15T11:33:57,848 INFO [main] org.eclipse.jetty.server.Server - Started @5737ms
2020-12-15T11:33:57,849 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [ANNOUNCEMENTS]
2020-12-15T11:33:58,054 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Successfully started lifecycle [module]
2020-12-15T11:33:58,144 INFO [task-runner-0-priority-0] org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Cluster ID: 1Ay9hrPhR0qKn-CCrYihgg
2020-12-15T11:33:58,622 DEBUG [qtp758201484-122] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/status HTTP/1.1
2020-12-15T11:33:58,638 DEBUG [qtp758201484-113] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/checkpoints HTTP/1.1
2020-12-15T11:33:58,645 DEBUG [qtp758201484-133] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/time/start HTTP/1.1
2020-12-15T11:34:23,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:34:53,731 DEBUG [qtp758201484-115] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:35:23,731 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:35:53,729 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:36:23,730 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:36:53,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:37:23,729 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:37:53,731 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:38:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:38:53,730 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:39:23,731 DEBUG [qtp758201484-142] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:39:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:40:23,730 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:40:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:41:23,729 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:41:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:42:23,731 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:42:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:43:23,730 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:43:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:44:23,730 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:44:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:45:23,729 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:45:53,731 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:46:23,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:46:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:47:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:47:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:48:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:48:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:49:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:49:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:50:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:50:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:51:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:51:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:52:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:52:53,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:53:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:53:53,730 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:54:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
    

标签: apache-kafka druid pydruid

解决方案


后来我发现,从 Kafka 摄取时,如果所有消息的时间戳都相同(这是我自己犯的一个低级错误),就看不到新消息。把时间戳从硬编码的值改为实时时间之后,一切就正常了。


推荐阅读