Data Replication from Oracle to Apache Kafka using Oracle GoldenGate 21c
Hosts and databases used in this environment.
|
Category |
Source |
Target |
|
Host |
srvdb01.localdomain (OEL8) |
srvdb02.localdomain (OEL8) |
|
Database |
Oracle |
Kafka |
|
Database/Topics Name |
master |
REGIONS |
|
Database Version |
19.23.0.0.0 |
3.8.0 |
|
Database Home |
/u01/app/oracle/product/19.0.0/dbhome_1 |
/opt/kafka_2.13-3.8.0 |
|
Goldengate Version |
21.3.0.0.0 |
21.4.0.0.0 |
|
Goldengate Home |
/u01/app/oracle/product/21.3.0/ogg_1 |
/opt/oggbd |
|
Goldengate Schema |
oggadmin |
- |
|
Table/Topics for Replication |
HR.REGIONS |
REGIONS |
We will set up Oracle Goldengate replication from Oracle database
to Apache Kafka topics in real-time.
|
Oracle 19.23.0.0.0 (Source) |
Apache Kafka 3.8.0 (Target) |
|
1.
Prepare the database for Goldengate replication, enable archivelog mode, set force
logging, enable minimal supplemental logging and set enable_goldengate_replication
parameter to true. Version 19.23.0.0.0 Copyright
(c) 1982, 2023, Oracle. All rights
reserved. SQL>
startup mount; ORACLE instance started. Total
System Global Area 838860280 bytes Fixed
Size 8945144 bytes Variable
Size 725614592 bytes Database
Buffers 96468992 bytes Redo
Buffers 7831552 bytes Database
mounted. SQL> alter database archivelog; Database altered. SQL> alter database open; Database altered. SQL> alter database force logging; Database altered. SQL> alter database add supplemental log data; Database altered. SQL> alter system set enable_goldengate_replication=true scope=both; System
altered. |
1.
Install Java 1.8 and download, extract the latest Kafka release. Installed:
java-1.8.0-openjdk-headless-1:1.8.0.362.b09-2.el8_7.x86_64 Complete! [root@dbsrv02
~]# groupadd -g 54331 kafka [root@dbsrv02
~]# useradd -u 54331 -g kafka kafka [root@dbsrv02
~]# wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz -O
/opt/kafka_2.13-3.8.0.tgz [root@dbsrv02
~]# cd /opt tar
-xzf kafka_2.13-3.8.0.tgz [root@dbsrv02
~]# chown -R kafka:kafka kafka_2.13-3.8.0/ |
|
2.
Create a Goldengate user with a dedicated tablespace and grant privileges. SQL> create user oggadmin identified by oggadmin default tablespace oggtbsp temporary tablespace temp; grant
connect,resource to oggadmin; grant
create table,create session,create view to oggadmin; grant
select any dictionary,select any table,select any transaction to oggadmin; grant
create any table to oggadmin; grant
flashback any table to oggadmin; grant
execute on dbms_flashback to oggadmin; grant
execute on utl_file to oggadmin; grant
unlimited tablespace to oggadmin; grant
select on system.logmnr_session$ to oggadmin; grant
alter system to oggadmin; grant
dba to oggadmin; User created. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL> Grant succeeded. SQL>
exec dbms_goldengate_auth.grant_admin_privilege- (grantee=>'oggadmin',privilege_type=>'CAPTURE',- grant_select_privileges=>TRUE,do_grants=>TRUE); PL/SQL
procedure successfully completed. |
2.
Modify listeners config and Start Apache Kafka using ZooKeeper. listeners=PLAINTEXT://localhost:9092 [kafka@dbsrv02 ~]$ /opt/kafka_2.13-3.8.0/bin/zookeeper-server-start.sh
-daemon /opt/kafka_2.13-3.8.0/config/zookeeper.properties [kafka@dbsrv02 ~]$ /opt/kafka_2.13-3.8.0/bin/kafka-server-start.sh
-daemon /opt/kafka_2.13-3.8.0/config/server.properties |
|
3.
Unzip the Oracle GoldenGate for Oracle, Prepare the response file with below
parameter values. [oracle@dbsrv01
tmp]$ unzip -qa 213000_fbo_ggs_Linux_x64_Oracle_shiphome.zip [oracle@dbsrv01
tmp]$ cd fbo_ggs_Linux_x64_Oracle_shiphome/Disk1/response/ [oracle@dbsrv01 response]$ vi oggcore.rsp INSTALL_OPTION=ora21c SOFTWARE_LOCATION=/u01/app/oracle/product/21.3.0/ogg_1 START_MANAGER=true MANAGER_PORT=7809 DATABASE_LOCATION=/u01/app/oracle/product/19.0.0/dbhome_1 INVENTORY_LOCATION=/u01/app/oraInventory UNIX_GROUP_NAME=oinstall [oracle@dbsrv01
response]$ cd .. [oracle@dbsrv01
Disk1]$ ./runInstaller -silent -showProgress -responsefile
/tmp/fbo_ggs_Linux_x64_Oracle_shiphome/Disk1/response/oggcore.rsp Starting
Oracle Universal Installer... Checking
Temp space: must be greater than 120 MB.
Actual 4554 MB Passed Checking
swap space: must be greater than 150 MB.
Actual 6608 MB Passed Preparing
to launch Oracle Universal Installer from
/tmp/OraInstall2024-11-01_05-47-18PM. Please wait ... [oracle@dbsrv01
Disk1]$ You can find the log of this install session at: /u01/app/oraInventory/logs/installActions2024-11-01_05-47-18PM.log Prepare
in progress. .................................................. 9% Done. Prepare
successful. Copy
files in progress. .................................................. 45% Done. .................................................. 50% Done. .................................................. 57% Done. .................................................. 62% Done. .................................................. 68% Done. .................................................. 73% Done. .................................................. 79% Done. .................... Copy
files successful. Link
binaries in progress. .......... Link
binaries successful. Setup
files in progress. .................................................. 84% Done. .................................................. 89% Done. Setup
files successful. Setup
Inventory in progress. Setup
Inventory successful. .......... Finish
Setup successful. The
installation of Oracle GoldenGate Core was successful. Please
check '/u01/app/oraInventory/logs/silentInstall2024-11-01_05-47-18PM.log' for
more details. Start
Oracle GoldenGate Manager process. in progress. Successfully
Setup Software. .................................................. 100% Done. Start
Oracle GoldenGate Manager process. successful. [oracle@dbsrv01
~]$ alias ggsci=/u01/app/oracle/product/21.3.0/ogg_1/ggsci [oracle@dbsrv01
~]$ ggsci Oracle
GoldenGate Command Interpreter for Oracle Version
21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO Oracle
Linux 7, x64, 64bit (optimized), Oracle Database 21c and lower supported
versions on Jul 29 2021 03:59:23 Operating
system character set identified as UTF-8. Copyright
(C) 1995, 2021, Oracle and/or its affiliates. All rights reserved. GGSCI
(dbsrv01.localdomain) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING
|
3.
Unzip, untar the Oracle Goldengate for Bigdata under Goldengate home directory
and configure the Manager process. [root@dbsrv02 ~]# cd /opt [root@dbsrv02
opt]# mkdir oggbd [root@dbsrv02
opt]# chown kafka:kafka oggbd [root@dbsrv02
opt]# su - kafka [kafka@dbsrv02
~]$ unzip -qa /tmp/214000_ggs_Linux_x64_BigData_64bit.zip -d /opt/oggbd/ [kafka@dbsrv02
oggbd]$ cd /opt/oggbd/ [kafka@dbsrv02
oggbd]$ tar -xf ggs_Linux_x64_BigData_64bit.tar [kafka@dbsrv02
oggbd]$ export LD_LIBRARY_PATH=/usr/lib/jvm/jre/lib/amd64/libjsig.so:\ >
/usr/lib/jvm/jre/lib/amd64/server/libjvm.so:\ >
/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64 [kafka@dbsrv02
oggbd]$ ./ggsci Oracle
GoldenGate for Big Data Version
21.4.0.0.0 (Build 002) Oracle
GoldenGate Command Interpreter Version
21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803 Oracle
Linux 7, x64, 64bit (optimized), Generic
on Oct 22 2021 23:14:43 Operating
system character set identified as UTF-8. Copyright
(C) 1995, 2021, Oracle and/or its affiliates. All rights reserved. GGSCI
(dbsrv02.localdomain) 1> create subdirs Creating
subdirectories under current directory /opt/oggbd Parameter
file /opt/oggbd/dirprm:
created. Report
file /opt/oggbd/dirrpt: created. Checkpoint
file /opt/oggbd/dirchk:
created. Process
status files
/opt/oggbd/dirpcs: created. SQL
script files
/opt/oggbd/dirsql: created. Database
definitions files
/opt/oggbd/dirdef: created. Extract
data files
/opt/oggbd/dirdat: created. Temporary
files /opt/oggbd/dirtmp:
created. Credential
store files /opt/oggbd/dircrd:
created. Master
encryption key wallet files /opt/oggbd/dirwlt: created. Dump
files
/opt/oggbd/dirdmp: created. GGSCI
(dbsrv02.localdomain) 2> EDIT PARAMS MGR PORT
7809 GGSCI
(dbsrv02.localdomain) 3> START MGR Manager
started. GGSCI
(dbsrv02.localdomain) 4> INFO ALL Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING |
|
4. On source, table structure and datatype look as below. SQL>
conn HR Enter
password: Connected. SQL>
desc REGIONS; Name
Null? Type -----------------------------------------
-------- ------------------ REGION_ID NOT NULL
NUMBER REGION_NAME
VARCHAR2(25) SQL>
select * from REGIONS; REGION_ID REGION_NAME ----------
------------------------- 1 Europe 2 Americas 3 Asia 4 Middle East and Africa |
4. Copy and modify the Oracle GoldenGate Kafka sample files, update the highlighted values accordingly. [kafka@dbsrv02
~]$ cd /opt/oggbd/AdapterExamples/big-data/kafka [kafka@dbsrv02
kafka]$ cp rkafka.prm kafka.props custom_kafka_producer.properties
/opt/oggbd/dirprm/ [kafka@dbsrv02
~]$ cd /opt/oggbd/dirprm/ [kafka@dbsrv02
dirprm]$ vi custom_kafka_producer.properties bootstrap.servers=localhost:9092 acks=1 reconnect.backoff.ms=1000 compression.type=gzip value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=16384 linger.ms=10000 [kafka@dbsrv02
dirprm]$ vi kafka.props gg.handlerlist=kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties #The
following resolves the topic name using the short table name gg.handler.kafkahandler.topicMappingTemplate=${tableName} #The
following selects the message key using the concatenated primary keys gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} #gg.handler.kafkahandler.schemaTopicName=mySchemaTopic gg.handler.kafkahandler.blockingSend=false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op #gg.handler.kafkahandler.metaHeadersTemplate=${alltokens} gg.handler.kafkahandler.transactionsEnabled=false gg.handler.kafkahandler.format=json #Sample
gg.classpath for Apache Kafka gg.classpath=dirprm/:/opt/kafka_2.13-3.8.0/libs/*: #Sample
gg.classpath for HDP #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/* |
|
5.
Connect to the source database from ggsci and enable trandata for the table
to be captured. [oracle@dbsrv01
~]$ export TNS_ADMIN=$ORACLE_HOME/network/admin [oracle@dbsrv01
~]$ ggsci Oracle
GoldenGate Command Interpreter for Oracle Version
21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO Oracle
Linux 7, x64, 64bit (optimized), Oracle Database 21c and lower supported
versions on Jul 29 2021 03:59:23 Operating
system character set identified as UTF-8. Copyright
(C) 1995, 2021, Oracle and/or its affiliates. All rights reserved. GGSCI
(dbsrv01.localdomain) 1> dblogin userid oggadmin@master,password oggadmin Successfully
logged into database. GGSCI
(dbsrv01.localdomain as oggadmin@master) 2> ADD TRANDATA HR.REGIONS 2024-11-01
22:25:59 INFO OGG-15132
Logging of supplemental redo data enabled for table HR.REGIONS. 2024-11-01
22:25:59 INFO OGG-15133
TRANDATA for scheduling columns has been added on table HR.REGIONS. 2024-11-01
22:25:59 INFO OGG-15438
Enabling logical replication (with all keys) on auto capture table
HR.REGIONS. 2024-11-01
22:25:59 INFO OGG-15135
TRANDATA for instantiation CSN has been added on table HR.REGIONS. 2024-11-01
22:26:00 INFO OGG-26035
Logical replication for table HR.REGIONS is ENABLED. 2024-11-01
22:26:01 INFO OGG-10471
***** Oracle Goldengate support information on table HR.REGIONS ***** Oracle
Goldengate support native capture on table HR.REGIONS. Oracle
Goldengate marked following column as key columns on table HR.REGIONS:
REGION_ID. |
5.
To view JSON format, install jq [kafka@dbsrv02 ~]$ yum install jq |
|
6.
Setup Extract and Data Pump processes on the source database to capture DML
changes. GGSCI
(dbsrv01.localdomain as oggadmin@master) 4> EDIT PARAMS EXTR EXTRACT
EXTR USERID
oggadmin, PASSWORD oggadmin TRANLOGOPTIONS
INTEGRATEDPARAMS (PARALLELISM 2,MAX_SGA_SIZE 2048) EXTTRAIL
./dirdat/ex DISCARDFILE
./dirrpt/EXTR.dsc,APPEND,MEGABYTES 2000 DISCARDROLLOVER
AT 13:00 ON WEDNESDAY REPORTCOUNT
EVERY 10 MINUTES, RATE WARNLONGTRANS
2H, CHECKINTERVAL 2M TABLE
HR.REGIONS; GGSCI
(dbsrv01.localdomain as oggadmin@master) 5> ADD EXTRACT EXTR,INTEGRATED
TRANLOG,BEGIN NOW Integrated
Extract added. GGSCI
(dbsrv01.localdomain as oggadmin@master) 6> ADD EXTTRAIL
./dirdat/ex,EXTRACT EXTR,MEGABYTES 500 EXTTRAIL
added. GGSCI
(dbsrv01.localdomain as oggadmin@master) 7> EDIT PARAMS PUMP EXTRACT
PUMP USERID
oggadmin, PASSWORD oggadmin RMTHOST
192.168.56.82,MGRPORT 7809 PASSTHRU RMTTRAIL
./dirdat/rx DISCARDFILE
./dirrpt/PUMP.dsc,APPEND,MEGABYTES 20000 DISCARDROLLOVER
AT 13:00 ON WEDNESDAY TABLE
HR.REGIONS; GGSCI
(dbsrv01.localdomain as oggadmin@master) 8> ADD EXTRACT
PUMP,EXTTRAILSOURCE ./dirdat/ex Extract
added. GGSCI
(dbsrv01.localdomain as oggadmin@master) 9> ADD RMTTRAIL ./dirdat/rx,
extract PUMP, megabytes 500 RMTTRAIL
added. |
6.
Setup the Replicat process in Goldengate to send data to Kafka. GGSCI
(dbsrv02.localdomain) 1> EDIT PARAMS RKAFKA Replicat
added. REPLICAT
rkafka TARGETDB
LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT
EVERY 1 MINUTES, RATE GROUPTRANSOPS
10000 MAP
HR.*, TARGET *.*; [kafka@dbsrv02
~]$ /opt/oggbd/ggsci Oracle
GoldenGate for Big Data Version
21.4.0.0.0 (Build 002) Oracle
GoldenGate Command Interpreter Version
21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803 Oracle
Linux 7, x64, 64bit (optimized), Generic
on Oct 22 2021 23:14:43 Operating
system character set identified as UTF-8. Copyright
(C) 1995, 2021, Oracle and/or its affiliates. All rights reserved. GGSCI
(dbsrv02.localdomain) 1> ADD REPLICAT RKAFKA,EXTTRAIL ./dirdat/rx Replicat
added. |
|
7. Start the extract and datapump process. GGSCI
(dbsrv01.localdomain as OGGADMIN@master) 9> START EXTR Sending
START request to Manager ... Extract
group EXTR starting. GGSCI
(dbsrv01.localdomain as OGGADMIN@master) 10> START EXTR Sending
START request to Manager ... Extract
group PUMP starting. GGSCI
(dbsrv01.localdomain as OGGADMIN@master) 11> INFO ALL Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTR 00:00:03 00:00:04 EXTRACT RUNNING PUMP 00:00:00 00:00:07 |
7. Start the replicat process. GGSCI
(dbsrv02.localdomain) 2> ADD REPLICAT RKAFKA,EXTTRAIL ./dirdat/rx Replicat
added. GGSCI
(dbsrv02.localdomain) 3> START RKAFKA Sending
START request to Manager ... Replicat
group RKAFKA starting. GGSCI
(dbsrv02.localdomain) 4> INFO ALL Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING RKAFKA 00:00:00 00:00:00 |
|
8.
Perform DML operations on the source database. [oracle@dbsrv01
~]$ sqlplus / as sysdba SQL*Plus:
Release 19.0.0.0.0 - Production on Thu Nov 7 01:46:49 2024 Version
19.23.0.0.0 Copyright
(c) 1982, 2023, Oracle. All rights
reserved. Connected
to: Oracle
Database 19c Enterprise Edition Release 19.0.0.0.0 - Production Version
19.23.0.0.0 SQL>
conn HR Enter
password: Connected. SQL>
select * from regions; REGION_ID REGION_NAME ----------
------------------------- 1 Europe 2 Americas 3 Asia 4 Middle East and Africa SQL>
insert into regions values (5,'USA'); 1
row created. SQL>
commit; Commit
complete. SQL>
delete from regions where REGION_ID=5; 1
row deleted. SQL>
commit; Commit
complete. |
8. Use kafka-console-consumer to consume messages from the Kafka topic “REGIONS” and verify successful replication. [kafka@dbsrv02
dirprm]$ cd /opt/kafka_2.13-3.8.0/bin/ [kafka@dbsrv02
bin]$./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
REGIONS --from-beginning | jq { "table": "HR.REGIONS", "op_type": "I", "op_ts": "2024-11-07
01:42:30.150548", "current_ts": "2024-11-07
01:42:37.670000", "pos":
"00000000020000002519", "after": { "REGION_ID": 5, "REGION_NAME": "USA" } } { "table": "HR.REGIONS", "op_type": "D", "op_ts": "2024-11-07
01:44:34.100735", "current_ts": "2024-11-07
01:44:42.227000", "pos":
"00000000020000002658", "before": { "REGION_ID": 5, "REGION_NAME": "USA" } } |
Comments
Post a Comment