Moving Data from Oracle to MongoDB Is Easy with Apache Kafka
Change data capture (CDC) has been around in the database world for years. CDC listens for changes in a database (inserts, updates, and deletes) and sends those events to other database systems, enabling scenarios such as ETL, replication, and database migration. By leveraging Apache Kafka, the Confluent Oracle CDC Connector, and the MongoDB Connector for Apache Kafka, you can easily stream data from Oracle into MongoDB. In this post we move data from Oracle to MongoDB and walk through the configuration step by step, so you can easily reuse it, adjust it, and explore the functionality.

At a high level, we will build the reference architecture shown in the image above in a self-contained Docker Compose environment that includes the following: Oracle Database, MongoDB, Apache Kafka, and Confluent KSQL. These containers all run on a bridged local network, so you can work with them from your local Mac or PC. Check out the GitHub repository to download the complete example.

Preparing the Oracle Docker image

If you have an existing Oracle Database, remove the "database" section from the docker-compose file. If you do not yet have an Oracle Database, you can pull Oracle Database Enterprise Edition from Docker Hub. You will need to accept the Oracle terms and conditions, log in to your Docker account with docker login, and then download the image locally with:

    docker pull store/oracle/database-enterprise:12.2.0.1-slim

Launching the Docker environment

The docker-compose file will launch the following: Apache Kafka (including Zookeeper, the REST API, Schema Registry, and KSQL), Apache Kafka Connect, the MongoDB Connector for Apache Kafka, the Confluent Oracle CDC Connector, and Oracle Database Enterprise.

The complete sample code is available from a GitHub repository. To launch the environment, make sure your Oracle environment is ready, then git clone the repo and bring everything up with:

    docker-compose up -d --build

Once the compose file finishes, you will need to configure your Oracle environment for use by the Confluent Oracle CDC Connector.

Step 1: Connect to your Oracle instance

If you are running Oracle within the Docker environment, you can use docker exec as follows:

    docker exec -it oracle bash -c "source /home/oracle/.bashrc; sqlplus /nolog"
    connect / as sysdba

Step 2: Configure Oracle for the CDC Connector

First, check whether the database is in archive log mode:

    select log_mode from v$database;

If the mode is not "ARCHIVELOG", perform the following:

    SHUTDOWN IMMEDIATE;
    STARTUP MOUNT;
    ALTER DATABASE ARCHIVELOG;
    ALTER DATABASE OPEN;

Verify the archive mode:

    select log_mode from v$database;

The LOG_MODE should now be "ARCHIVELOG".
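If you prefer to script this check rather than type it into the interactive sqlplus session, the same query can be piped in from the host. This is a minimal sketch, assuming the "oracle" container name and the sqlplus invocation from Step 1:

    # Run the archive-log-mode check non-interactively from the host;
    # sqlplus reads the statement from stdin and exits at end of input
    echo 'select log_mode from v$database;' | \
      docker exec -i oracle bash -c 'source /home/oracle/.bashrc; sqlplus -S / as sysdba'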
Next, enable supplemental logging for all columns:

    ALTER SESSION SET CONTAINER=cdb$root;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

The following should be run on the Oracle CDB:

    CREATE ROLE C##CDC_PRIVS;
    GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS;

    CREATE USER C##myuser IDENTIFIED BY password CONTAINER=ALL;
    GRANT C##CDC_PRIVS TO C##myuser CONTAINER=ALL;
    ALTER USER C##myuser QUOTA UNLIMITED ON sysaux;
    ALTER USER C##myuser SET CONTAINER_DATA = (CDB$ROOT, ORCLPDB1) CONTAINER=CURRENT;

    ALTER SESSION SET CONTAINER=CDB$ROOT;
    GRANT CREATE SESSION, ALTER SESSION, SET CONTAINER, LOGMINING, EXECUTE_CATALOG_ROLE TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON GV_$DATABASE TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON GV_$ARCHIVED_LOG TO C##myuser CONTAINER=ALL;
    GRANT CONNECT TO C##myuser CONTAINER=ALL;
    GRANT CREATE TABLE TO C##myuser CONTAINER=ALL;
    GRANT CREATE SEQUENCE TO C##myuser CONTAINER=ALL;
    GRANT CREATE TRIGGER TO C##myuser CONTAINER=ALL;

    ALTER SESSION SET CONTAINER=cdb$root;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    GRANT FLASHBACK ANY TABLE TO C##myuser;
    GRANT FLASHBACK ANY TABLE TO C##myuser container=all;

Next, create some objects:

    CREATE TABLE C##MYUSER.emp
    (
      i INTEGER GENERATED BY DEFAULT AS IDENTITY,
      name VARCHAR2(100),
      lastname VARCHAR2(100),
      PRIMARY KEY (i)
    ) tablespace sysaux;

    insert into C##MYUSER.emp (name, lastname) values ('Bob', 'Perez');
    insert into C##MYUSER.emp (name, lastname) values ('Jane', 'Revuelta');
    insert into C##MYUSER.emp (name, lastname) values ('Mary', 'Kristmas');
    insert into C##MYUSER.emp (name, lastname) values ('Alice', 'Cambio');
    commit;

Step 3: Create Kafka Topic

Open a new terminal/shell and connect to your Kafka server as follows:

    docker exec -it broker /bin/bash

Once connected, create the Kafka topic:

    kafka-topics --create --topic SimpleOracleCDC-ORCLCDB-redo-log \
      --bootstrap-server broker:9092 --replication-factor 1 \
      --partitions 1 --config cleanup.policy=delete \
      --config retention.ms=120960000

Step 4: Configure the Oracle CDC Connector

The oracle-cdc-source.json file in the repository contains the configuration of the Confluent Oracle CDC Connector. To configure it, simply execute:

    curl -X POST -H "Content-Type: application/json" -d @oracle-cdc-source.json http://localhost:8083/connectors

Step 5: Set up KSQL data flows within Kafka

As Oracle CRUD events arrive in the Kafka topic, we will use KSQL to stream these events into a new topic for consumption by the MongoDB Connector for Apache Kafka.
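Before creating the streams, it is worth confirming that the source connector registered correctly and that its topics exist. The following is a minimal check, assuming Kafka Connect is exposed on localhost:8083 as above; the connector name placeholder should be replaced with whatever "name" value your oracle-cdc-source.json declares:

    # List the connectors registered with Kafka Connect
    curl -s http://localhost:8083/connectors

    # Check that the CDC connector and its task report a RUNNING state
    # (replace the placeholder with the name from oracle-cdc-source.json)
    curl -s http://localhost:8083/connectors/<your-cdc-connector-name>/status

    # The redo-log topic from Step 3 should be listed, and the change-event
    # topic used by KSQL in the next step (ORCLCDB-EMP) should appear once
    # the connector has captured changes for the EMP table
    docker exec broker kafka-topics --list --bootstrap-server broker:9092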
Connect to the KSQL server:

    docker exec -it ksql-server /bin/bash
    ksql http://127.0.0.1:8088

Enter the following commands:

    CREATE STREAM CDCORACLE (I DECIMAL(20,0), NAME varchar, LASTNAME varchar, op_type VARCHAR)
      WITH (
        kafka_topic='ORCLCDB-EMP',
        PARTITIONS=1,
        REPLICAS=1,
        value_format='AVRO');

    CREATE STREAM WRITEOP AS
      SELECT CAST(I AS BIGINT) as "_id", NAME, LASTNAME, OP_TYPE
      FROM CDCORACLE
      WHERE OP_TYPE != 'D'
      EMIT CHANGES;

    CREATE STREAM DELETEOP AS
      SELECT CAST(I AS BIGINT) as "_id", NAME, LASTNAME, OP_TYPE
      FROM CDCORACLE
      WHERE OP_TYPE = 'D'
      EMIT CHANGES;

To verify that the streams were created:

    SHOW STREAMS;

This command will show the following:

     Stream Name | Kafka Topic | Format
    ------------------------------------
     CDCORACLE   | ORCLCDB-EMP | AVRO
     DELETEOP    | DELETEOP    | AVRO
     WRITEOP     | WRITEOP     | AVRO
    ------------------------------------

Step 6: Configure MongoDB Sink

The following is the configuration for the MongoDB Connector for Apache Kafka:

    {
      "name": "Oracle",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "WRITEOP",
        "connection.uri": "mongodb://mongo1",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy",
        "database": "kafka",
        "collection": "oracle",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy.partial.value.projection.type": "allowlist",
        "document.id.strategy.partial.value.projection.list": "_id",
        "errors.log.include.messages": true,
        "errors.deadletterqueue.context.headers.enable": true,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      }
    }

In this example, the sink process consumes records from the WRITEOP topic and saves the data to MongoDB. The write model, UpdateOneBusinessKeyTimestampStrategy, performs an upsert operation using the filter defined by the PartialValueStrategy property, which in this example is the "_id" field. For your convenience, this configuration is provided in the mongodb-sink.json file in the repository. To configure it, execute:

    curl -X POST -H "Content-Type: application/json" -d @mongodb-sink.json http://localhost:8083/connectors

Delete events are written to the DELETEOP topic and are sunk to MongoDB with the following sink configuration (provided in the mongodb-sink-delete.json file in the repository):

    {
      "name": "Oracle-Delete",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "DELETEOP",
        "connection.uri": "mongodb://mongo1",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
        "database": "kafka",
        "collection": "oracle",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy.partial.value.projection.type": "allowlist",
        "document.id.strategy.partial.value.projection.list": "_id",
        "errors.log.include.messages": true,
        "errors.deadletterqueue.context.headers.enable": true,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
      }
    }

To configure it, execute:

    curl -X POST -H "Content-Type: application/json" -d @mongodb-sink-delete.json http://localhost:8083/connectors

This sink process uses the DeleteOneBusinessKeyStrategy write model strategy: it reads from the DELETEOP topic and deletes documents in MongoDB based on the filter defined by the PartialValueStrategy property, which in this example is the "_id" field.
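At this point three connectors should be registered with Kafka Connect: the Oracle CDC source and the two MongoDB sinks. Below is a quick health check, assuming Connect is still exposed on localhost:8083; the names "Oracle" and "Oracle-Delete" come from the sink configurations above:

    # Both sink connectors should report a RUNNING connector and task
    curl -s http://localhost:8083/connectors/Oracle/status
    curl -s http://localhost:8083/connectors/Oracle-Delete/status

    # The WRITEOP and DELETEOP topics backing the KSQL streams should exist
    docker exec broker kafka-topics --list --bootstrap-server broker:9092 | grep -E 'WRITEOP|DELETEOP'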
Step 7: Write data to Oracle

Now that your environment is set up and configured, return to the Oracle database and insert the following data:

    insert into C##MYUSER.emp (name, lastname) values ('Juan', 'Soto');
    insert into C##MYUSER.emp (name, lastname) values ('Robert', 'Walters');
    insert into C##MYUSER.emp (name, lastname) values ('Ruben', 'Trigo');
    commit;

Next, watch the data arrive in MongoDB by accessing the MongoDB shell:

    docker exec -it mongo1 /bin/mongo

The inserted data will now be available in MongoDB. (A non-interactive way to run the same check is shown at the end of this post.) If we update the data in Oracle, e.g.:

    UPDATE C##MYUSER.emp SET name='Rob' WHERE name='Robert';
    COMMIT;

the document will be updated in MongoDB as:

    {
      "_id" : NumberLong(11),
      "LASTNAME" : "Walters",
      "NAME" : "Rob",
      "OP_TYPE" : "U",
      "_insertedTS" : ISODate("2021-07-27T10:25:08.867Z"),
      "_modifiedTS" : ISODate("2021-07-27T10:25:08.867Z")
    }

If we delete the data in Oracle, e.g.:

    DELETE FROM C##MYUSER.emp WHERE name='Rob';
    COMMIT;

the documents with name='Rob' will no longer be in MongoDB. Note that it may take a few seconds for changes to propagate from Oracle to MongoDB.

Many possibilities

In this post we performed a basic setup for moving data from Oracle to MongoDB via Apache Kafka, the Confluent Oracle CDC Connector, and the MongoDB Connector for Apache Kafka. While this example is fairly simple, you can add more complex transformations using KSQL and integrate other data sources within your Kafka environment, building a production-ready ETL or streaming environment with best-of-breed solutions.

Resources

How to Get Started with MongoDB Atlas and Confluent Cloud
Announcing the MongoDB Atlas Sink and Source Connectors in Confluent Cloud
Making your Life Easier with MongoDB and Kafka
Streaming Time-Series Data Using Apache Kafka and MongoDB
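As referenced in Step 7: if you prefer not to open an interactive shell, the documents written by the sinks can be queried directly with mongo --eval. This is a minimal sketch, assuming the kafka database and oracle collection names from the sink configurations above:

    # Show the documents the sink connectors have written so far
    docker exec mongo1 mongo --quiet --eval 'db.getSiblingDB("kafka").oracle.find().pretty()'

    # After the DELETE in Step 7 propagates, this should return no documents
    docker exec mongo1 mongo --quiet --eval 'db.getSiblingDB("kafka").oracle.find({ "NAME": "Rob" }).pretty()'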