Stelo Technical Documents

Using SQDR Streaming Support

Last Update: 16 February 2023
Product: StarQuest Data Replicator
Version: SQDR 6.11 or later
Article ID: SQV00DR048

Abstract

SQDR v5 provided support for streaming change data to Apache Kafka and compatible streaming services using the SQDR Kafka Producer, a stored procedure installed in the SQDR control database and invoked from the Apply processing task of SQDR (tier 3).

Introduced in SQDR v6, SQDR Streaming Support provides the same support and more in an integrated, extensible, and more efficient manner. The original Kafka Producer will continue to function, but customers are encouraged to update to the new support.

Apache Kafka is an open-source message broker project that aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by transaction logs. The Kafka interface can also be used to communicate with cloud-based streaming services such as Azure Event Hubs for Kafka, the Oracle Cloud Infrastructure Streaming service, Amazon Managed Streaming for Apache Kafka, and IBM Event Streams.

Customers requiring high-speed access to legacy-system change data (such as IBM Db2, Oracle, and Microsoft SQL Server) to drive information into new-generation data engines (such as Databricks, Spark, Apache Hive, MongoDB, Hadoop, MemSQL, Cassandra, Azure Storage, and Azure Data Lake Storage) will find their requirements well met by the Stelo real-time replication solution, SQDR.

This technical document provides an overview of the Streaming Support and demonstrates how to set up a simple Producer/Consumer sample. See Using SQDR Streaming Support with Azure Databricks for detailed instructions on setting up a connection to Azure Databricks through Azure Event Hubs.

Overview

Prerequisites

Setup Procedure:
Setting up the Stelo Kafka files
Creating the Incremental Group
Configuring and Starting the Kafka Consumer
Troubleshooting

Reference:
JSON Format
Replay

Azure Event Hubs and Azure Data Lake Store
Oracle Cloud Streaming Service
Amazon Managed Streaming for Apache Kafka
Hints and FAQ

Overview

SQDR provides support for streaming change data to Apache Kafka consumers. The Stelo-provided Streaming Support may be associated with an incremental group containing one or more subscriptions. Acting as a Kafka Producer, it injects a message for each subscribed table’s activity, with one message for each Insert, Update, and Delete DML operation. While each group may be associated with only one producer, multiple groups may be configured to use different producers, which may also share common source tables. No customization of the producer code is required to use the functionality.

In addition to the Streaming Support included in SQDR, a sample consumer is provided as Java source code. The sample illustrates parsing the message and exploiting the message details.  Users are encouraged to utilize this sample as a template for creating custom consumers.
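
For orientation, the following is a minimal sketch of the kind of polling loop such a consumer performs. The bootstrap address, group.id, and topic shown are placeholders, and the supplied sample adds JSON parsing and error handling beyond this sketch.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.36.175:9092");  // placeholder Kafka server
        props.put("group.id", "MY_IR_GROUP");                  // placeholder consumer group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The topic defaults to the I/R group name, with spaces replaced by underscores
            consumer.subscribe(Collections.singletonList("Sample_Group"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each value is one JSON change record produced by SQDR Streaming Support
                    System.out.println(record.value());
                }
            }
        }
    }
}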

Reliable, high-speed distribution of the messages is a service of the Kafka message bus.

Prerequisites

Download SQDR_Streaming.zip containing the source and binary files for the sample Consumer.

You must be using SQDR and SQDR Plus 6.11 or later, with Db2 11.5.x for the SQDR control database; the control database should be named SQDRC, and SQDR and SQDR Plus must reside on the same system. However, the consumer can reside elsewhere.

The installer for SQDR Plus contains a public JRE and Kafka libraries that can be used by the sample Consumer. To run the sample consumer on a different system, you can install the JRESQ and KafkaSQ packages available from StarQuest.

To customize the Consumer, you will need a Java IDE (e.g. Eclipse) and the JDK.

You also need to know the connectivity information (e.g. IP address or hostname and listening port) of your Kafka server. You can also use a local Kafka server for testing purposes.

Setup Procedure

Setting up the Stelo Kafka files

The Stelo files are supplied as a zip file; extract this zip file into C:\dev\kafka. In addition to the ready-to-run jar files in C:\dev\kafka\SQDRConsumer\jars, the source directories can be imported into Eclipse.

Creating the Destination

See Creating an SQDR destination using the Simba ODBC driver if you wish to replicate baselines to a Databricks destination using the Simba ODBC driver; change data will be replicated through Kafka. You can also use the Simba ODBC driver to create the destination table.

Otherwise, you may use the control database (SQDRC) as the "destination", even if it doesn’t match the “true” destination type – e.g. Databricks. You can connect with the userID SQDR. Both baselines and change data can be replicated through Kafka in many situations.

On the Advanced panel of the destination, select the checkbox Stream (Using Apply Extensions).

Use an object schema other than SQDR for the Object Schema so that place-holder tables do not create clutter in the SQDR schema.

If desired, add any directives to be appended to the CREATE TABLE statement that SQDR will issue for the actual destination.

Creating the Incremental Group

If it does not already exist, use Data Replicator Manager to create an incremental group and add subscriptions.

Here are some configuration hints:

Create the I/R group first, specifying the following on the Advanced panel:

  • Select the Parameters button and enter the following information, which instructs SQDR Plus (tier 2) how to apply change data:
    • Connection information for the Kafka server.
      In some cases, this may be a single line e.g.
      bootstrap.servers=172.19.36.175:9092
      or it might be several lines long, such as a connection to Azure Event Hubs for Kafka.

    • className - the name of the class to be run on Tier 2 to apply change data
    • batch.size
    • linger.ms
    • Optional: The default value for the Kafka topic is the group name, with spaces replaced by underscore. You can override the default by specifying topic in the Parameters dialog:

    Example:

    bootstrap.servers=localhost:9092
    className=com.starquest.sqdr.apply.kafka.KafkaSparkProcessor
    topic=SPARK_TOPIC
    batch.size=524288
    linger.ms=200

  • If desired, select a value for Before Image (*Default, Combined, Separately). If you select *Default, the value defined in the Change Data Logging Defaults section of the Replication tab of the SQDR Service properties is used. See the Advanced Tab for Groups of Incremental Subscriptions topic of the SQDR Help file for more details about this parameter.

Create subscriptions by right-clicking Members under the group and choosing Insert Member (rather than creating subscriptions under the Source). When creating the subscriptions, specify the following on the Destination panel:

  • For I/R Apply Options, confirm that Only Stream Change Data is selected. This setting is inherited from the group default and ensures that no attempt is made to apply change data into the destination; only Kafka messages are produced. Note that this setting may be greyed-out after setting the following options.
  • To replicate change data only, specify Append replicated rows to existing data
    and Null synchronization - DDL Only for Baseline Replication Options, so that no baseline activity will occur. These settings ensure that “running the baseline” does not result in an error, while the subscription is still successfully “registered” with the Capture Agent.
  • To perform baselines through Kafka, specify Use native-loader function.



Run the Subscription:

If Automatic Snapshot (on the Advanced panel of the group) is enabled (the default), the group is run automatically after the subscriptions are created, and the icons for both the group and the subscriptions should turn solid green. Otherwise, enable capture & apply by running the subscriptions or the group.

Configuring and Starting the Kafka Consumer

  1. Edit consumer.bat or consumer.sh and set the TOPIC variable. This value is used as the Kafka topic.
    e.g.
    set TOPIC=Sample_Group
  2. Delete any old copies of kafka.properties and run consumer.bat or consumer.sh once to create kafka.properties.
  3. Edit kafka.properties and modify this line, supplying the IP address or hostname and port of your Kafka server, including the backslash character:
    bootstrap.servers=172.19.36.175\:9092
    or
    bootstrap.servers=127.0.0.1\:9092 (local Kafka server)
  4. Optional: Add a line like the following; the value does not have to match the name of the I/R group, as long as it is unique (e.g. if you are running multiple groups, and thus multiple producer/consumer pairs, use a unique ID for each):
    group.id=MY_IR_GROUP
  5. Run consumer.bat or consumer.sh again. This time, the application will continue to run and start to display data received from SQDR. The sample consumer parses the JSON data structure supplied by SQDR and sends the output to stdout. You can redirect the output to a file if desired e.g. C> consumer > output.txt.
  6. Enter control-C to terminate the application.

Using a local Kafka server for testing

To create a local Kafka server for testing:

Copy the Kafka software (C:\Program Files\StarQuest\kafka) to a directory whose name contains no spaces e.g. C:\bin\kafka.

Create the following batch files and run each in its own command window:

C:\bin\zoo.bat:
set KAFKA=C:\bin\kafka
rmdir /s /q C:\tmp\zookeeper
"%KAFKA%\bin\windows\zookeeper-server-start" "%KAFKA%\config\zookeeper.properties"

C:\bin\kafka.bat:
set KAFKA=C:\bin\kafka
rmdir /s /q C:\tmp\kafka-logs
"%KAFKA%\bin\windows\kafka-server-start" "%KAFKA%\config\server.properties"

Once ZooKeeper and Kafka are running, configure SQDR and the consumer for 127.0.0.1:9092 and start your consumer in a third command window.
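
Before involving SQDR, you can optionally verify that the broker is reachable from Java. The following is a minimal sketch using the Kafka AdminClient; the address assumes the local test broker described above, and the Kafka client libraries must be on the classpath.

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");   // local test broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a round trip to the broker; a failure here indicates a connectivity problem
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}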

Troubleshooting

Some common errors/troubleshooting techniques:

  • Make sure that you are using the correct name for the Kafka library - Kafka uses libs rather than the more typical lib for the directory name.
  • Check for typos; it is not uncommon to accidentally enter KAKFA or SDQR.
  • Verify connectivity to the Kafka server, checking both hostname and port. You can use something like
    telnet <host> <port>
    or the PowerShell command
    Test-NetConnection -ComputerName <host> -Port <port>
  • If you plan to use TLS encrypted communication to the Kafka server (typically port 9094) rather than plain text (typically port 9092), you may need to work with certificate management and your connection string will be more complicated.
  • The kafka-console-producer (supplied with Kafka) can be used for testing connectivity/functionality, since it does not involve SQDR at all.
  • The following error may appear in db2diag.log or in an error message when an invalid hostname or IP address is supplied in Parameters:
    No resolvable bootstrap urls given in bootstrap.servers

sqdrJdbcBaseline Application

The sqdrJdbcBaseline Application supplied with the SQDR v5 Kafka Producer is deprecated, as SQDR Streaming Support now also includes baseline support. However, it can be used as a test application to verify connectivity independent of SQDR; see Using the SQDR v5 Kafka Producer for more information.

JSON Format

Below is an example of the JSON format produced for an Update operation. The Consumer needs to be able to parse and interpret this information.

{"group":"653e4f0809d95340ad7b0ebf12d57bfc",
"txid":"00000000000000000040",
"seq":1673563603769,
"operation":"U",
"beforekey":{"RRN":13},
"dest":"SQDR.TAB1",
"row":{"RRN":13,"FLD1":5}}

Here is an example of the JSON format for a simple Insert operation:

{"group":"4DEC433B09A62C4DA6646BF4EE1A3F30",
"txid":"00000000000000000109",
"seq":1566511556144,
"operation":"I",
"beforekey":null,
"dest":"\"SQDR\".\"TAB1A\"",
"row":{"FLD1": "10173", "FLD2": "new line", "FLD4": "100"}}

The group is always present and contains the GUID of the Group to which the Subscription belongs.

The txid is the SQDR Plus-generated hex-binary transaction id for the change record. Note that this is not the transaction id from the source system.

The seq is a BIGINT sequence number that represents the relative order in which the update occurs in the source log within a given transaction.  For a given source log, it is also the sequential order in which the operation was processed within the log. It can be used in conjunction with txid and Dest to detect Change Data Replay, if that functionality is needed by the Consumer. See Replay below for more information.

The operation (type) is always present. For change data, its value is one of I/D/U for Insert/Delete/Update source-table operations. There are additional operations that the Consumer may need to process.

  • S indicates that the subscription needs to be rebaselined. This can be an indication of a REORG at the source or that there is a disruption of the SQDR Plus Capture Agent’s log reading position.
  • X indicates that the source table has been dropped. An A operation indicates that the source table has been altered.
  • A B operation may be generated for subscriptions with “criteria” (i.e. a WHERE clause). This additional change data row, sent for each update, contains the values of all the columns prior to the update. This option is not available for some source databases, such as Db2 for i with *AFTER journaling or SQL Server sources.
  • C indicates that all change records collected during a baseline copy have been processed. This record can refer to baselines for other groups or even other agents subscribing to the same table.
  • SCHEMA = describes the JSON that's coming
  • E = end of change data result set - time to merge those changes into the destination table
  • e = end of change data result set (archive subscription) - time to append those changes into the destination table

The following operation types are sent when performing a baseline:

  • SCHEMA = describes the JSON that's coming
  • K = start of baseline. This instructs the consumer to truncate or delete if desired. Note that SQDR (tier 3) will perform that operation if you are using the Simba/Spark ODBC driver for a Databricks destination.
  • k = start of baseline for an archive subscription (append rather than truncate/delete).
  • L indicates a Load operation (baseline). This is in place of the I/D/U operations that are used for change data.
  • E = end of baseline data - time to merge contents into destination table
  • e = end of baseline data (archive subscription) - time to append data to destination table

The beforekey is available for U and D records. If the subscription is configured to use RRN, it contains values for the RRN column. Otherwise it will contain the column values of the source table’s primary key, or the aggregate columns of unique indexes when no primary key exists.

The afterimage always contains the dest element, which identifies the table the operation pertains to. The afterimage optionally contains a row element that contains column values for an Insert or Update.

NULL values in columns or before keys are indicated by the xsi:nil attribute. Binary (char for bit data) columns are encoded in hexBinary.

The JsonRowParser.java file contains a sample parser that illustrates how to interpret the JSON records sent by the SQDR Streaming Support. The parser is used by the supplied sample/test Kafka Consumer.
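
As an illustration only (not the code of the supplied parser), the sketch below shows how a consumer might dispatch on the operation type after parsing a record. It assumes the Jackson library (com.fasterxml.jackson.databind) is available; the class and method names are hypothetical.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeRecordHandler {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void handle(String json) throws Exception {
        JsonNode rec = MAPPER.readTree(json);
        String operation = rec.path("operation").asText();   // I/U/D for change data, L for baseline loads, etc.
        String dest = rec.path("dest").asText();              // destination table the record pertains to
        JsonNode beforeKey = rec.path("beforekey");           // key values; present for U and D records
        JsonNode row = rec.path("row");                       // column values; present for I, U and L records
        switch (operation) {
            case "I":
            case "L":
                System.out.println("Insert/load into " + dest + ": " + row);
                break;
            case "U":
                System.out.println("Update " + dest + " where " + beforeKey + " -> " + row);
                break;
            case "D":
                System.out.println("Delete from " + dest + " where " + beforeKey);
                break;
            default:
                // SCHEMA, S, X, A, B, C, K, k, E, e and other control records
                System.out.println("Control record " + operation + " for " + dest);
        }
    }
}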

Note that the original SQDR v5 Kafka Producer included an option to send XML rather than JSON format; XML is not supported by SQDR v6 Streaming Support.

Replay

Pausing SQDR client operations, disruption in the availability of the SQDR Plus Capture Agent, or killing/stopping the SQDR service can result in change records being replayed to the Kafka Consumer.

In order to detect replay, the Consumer needs to track txid and seq for each subscribed table. If txid is less than the txid of the last processed record for the table, the record can be ignored as a replay. If the txid matches the txid of the last processed record, the records can be ignored if the seq is less than or equal to the seq of the last processed record.
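
A minimal sketch of that bookkeeping follows, assuming txid is compared as the fixed-width hex string shown in the examples above and seq is handled as a long; the class and method names are illustrative, not part of the supplied sample.

import java.util.HashMap;
import java.util.Map;

public class ReplayFilter {
    // Last processed position (txid, seq) for each destination table
    private static final class Position {
        final String txid;
        final long seq;
        Position(String txid, long seq) { this.txid = txid; this.seq = seq; }
    }

    private final Map<String, Position> lastByTable = new HashMap<>();

    // Returns true if the record should be processed, false if it is a replay
    public boolean accept(String dest, String txid, long seq) {
        Position last = lastByTable.get(dest);
        if (last != null) {
            int cmp = txid.compareTo(last.txid);            // fixed-width hex strings order correctly as text
            if (cmp < 0) return false;                      // earlier transaction: replay
            if (cmp == 0 && seq <= last.seq) return false;  // same transaction, seq already processed: replay
        }
        lastByTable.put(dest, new Position(txid, seq));
        return true;
    }
}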

Azure Event Hubs and Azure Data Lake Store

Azure Event Hubs for Kafka can be used in place of a Kafka cluster, either as a message broker between the SQDR Producer and a consumer (as described above), or as a method of storing data into Azure Blob Storage or Azure Data Lake Store.

It can also provide a mechanism for sending change data to a Spark environment such as Azure Databricks. See Using SQDR Streaming Support with Azure Databricks.

You must be using the Standard or Premium pricing tier; Kafka support is not available at the Basic tier.

The connection information for Azure Event Hubs consists of several lines and is more complicated than a typical Kafka connection string. See the Microsoft documentation Get an Event Hubs connection string for information on obtaining the connection string from Settings/Shared access policies. Select the primary Connection String and use it to create connection information that looks something like the following (you will need to update the bootstrap.servers value and the password field):

bootstrap.servers=myeventhub.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ArOQYqH0SqmRYbeO6McNHuTKXXXdYawK050LQhXpWI0=";

Supply this information to the Kafka Producer by pasting it as-is into the Parameters dialog on the Advanced panel of the I/R Group.

To supply this information to the sqdrJdbcBaseline application, do one of the following:

  • Place the multi-line contents into a file called kafka.properties and set "kafkaproperties":"KAFKAPROPERTIES"
  • Replace newlines with the pipe (|) character.
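
If you write a custom Java consumer that reads from Event Hubs, it needs the same settings. The sketch below shows one way to set them programmatically using the standard Kafka client configuration keys; the namespace and key are placeholders.

import java.util.Properties;

public class EventHubsConfig {
    // Builds Kafka client properties for Azure Event Hubs; namespace and key below are placeholders
    public static Properties eventHubsProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "myeventhub.servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"$ConnectionString\" "
            + "password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;"
            + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your key>\";");
        return props;
    }
}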

When configuring Azure Event Hubs, you may need to add the IP address of your SQDR server to the firewall exception list in Settings/Networking. However, this appears to be unnecessary for this particular usage.

In addition to creating the Event Hubs namespace, you can create an Event Hub using a name that matches a Kafka topic. If an Event Hub by that name does not exist, it will be created automatically when a Producer or a Consumer connects.

Select Features/Capture of the Event Hub if you wish to store the data in an Azure Storage Account (also used by Azure Data Lake Store Gen 2) or Azure Data Lake Store Gen 1.

Troubleshooting: In the Azure Portal, examine the list of Event Hubs in your Event Hubs namespace. A typo in the name of a Topic (or I/R group) defined in SQDR will result in the creation of an Event Hub with the mistyped name.

Oracle Cloud Streaming Service

The Oracle Cloud Streaming Service can be used in place of a Kafka cluster, as a message broker between the SQDR Producer and a consumer.

First, use the Oracle Cloud console to verify that your Oracle Cloud account is enabled for streaming:

Under Governance and Administration/Governance/Limits, Quotas and Usage:

Select the Service Streaming from the dropdown and verify the values for:
  • Connect Harness Count - at least 1
  • Partition Count - the number of streams (aka topics) that you plan to use

Configuration of Streaming can be found under Solutions & Platforms/Analytics/Streaming.

You may ignore the Kafka Connection Configuration section - it is used for communications between an Oracle database and Kafka, and is not relevant to this application.

You do not need to create a stream (which is equivalent to a topic), as that will be created automatically when the producer or client connects with a topic name.

If the DefaultPool doesn't already exist, create a pool under Stream Pools. Select Settings of the Stream Pool and verify that the checkbox AUTO CREATE TOPICS is enabled. Alternatively, you can create the stream (with a name that matches the topic) in advance.

View Kafka Connection Settings and select Copy All Settings to obtain the connection string. However, this connection string will need to be modified with the appropriate user and AUTH_TOKEN.

Next, create a user, group, and policy:

  1. Go to Governance and Administration/Identity/Users
  2. Create a dedicated user for the streaming service: streaming-user
  3. On the user details page, generate an auth token. Copy this token, as you will not be able to retrieve it later.
  4. Under Governance and Administration/Identity/Groups: create a group StreamingUsers and add the streaming-user to the group
  5. Under Governance and Administration/Identity/Policies, create Policy StreamingUsersPolicy and add the following:

allow group StreamingUsers to manage streams in tenancy
allow group StreamingUsers to manage stream-pull in tenancy
allow group StreamingUsers to manage stream-push in tenancy

You can now use this user and its auth token with the connection string that was obtained from the Stream Pool. The username consists of three components: tenancy/userID/ocid. If the phrase oracleidentitycloudservice appears as a fourth component, remove it.

Copy the connection string into kafka.properties in the Consumer directory and start the consumer. Verify that the consumer starts without error; you should see the topic appear as a stream in the Oracle console.

To test connectivity from Oracle Streaming Service to the consumer, select the stream in the console and select Produce Test Message. If you enter a malformed string, it may result in an error at the Producer, but that is sufficient to show connectivity. If you are using the supplied Consumer sample, paste in the following string:

{"group":"d761072ab77ee34cb6fb4117bd1fa142","txid":null,"seq":1,"operation":"L","beforekey":null,"dest":"\"MYSCHEMA\".\"MYTABLE\"","row":{"ID":"0","DESCRIP":"Other"}}

For more information about configuring the Oracle Cloud Streaming Service, see the blog posting Migrate Your Kafka Workloads To Oracle Cloud Streaming.

Amazon Managed Streaming for Apache Kafka (MSK)

Amazon Managed Streaming for Apache Kafka (MSK) can be used in place of a Kafka cluster, either as a message broker between the SQDR Producer and a consumer (as described above), or as a method of storing data into a destination such as Amazon Redshift or Amazon S3 object store (using AWS Glue or using Kafka-Kinesis-Connector and Amazon Kinesis Data Firehose), or supplying data to an analysis tool such as Amazon Kinesis Data Analytics.

For more information see Using SQDR and Amazon Managed Streaming for Kafka.

Hints and FAQ

Question: The statistics for the subscription show that rows have been sent, but I do not see them in my destination.

Answer: The statistics represent the number of rows that have been sent to Kafka; SQDR does not typically have visibility into the activity on the final destination. Make sure that the Consumer is running and performing its task. If the Consumer is not running, the data is typically retained by the Kafka server or Azure Event Hub if it has been configured for retention.

Question: How can we identify in the output that is published to Kafka which attributes are used as a primary key or in unique indexes? We would use this metadata on the target system to build relationships between documents.

Answer: Within the change record, the beforekey element provides the column and value pairs associated with the primary key (or unique columns), which may be used to identify the row being reported (for "U" and "D" opcodes).


Symptom: The following error occurred in the Kafka server (running on Windows) when adding a topic:

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open at java.io.RandomAccessFile.setLength(Native Method)

Solution: Topic names are case-sensitive, but (on Windows) file and directory names are not. When the Kafka server is running on a system with case-sensitive filenames, specifying topic names in different cases will result in the creation of separate topics. However, on Windows, specifying a topic name that differs only in case from an existing topic (e.g. TEST and test) will result in the above error.


DISCLAIMER

The information in technical documents comes without any warranty or applicability for a specific purpose. The author(s) or distributor(s) will not accept responsibility for any damage incurred directly or indirectly through use of the information contained in these documents. The instructions may need to be modified to be appropriate for the hardware and software that has been installed and configured within a particular organization.  The information in technical documents should be considered only as an example and may include information from various sources, including IBM, Microsoft, and other organizations.