StarQuest Technical Documents

Using the SQDR Kafka Producer

Last Update: 4 April 2019
Product: StarQuest Data Replicator
Version: SQDR 4.6 or later
Article ID: SQV00DR034

Abstract

This tech note describes how to install and configure the SQDR Kafka Producer and a StarQuest-supplied sample Kafka consumer.

Apache Kafka is an open-source message broker project that aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by transaction logs.

Customers requiring high-speed access to change data from legacy systems (such as IBM DB2, Oracle, and Microsoft SQL Server) to drive information into new-generation data engines such as Apache Hive, MongoDB, Hadoop, MemSQL, and Cassandra will find their requirements well met by the StarQuest real-time replication solution, SQDR.

Overview

Prerequisites

Setup Procedure:
Setting up the StarQuest Kafka files
Setting up Kafka and System environment variables
Local hosts File
Install the SQDR Kafka Producer as a Java stored procedure in the local SQDRC control database
Creating the Incremental Group
Adding the Kafka Connection Properties to the IR Group
Configuring and Starting the Kafka Consumer
Troubleshooting

sqdrJdbcBaseline Application

Reference:
XML Format
JSON Format

Hints and FAQ

Overview

SQDR provides support for streaming change data to Apache Kafka consumers. The StarQuest-provided Kafka Producer may be associated with an incremental group containing one or more subscriptions. The producer injects a message for each subscribed table's activity, with one message for each Insert, Update, and Delete DML operation. Each group may be associated with only one producer, but multiple groups may be configured to use different producers, and those groups may share common source tables. No customization of the producer code is required to use this functionality.

In addition to the SQDR Producer, a sample consumer is provided as Java source code. The sample illustrates how to parse the messages and exploit their contents. Users are encouraged to use this sample as a template for creating custom consumers.
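At its core, such a consumer is a standard Kafka poll loop that receives each change record as a message value. The following minimal sketch uses the standard kafka-clients API; the class name, broker address, group id, and topic shown here are placeholders rather than the names used in the supplied sample.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SampleChangeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.36.175:6667");   // Kafka broker (example address)
        props.put("group.id", "MY_IR_GROUP");                    // consumer group id (example)
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Sample_Group")); // topic = group name by default
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record value is one XML or JSON change record produced by TOKAFKA.
                    System.out.println(record.value());
                }
            }
        }
    }
}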

The architecture relies upon the SQDR Change Data Processing support. The Kafka producer runs as a cached stored procedure within the SQDR-supplied DB2 for LUW control database. Each change row processed by the incremental support is passed to the stored procedure and injected as a message. Reliable, high-speed distribution of the messages is a service of the Kafka message bus.

Prerequisites

Download the binary distribution of Kafka from http://kafka.apache.org/downloads.html; at the time of the latest revision of this document, the current version is kafka_2.12-2.1.1.tgz (Kafka 2.1.1 built with Scala 2.12). On Windows, you will need 7-zip or another compression utility to unpack the .tgz file, or contact StarQuest support for a .zip distribution.

Download Kafka.zip, containing the StarQuest Producer and source and binary files for the sample Consumer, along with installation scripts.

You must be using SQDR 4.65 or later. If you are using a combined tier solution, the installer for SQDR Plus contains a public JRE that is used by the installation scripts and by the sample Consumer.

The producer runs as a Java stored procedure within the local DB2 LUW control database. Since Kafka 2.0 (July 2018) and later requires JRE 8 or later, we recommend using DB2 LUW 11.1, which includes an IBM JRE 8 JVM; DB2 LUW 10.5 contains the IBM JRE 7 JVM, which does not work with recent versions of Kafka. A workaround for this issue is to configure the DBM parameter JDK_PATH to specify an external JRE.
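For example, from a DB2 command window (db2cmd) the parameter can be changed as shown below; the JRE path is illustrative and must be adjusted to your installation, and the instance must be restarted for the change to take effect:

db2 update dbm cfg using JDK_PATH C:\Java\jre8
db2stop
db2start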

To customize the Consumer, you will need a Java IDE (e.g. Eclipse) and the JDK.

You also need to know the IP address or hostname and port that your Kafka server is listening on.

Setup Procedure

Setting up the StarQuest Kafka files

The StarQuest files are provided as a zip file (Kafka.zip); extract it into C:\dev\Kafka. In addition to the ready-to-run jar files in C:\dev\Kafka\KafkaProcedure\jars, the source directories can be imported into Eclipse.

Setting up Kafka and System environment variables

  1. Download the binary distribution for Kafka and extract it to a directory of your choice; we recommend using a directory path without spaces. In our setup, the Kafka installation directory is C:\bin\kafka.
  2. Create a new system environment variable KAFKALIB containing the path to the libs subdirectory of your Kafka installation, e.g.

KAFKALIB=C:\bin\kafka\libs

  3. Modify the CLASSPATH system environment variable, adding:

%KAFKALIB%\kafka-clients-2.1.1.jar;%KAFKALIB%\kafka_2.12-2.1.1.jar;%KAFKALIB%\kafka-tools-2.1.1.jar;%KAFKALIB%\scala-library-2.12.7.jar;%KAFKALIB%\log4j-1.2.17.jar;%KAFKALIB%\slf4j-api-1.7.25.jar;%KAFKALIB%\slf4j-log4j12-1.7.25.jar;%KAFKALIB%\kafka-log4j-appender-2.1.1.jar

These version numbers will vary depending on your version of Kafka.

  4. Reboot the system so that the new CLASSPATH value is available to Java stored procedures running under DB2. (A quick verification check follows this list.)
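As a quick verification after the reboot, you can confirm from a new command window that the Kafka client classes resolve through the system CLASSPATH. The small throwaway class below is not part of the StarQuest distribution; it simply loads one of the kafka-clients classes:

// ClasspathCheck.java - compile with javac and run with java from a NEW command window
// opened after the reboot, so that the updated system CLASSPATH is in effect.
public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        // Throws ClassNotFoundException if kafka-clients is not on the CLASSPATH.
        Class<?> c = Class.forName("org.apache.kafka.clients.producer.KafkaProducer");
        System.out.println("Found " + c.getName());
    }
}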

Local hosts File

When the Kafka producer communicates with the Kafka server, the server may return its hostname. If your DNS system is not configured to resolve that hostname correctly, open C:\Windows\System32\drivers\etc\hosts in Notepad or another text editor and add a line similar to the following:

172.19.36.175 mykafka.mydomain.com

Install the SQDR Kafka Producer as a Java stored procedure in the local SQDRC control database

  1. Open a command window.
  2. cd C:\dev\Kafka\KafkaProcedure\jars
  3. Open install.bat in Notepad or another editor and update directory locations and password if necessary. The credentials are those of the local Windows user sqdr that SQDR uses to access the local DB2 database.
  4. Run install.bat - this calls SQLJ.INSTALL_JAR() to create the Java stored procedure and calls CREATE PROCEDURE to register it.

To update the Java stored procedure at a later time, edit and run replace.bat.

The SQL script createproc.sql is supplied for informational and troubleshooting purposes.
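To confirm that the stored procedure was registered, you can query the DB2 catalog from a DB2 command window:

db2> connect to SQDRC user SQDR using <password>
db2> select routineschema, routinename from syscat.routines where routinename = 'TOKAFKA' and routinetype = 'P'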

Creating the Incremental Group

If it does not already exist, use Data Replicator Manager to create an incremental group and add subscriptions.

Here are some configuration hints:

If the goal is to send the change data as a Kafka message, rather than updating a destination, create the group and subscriptions as follows:

You may use the control database (SQDRC) as the "destination", even if it doesn’t match the “true” destination type – e.g. MemSQL.

Create the I/R group first, specifying the following on the Advanced panel:

  • Change Apply/Row Limit from 2000 to 20000.
  • Change Data:
    Logging Level: Data-Always
    Select the Control DBMS radio button
    Stored Procedure: TOKAFKA
    (This will invoke the stored procedure locally in the SQDRC context.)
  • Subscription Wizard Apply Defaults:
    Specify Only Stream Change Data – this indicates that Apply will not attempt to perform any operations on the destination tables.

When creating the subscriptions, specify the following on the Destination panel:

  • For I/R Apply Options, confirm that Only Stream Change Data is selected. This setting is inherited from the group default, and ensures that no attempt is made to apply change data to the destination; only the stored procedure will be invoked. Note that this setting will be greyed-out after setting the following options.
  • For Baseline Replication Options, specify Append replicated rows to existing data
    and Null synchronization - DDL Only, so that no baseline activity will occur. These settings ensure that “running the baseline” does not result in an error, while the subscription is still successfully “registered” with the Capture Agent.


Subscriptions still need to be “run” in order to enable capture & apply.

Adding the Kafka Connection Properties to the IR Group

In Data Replicator Manager, modify the Description field of the incremental group, entering the following values. Multiple values should be separated by the semicolon character.

  • Connection to the Kafka server. This value will also be used in kafka.properties when configuring the consumer.

bootstrap.servers=172.19.36.175:6667

  • Kafka Topic (optional): If not specified, the default value is the group name, with spaces replaced by underscores.

topic=My_Topic

  • Encoding (optional): If not specified, the default behavior is to send data in XML format. You can also request JSON encoding globally by setting the SQDR advanced setting useXMLEncoding to 0.

local.encoding=XML
local.encoding=JSON
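Putting these together, a Description field that sets all three properties would contain a single line such as the following (server address, topic, and encoding are examples):

bootstrap.servers=172.19.36.175:6667;topic=My_Topic;local.encoding=JSON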

(Screenshot: group properties)

 

If necessary, enable capture & apply by running the subscription.

 

Configuring and Starting the Kafka Consumer

  1. Edit consumer.bat or consumer.sh and set the TOPIC variable. This value is used as the Kafka topic.
    e.g.
    set TOPIC=Sample_Group
  2. Delete any old copies of kafka.properties and run consumer.bat or consumer.sh once to create kafka.properties.
  3. Edit kafka.properties and modify this line, supplying the IP address or hostname and port of your Kafka server, including the backslash character:
    bootstrap.servers=172.19.36.175\:6667
  4. Optional: Add a line like the following. This value does not have to match the name of the IR group; it only needs to be unique (for example, if you are running multiple groups, and thus multiple producer/consumer pairs, give each consumer its own ID).
    group.id=MY_IR_GROUP
  5. Run consumer.bat or consumer.sh again. This time, the application will continue to run and start to display data received from the SQDR Producer. The sample consumer parses the XML or JSON data structure supplied by the Producer and sends the output to stdout.
  6. Enter control-C to terminate the application.

Troubleshooting

Limited troubleshooting capabilities are available when invoking the Producer stored procedure from SQDR. To assist with troubleshooting, the ProcTester Java application is provided. To run ProcTester, obtain the HEXID of the incremental group and modify and run ProcTester.bat.

Note: The current version of ProcTester only works when SQDR is configured to send JSON. An older version of ProcTester that works with XML is located in the subdirectory ProcTester.XML.

Obtain the Group ID

After the I/R group is created, you can obtain its ID, which is a 32 character GUID stored in the SQDRC control database. This value is used by the test utility.

You can obtain its value by executing the following SQL from a DB2 command window (db2cmd):

db2> connect to SQDRC user SQDR using <password>
db2> select id from SQDR.GROUPS where GROUP_NAME like 'MY_IR_GROUP'


ID
-----------------------------------
x'03A6116FDE36CE49B0978E50B4487365'

You can also use IBM Data Studio to examine the SQDR.GROUPS table.

Configure and run ProcTester

  1. Edit ProcTester.bat and set HEXID to the Group ID value obtained in the previous step.
  2. Update directory locations and password if necessary. The credentials are those of the local Windows user sqdr that SQDR uses to access the local DB2 database.
  3. Run ProcTester.bat from a command window.

If you get the error Missing required configuration "bootstrap.servers" which has no default value, confirm that you have configured the correct HEXID and that the bootstrap.servers value has been configured in the Description field of the group.

The sqdrJdbcBaseline application can also be used for testing, since it does not use the DB2 stored procedure at all.

 

sqdrJdbcBaseline Application

The SQDR Kafka Producer described above produces only change data. The sqdrJdbcBaseline application supplements that support by providing a method of performing a baseline of the desired table, resulting in Kafka output that can be read by the same consumer. Output of the baseline application can be formatted as JSON or XML, and is identical to that produced by the SQDR Kafka Producer except for producing an operation code of "L" (Load) rather than "I" (Insert).

The sqdrJdbcBaseline directory contains the following files:

  • sqdrJdbcBaseline.jar
  • sqdrJdbcBaseline.bat
  • drivers\ - a subdirectory containing JDBC drivers for all supported host databases
  • sample_configs\ - a subdirectory containing sample configurations for all supported host databases

Setup

  • Make sure that CLASSPATH includes the Kafka libraries as described above.
  • The batch file refers to the following additional jar files from the Kafka 0.11.0.1 distribution. Edit the path and filenames if you are using a different Kafka distribution.
    • JSON jar files
    • the Apache commons-lang jar file
      This can also be downloaded from: https://commons.apache.org/proper/commons-lang/download_lang.cgi
  • Choose a config file from the sample_configs directory and copy it to the main directory, renaming it as desired. We suggest using an identifying name such as the host system's hostname.
  • Edit the configuration file e.g. mysys.cfg:
    • The group is obtained from the SQDR Plus control tables (SQDRPn) with the following SQL. This parameter is optional; in most cases you can ignore it unless your consumer requires it, though a value should still be provided.

select SUBSCRIBERID from sqdr.sq_subscribers where SUBSCRIBERNAME like '%<subscriber>%'

    • Customize the other fields, e.g. sourceurl, sourceuser, sourcepassword, sql, topic. If you have defined an SQDR Plus agent for the source system, you can refer to the agent configuration (sqagent.properties) for the sourceurl.
    • The interval value is in milliseconds, e.g. 5000 = 5 seconds.
    • kafkaproperties is the same value as that configured in the Description field of the SQDR incremental group. (A sample configuration sketch appears at the end of this section.)
  • Invoke the batch file, supplying the config file name (without the .cfg extension).

C> sqdrJdbcBaseline mysys

You can supply multiple config files e.g.

C> sqdrJdbcBaseline sys1 sys2 sys3

If you invoke the batch file without supplying a config name, the batch file will use a config file with the same basename as the batch file; for example, invoking sqdrJdbcBaseline is the same as invoking sqdrJdbcBaseline sqdrJdbcBaseline (i.e. it uses a config file named sqdrJdbcBaseline.cfg).
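For reference, a configuration file assembled from the fields described above might look like the sketch below. The values are illustrative only; the exact keys and layout expected for your source database are defined by the files in sample_configs\, which should be treated as authoritative.

# mysys.cfg - illustrative values only; start from a file in sample_configs\
group=03A6116FDE36CE49B0978E50B4487365
sourceurl=jdbc:db2://myhost.mydomain.com:50000/MYDB
sourceuser=myuser
sourcepassword=mypassword
sql=select * from MYSCHEMA.MYTABLE
topic=My_Topic
interval=5000
kafkaproperties=bootstrap.servers=172.19.36.175:6667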

XML Format

Below is an example of the XML format produced by the TOKAFKA stored procedure. The Consumer needs to be able to parse and interpret this information.

    
<changerecord xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>8AF1F6CEF0E89D46868BE458FD59F889</group>
    <txid>00000000000000005523</txid>
    <seq>1463070235481</seq>
    <operation>U</operation>
    <beforekey>
        <id>4711</id>
    </beforekey>
    <afterimage>
        <Dest>"sample"."dbo"."testtable"</Dest>
        <row>
            <col1>Example column data</col1>
            <col2>00000000000000005522</col2>
            <col3 xsi:nil="true" />
        </row>
    </afterimage>
</changerecord>

The group is always present and contains the GUID of the Group to which the Subscription belongs.

The txid is the SQDR Plus-generated hex-binary transaction id for the change record. Note that this is not the transaction id from the source system.

The seq is a BIGINT sequence number that represents the relative order in which the update occurs in the source log within a given transaction.  For a given source log, it is also the sequential order in which the operation was processed within the log. It can be used in conjunction with txid and Dest to detect Change Data Replay, if that functionality is needed by the Consumer. See Replay below for more information.

The operation (type) is always present. Its value is one of I/D/U for Insert/Delete/Update source-table operations. There are additional operations that the Consumer may need to process.

  • S indicates that the subscription needs to be rebaselined. This can be an indication of a REORG at the source or that there is a disruption of the SQDR Plus Capture Agent’s log reading position.
  • X indicates that the source table has been dropped.
  • A indicates that the source table has been altered.
  • A B operation may be generated for subscriptions with “criteria” (i.e. a where clause). This additional change data row, sent for each update, contains the values of all the columns prior to the update. This option is not available for some source databases, such as DB2 for i with only *AFTER journaling, or SQL Server sources.
  • C indicates that all change records collected during a baseline copy have been processed. This record can refer to baselines for other groups or even other agents subscribing to the same table.

The beforekey is available for U and D records. If the subscription is configured to use RRN, it contains values for the RRN column. Otherwise it will contain the column values of the source table’s primary key, or the aggregate columns of unique indexes when no primary key exists.

The afterimage always contains the Dest element, which identifies the table the operation pertains to. The afterimage optionally contains a row element that contains column values for an Insert or Update.

NULL values in columns or before keys are indicated by the xsi:nil attribute. Binary (char for bit data) columns are encoded in hexBinary.

The XmlRowParser.java and JsonRowParser.java files contain sample parsers that illustrate how to interpret the XML and JSON records sent by the SQDR Kafka Producer. The parsers are used by the supplied sample/test Kafka Consumer.
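For orientation, the sketch below shows how the elements described above relate to one another when read with the standard Java DOM API. It is not the supplied XmlRowParser, only an illustration; the supplied parsers remain the reference implementation.

import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class ChangeRecordSketch {
    // Prints the main fields of a single changerecord XML document.
    public static void dump(String xml) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(new InputSource(new StringReader(xml)));
        Element root = doc.getDocumentElement();                  // <changerecord>
        System.out.println("group:     " + text(root, "group"));
        System.out.println("txid:      " + text(root, "txid"));
        System.out.println("seq:       " + text(root, "seq"));
        System.out.println("operation: " + text(root, "operation"));
        System.out.println("dest:      " + text(root, "Dest"));  // inside <afterimage>
        NodeList rows = root.getElementsByTagName("row");         // present for Insert and Update
        if (rows.getLength() > 0) {
            NodeList cols = rows.item(0).getChildNodes();
            for (int i = 0; i < cols.getLength(); i++) {
                Node col = cols.item(i);
                if (col.getNodeType() == Node.ELEMENT_NODE) {
                    System.out.println("  " + col.getNodeName() + " = " + col.getTextContent());
                }
            }
        }
    }

    private static String text(Element root, String tag) {
        NodeList list = root.getElementsByTagName(tag);
        return list.getLength() > 0 ? list.item(0).getTextContent() : null;
    }
}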

Replay

Pausing SQDR client operations, a disruption in availability of the SQDR Plus Capture Agent, or killing/stopping the SQDR service can result in change records being replayed to the Kafka Consumer.

In order to detect replay, the Consumer needs to track txid and seq for each subscribed table. If txid is less than the txid of the last processed record for the table, the record can be ignored as a replay. If the txid matches the txid of the last processed record, the records can be ignored if the seq is less than or equal to the seq of the last processed record.
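A sketch of that bookkeeping is shown below, keyed by the Dest value. It assumes txid values are fixed-width strings (as in the example above) so that lexical comparison matches numeric order; adapt the comparison if your txids differ.

import java.util.HashMap;
import java.util.Map;

// Tracks the last (txid, seq) seen per destination table and filters out replays.
public class ReplayFilter {
    private static final class Position {
        final String txid;
        final long seq;
        Position(String txid, long seq) { this.txid = txid; this.seq = seq; }
    }

    private final Map<String, Position> lastByTable = new HashMap<>();

    // Returns true if the record is new and should be processed, false if it is a replay.
    public boolean accept(String dest, String txid, long seq) {
        Position last = lastByTable.get(dest);
        if (last != null) {
            int cmp = txid.compareTo(last.txid);
            if (cmp < 0) return false;                      // earlier transaction: replay
            if (cmp == 0 && seq <= last.seq) return false;  // same transaction, already-seen sequence
        }
        lastByTable.put(dest, new Position(txid, seq));
        return true;
    }
}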

JSON Format

You may configure SQDR to produce JSON output rather than the default of XML by using the Incremental Group level configuration parameter local.encoding=JSON, or the advanced setting useXMLEncoding=0 (the latter is global, affecting all I/R groups). Your consumer application needs to be able to parse the JSON output.

JSON encoding generally results in shorter strings, and is more readily parsed by Java applications.
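For orientation only, a JSON change record carries the same information as the XML example shown earlier. The sketch below is hypothetical: the exact key names and nesting are defined by the Producer, and JsonRowParser.java in the sample Consumer is the authoritative reference for the real structure.

{
  "group": "8AF1F6CEF0E89D46868BE458FD59F889",
  "txid": "00000000000000005523",
  "seq": 1463070235481,
  "operation": "U",
  "beforekey": { "id": 4711 },
  "afterimage": {
    "dest": "\"sample\".\"dbo\".\"testtable\"",
    "row": { "col1": "Example column data", "col2": "00000000000000005522", "col3": null }
  }
}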

Hints and FAQ

Question: How can we identify in the XML that is published to Kafka which attributes are used as a primary key or in unique indexes? We would use this metadata on the target system to build relationships between documents.

Answer: Within the XML Change Record, the beforekey element provides the column and value pairs associated with the primary key (or unique columns), which may be used to identify the row being reported (for "U" and "D" opcodes).


DISCLAIMER

The information in technical documents comes without any warranty or applicability for a specific purpose. The author(s) or distributor(s) will not accept responsibility for any damage incurred directly or indirectly through use of the information contained in these documents. The instructions may need to be modified to be appropriate for the hardware and software that has been installed and configured within a particular organization.  The information in technical documents should be considered only as an example and may include information from various sources, including IBM, Microsoft, and other organizations.