大数据

mongodb kafka connector

一、准备工作

mongodb 版本大于6.0

mongodb-kafka-connect 1.8.1

avro 1.9.2

zookeeper 3.8.4

kafka 2.6.3

二、mongodb 设置副本集

将 MongoDB 转换为副本集(推荐)
即使只有 1 个节点,也可以配置为 单节点副本集(适用于开发和测试环境)。

编辑 MongoDB 配置文件,通常位于 /etc/mongod.conf

# 关键:启用副本集
replication:
  replSetName: rs0 

rs0为设置副本集名称

重启mongod

sudo systemctl restart mongod

初始化副本集

mongosh
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" }
  ]
});

检查状态:

rs.status();

如果看到 "ok": 1,说明副本集已生效。

三、创建Connector

Source Connector

下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。

.
├── assets
   ├── mongodb-leaf.png
   └── mongodb-logo.png
├── doc
   ├── LICENSE.txt
   └── README.md
├── etc
   ├── MongoSinkConnector.properties
   └── MongoSourceConnector.properties
├── lib
   └── mongo-kafka-connect-1.9.1-confluent.jar
└── manifest.json

从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下

创建mongo-source.properties文件

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=students

topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=

启动mongo同步kafka 服务

/data/kafka/bin/connect-standalone.sh /data/kafka/config/connect-standalone.properties /root/tmp/mongo-source.properties