"The $changeStream stage is only supported on replica sets" error when using mongodb-source-connect

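Problem description

Happy New Year!

I'm here because I hit an error while running kafka-mongodb-source-connect.

I couldn't find an answer while trying to work past it, so I'm posting here.

This is what I'm trying to run: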

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties

This is connect-avro-standalone.properties:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java,/Users/anton/Downloads/confluent-5.3.2/share/confluent-hub-components

This is MongoSourceConnector.properties:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=test

This is the main error I get:

[2020-01-02 18:55:11,546] ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}

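Tags: mongodb, apache-kafka, avro, apache-kafka-connect

Solution

MongoDB change streams are only available on replica set deployments. The source connector reads changes by opening a change stream, so it fails against a standalone mongod. However, you can convert a standalone installation into a single-node replica set with the following steps.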

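  1. Locate the MongoDB configuration file and add the replica set details

Add the following replica set details to the configuration file (mongod.conf, or mongodb.conf on some installations):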

replication:
  replSetName: "<replica-set name>"

Example:

replication:
  replSetName: "rs0"

Note: if MongoDB was installed with Homebrew, the configuration file is at /usr/local/etc/mongod.conf.
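Step 1 can also be scripted. The following is a minimal sketch, not part of the original answer: `enable_replset` is a hypothetical helper that appends the replication stanza to a given config file only if no `replication:` section exists yet. The Homebrew service name in the comments is an assumption and depends on how MongoDB was installed.

```shell
# Append a replication stanza to a mongod config file unless one already exists.
# Usage: enable_replset <config-file> <replica-set-name>
# (enable_replset is a hypothetical helper name, not a MongoDB tool.)
enable_replset() {
  conf="$1"
  name="$2"
  # Leave the file alone if a top-level replication section is already there.
  if grep -q '^replication:' "$conf"; then
    echo "replication section already present in $conf; edit it by hand" >&2
    return 0
  fi
  printf '\nreplication:\n  replSetName: "%s"\n' "$name" >> "$conf"
}

# Example (path taken from the note above; adjust to your installation):
# enable_replset /usr/local/etc/mongod.conf rs0
#
# mongod only reads the file at startup, so restart it afterwards, e.g. with
# Homebrew (service name is an assumption):
# brew services restart mongodb-community
```

Instead of editing the file, mongod can also be started with the `--replSet rs0` command-line option, which sets the same replica set name.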

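  2. Initiate the replica set with rs.initiate()

Restart mongod so the new configuration takes effect, then log in to the MongoDB shell and run rs.initiate() to initiate the replica set. The output after a successful initiation looks like this: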

> rs.initiate()
{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "127.0.0.1:27017",
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577545731, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    },
    "operationTime" : Timestamp(1577545731, 1)
}

That's it: with these two simple steps, you are running a MongoDB replica set with a single node.
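Before restarting the connector, it may help to confirm that the node has actually become primary. The sketch below assumes the legacy `mongo` shell is on the PATH and the server listens on the default localhost:27017. `is_primary` and the `MONGO_SHELL` override are hypothetical names introduced here, not part of MongoDB or of the original answer.

```shell
# Succeeds (exit status 0) when the local mongod reports itself as primary.
# MONGO_SHELL is a hypothetical override for the shell binary (e.g. mongosh).
is_primary() {
  result="$("${MONGO_SHELL:-mongo}" --quiet --eval 'db.isMaster().ismaster')"
  [ "$result" = "true" ]
}

# Example: rerun the command from the question only once the node is primary.
# if is_primary; then
#   bin/connect-standalone \
#     etc/schema-registry/connect-avro-standalone.properties \
#     share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties
# fi
```

A single-node set usually elects itself primary within a few seconds of rs.initiate(), so a failed check right after initiation may just mean the election has not finished yet.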

Reference: https://onecompiler.com/posts/3vchuyxuh/enabling-replica-set-in-mongodb-with-just-one-node

