文章
mongodb kafka connector
一、准备工作
mongodb 版本大于6.0
mongodb-kafka-connect 1.8.1
avro 1.9.2
zookeeper 3.8.4
kafka 2.6.3
二、mongodb 设置副本集
将 MongoDB 转换为副本集(推荐)
即使只有 1 个节点,也可以配置为 单节点副本集(适用于开发和测试环境)。
编辑 MongoDB 配置文件,通常位于 /etc/mongod.conf
# 关键:启用副本集
replication:
replSetName: rs0
rs0为设置副本集名称
重启mongod
sudo systemctl restart mongod
初始化副本集
mongosh
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" }
]
});
检查状态:
rs.status();
如果看到 "ok": 1,说明副本集已生效。
三、创建Connector
Source Connector
下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。
.
├── assets
│ ├── mongodb-leaf.png
│ └── mongodb-logo.png
├── doc
│ ├── LICENSE.txt
│ └── README.md
├── etc
│ ├── MongoSinkConnector.properties
│ └── MongoSourceConnector.properties
├── lib
│ └── mongo-kafka-connect-1.9.1-confluent.jar
└── manifest.json
从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下
创建mongo-source.properties文件
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=students
topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
启动mongo同步kafka 服务
/data/kafka/bin/connect-standalone.sh /data/kafka/config/connect-standalone.properties /root/tmp/mongo-source.properties