Connectors → Apache Kafka V2

About Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

About the Apache Kafka V2 connector

With the new version of the Apache Kafka connector, you can create an external data source for one or more Kafka topics. The Kafka topics must contain messages in a valid JavaScript Object Notation (JSON) format.

Connector Availability

The Apache Kafka V2 connector is available starting with the 2024.1.7 and 2024.7.0 releases.

The new connector provides multiple enhancements over the older connector version, including:

  • An enhanced and streamlined user experience
  • Support for multiple topics per data source
  • Automatic discovery; no need to generate an Avro schema
  • Transformation of JSON messages (flattening objects and arrays)
  • Enhanced performance on Cloud installations and improved throughput
  • Consumption of messages in batch mode during the load job
  • Native support for Data Agent
  • Multi-loader support (no need to define the Loader Service for the Kafka Consumer Service)

The Apache Kafka V2 connector supports the following Incorta-specific functionalities:

Feature | Supported
Chunking
Data Agent
Encryption at Ingest
Incremental Load
Multi-Source
OAuth
Performance Optimized
Remote
Single-Source
Spark Extraction
Webhook Callbacks
Important

The Apache Kafka V2 connector supports incremental loads for streaming messages based on the extraction time of the last successful load job. It is recommended to set message.timestamp.type in the topic configurations to LogAppendTime to prevent any missing records when loading data incrementally.

If the configuration is set to the default CreateTime value, ensure you configure the extraction.recovery.period.seconds property in the Extra Options. This setting introduces a delay period after extraction, allowing the connector to capture any messages created but not yet appended to Kafka.
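
In practice, the recommendation comes down to two settings. The following is a minimal sketch; the 30-second recovery window is an illustrative value and should reflect the delay you actually expect between message creation and its arrival in Kafka:

Topic configuration (applied to the Kafka topic):
message.timestamp.type=LogAppendTime

Extra Options (applied to the Incorta data source, only if the topic must remain on CreateTime):
extraction.recovery.period.seconds=30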

Steps to connect Apache Kafka and Incorta

To connect Apache Kafka and Incorta, here are the high-level steps, tools, and procedures:

Create an external data source

Here are the steps to create an external data source with the Apache Kafka V2 connector:

  • Sign in to Incorta.
  • In the Navigation bar, select Data.
  • In the Action bar, select + New → Add Data Source.
  • In the Choose a Data Source dialog, in Streaming Data, select Kafka V2.
  • In the New Data Source dialog, specify the applicable connector properties.
  • To test, select Test Connection.
  • Select Ok to save your changes.

Kafka V2 connector properties

Here are the properties of the Kafka V2 connector:

  • Data Source Name (text box): Enter the name of the data source.
  • Topics List (text box): Enter a comma-separated list of topics from which the data source reads. You can also use the asterisk wildcard character (*) to include topics with a specific name pattern.
  • Extract Metadata of Kafka Messages (toggle): Enable this option to extract and include metadata from Kafka messages, which provides additional context and information about each message. Metadata includes the message topic, partition, offset, timestamp, timestamp type, headers, and key.
  • Consumer Properties (text box): A line-separated list of connection properties in the format: propertyName=propertyValue. You must provide at least the IP addresses and ports of the bootstrap servers in the following format: bootstrap.servers=host1:port1,host2:port2,... For a complete list of properties, refer to the Kafka online documentation > Configuration > Consumer Configs. For a sample Consumer Properties block, see the sketch after this list.
    Important: You must not modify the following consumer properties:
      ●  client.id
      ●  key.deserializer
      ●  value.deserializer
      ●  enable.auto.commit
      ●  auto.offset.reset
      ●  max.poll.records
    Changing these properties in the Consumer Properties will cause the connector to fail.
  • Authentication Type (dropdown list): Select the authentication type. Additional options might be required when selecting the authentication type. Available options are:
      ●  Use Consumer Properties (if you have provided the required configurations in the consumer properties)
      ●  SSL
      ●  PLAIN SASL
      ●  PLAIN SASL_SSL
  • SSL > Enabled Protocol (dropdown list): The protocol to be used for the TLS communication. Available options are:
      ●  TLS v1
      ●  TLS v1.1
      ●  TLS v1.2
  • SSL > Endpoint Identification Algorithm (text box): Specify the algorithm for hostname validation. An empty string disables hostname verification, which is the default. Enter https to enable hostname verification.
  • SSL > Trust Store File (button): To upload a trust store file, select Choose File. In the Finder or File Explorer, select your trust store file, such as kafka.client.truststore.jks.
  • SSL > Trust Store Password (text box): Specify the password of the trust store file. Without a password, the trust store file remains available, but without integrity checks.
  • SSL > Key Store File (button): To upload a key store file, select Choose File. In the Finder or File Explorer, select your key store file, such as kafka.client.keystore.jks.
  • SSL > Key Store Password (text box): Specify the password of the key store file.
  • SSL > Key Password (text box): Specify the password of the private key in the key store file.
  • PLAIN SASL/PLAIN SASL_SSL > Username (text box): Enter the username for authenticating the connection with Kafka.
  • PLAIN SASL/PLAIN SASL_SSL > Password (text box): Enter the password for authenticating the connection with Kafka.
  • Extra Options (text box): Enter a line-separated list of additional options for the connector configurations in the form of key-value pairs.
  • Use Data Agent (toggle): Enable using a data agent to securely ingest data that resides behind a firewall. For more information, review Tools → Data Agent and Tools → Data Manager.
  • Data Agent (dropdown list): Enable Use Data Agent to configure this property. Select from the data agents created in the tenant, if any.
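
If you select Use Consumer Properties as the Authentication Type, all connection details go into the Consumer Properties text box. The following is a minimal sketch for a cluster secured with SASL_SSL and PLAIN credentials; the broker addresses and credentials are placeholders, and the exact keys your cluster requires may differ (refer to the Kafka Consumer Configs documentation):

bootstrap.servers=broker1.example.com:9093,broker2.example.com:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<password>";

If you select PLAIN SASL_SSL as the Authentication Type instead, the username and password are entered through the dedicated connector properties, so the sasl.jaas.config line is likely unnecessary.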

Supported extra options

The Kafka V2 connector supports the following extra options:

  • extract.messages.after: Capture only messages created or appended after a specific timestamp during a full load. Enter the timestamp in the following format: yyyy-MM-dd HH:mm:ss.SSS zzz.
    Example: extract.messages.after=2024-12-23 15:30:00.000 +0200
  • discovery.sample.size: The number of messages the connector uses during schema discovery to automatically determine the column names and data types.
    Default: 50 messages
    Example: discovery.sample.size=100
  • extraction.fetch.size: The number of messages to extract from Kafka in each batch. Adjust this value based on your data volume and processing requirements.
    Default: 500
    Example: extraction.fetch.size=1000
  • extraction.consumers.max: The maximum number of concurrent consumers that can extract data from the source.
    Default: 50
    Example: extraction.consumers.max=60
  • extraction.failures.allowed.rate: The maximum percentage of rejected rows allowed during data extraction. If the percentage of rejected rows exceeds this threshold, the extraction process fails.
    Default: 100 (the extraction does not fail even if all rows are rejected)
    Example: To fail the extraction if rejected rows exceed 5% of the rows, set extraction.failures.allowed.rate=5.
  • extraction.timezone: Specify the Kafka timezone ID if Incorta's timezone differs from Kafka's timezone.
    Example: extraction.timezone=PST
  • extraction.recovery.period.seconds: When the topic message.timestamp.type is set to CreateTime, the load job must capture all messages created before the extraction start time. To ensure this, the extraction.recovery.period.seconds property defines a delay period after the extraction starts, allowing the connector to capture any messages created but not yet appended to Kafka.
    The default value is 20 seconds. If the expected delay between message creation and its arrival in Kafka is typically less than 10 seconds, set extraction.recovery.period.seconds=10 to eliminate potential data loss. It is recommended to change the topic configuration message.timestamp.type from CreateTime to LogAppendTime instead. For more details, check the Kafka documentation.
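
Extra Options are entered one key=value pair per line, in the same way as Consumer Properties. The following is a minimal sketch that combines several of the options above; all values are illustrative and should be tuned to your topics and load schedule:

discovery.sample.size=200
extraction.fetch.size=1000
extraction.failures.allowed.rate=5
extraction.timezone=UTC
extraction.recovery.period.seconds=30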

Create a physical schema with the Schema Wizard

Here are the steps to create a physical schema with the Schema Wizard:

  • Sign in to Incorta.
  • In the navigation bar, select Schema.
  • In the action bar, select + New → Schema Wizard.
  • In (1) Choose a Source, specify the following:
    • For Enter a name, enter the physical schema name.
    • For Select a Datasource, select the Kafka external data source.
    • Optionally add a description.
  • In the Schema Wizard footer, select Next.
  • In (2) Manage Tables, in the Data Panel, select the topics you want to use to create tables. Select the Select All checkbox to select all the topics.
  • In the Schema Wizard footer, select Next.
  • In (3) Finalize, in the Schema Wizard footer, select Create Schema.

The Schema Wizard uses the default transformation query to discover Kafka messages and create related tables. You can edit the table’s dataset in the Table Editor and use a custom query.

Create a physical schema with the Schema Designer

Here are the steps to create a Kafka physical schema using the Schema Designer:

  • Sign in to Incorta.
  • In the Navigation bar, select Schema.
  • In the action bar, select + New → Create Schema.
  • In Name, specify the physical schema name, and select Save.
  • In the Schema Designer, in the action bar, select + New → Table.
  • In the Table Data Source dialog, specify the following:
    • For Type, select Streaming.
    • For Data Source, select the Kafka external data source.
    • Specify the Kafka table dataset properties.
  • Select Add.
  • In the Table Editor, in the Table Summary section, enter the table name.
  • To save your changes, select Done in the action bar, and then in the Schema Designer, in the action bar, select Save Changes.

Kafka table dataset properties

For a physical schema table, you can define the following Kafka-specific dataset properties:

  • Topic Name (dropdown list): Select the topic you want to use from the list specified in the external data source.
  • Transformation Query (text box): Select the text box or the Edit icon to launch the Edit Query dialog, and then enter the transformation query of the message in JSON format.
  • Incremental (toggle): Turn on this toggle to enable incremental loading of the table.
  • Chunking Strategy (dropdown list): Select the chunking strategy:
      ●  One Consumer Per Partition
      ●  Utilize all Consumers
      ●  No Chunking
  • Extract Metadata of Kafka Messages (toggle): Enable this option to extract and include metadata from the Kafka messages. Metadata includes the message topic, partition, offset, timestamp, timestamp type, headers, and key, which provides additional context and information about each message.
  • Callback (toggle): Turn on this toggle to enable the post-extraction callback.
  • Callback URL (text box): This property appears when the Callback toggle is enabled. Specify the callback URL.

View the physical schema diagram with the Schema Diagram Viewer

Here are the steps to view the physical schema diagram using the Schema Diagram Viewer:

  • Sign in to Incorta.
  • In the navigation bar, select Schema.
  • In the list of physical schemas, select the Kafka physical schema.
  • In the Schema Designer, in the summary section, select Diagram.

Load the physical schema

Here are the steps to perform a full load of the Kafka physical schema using the Schema Designer:

  • Sign in to Incorta.
  • In the navigation bar, select Schema.
  • In the list of physical schemas, select the Kafka physical schema.
  • In the Schema Designer, in the action bar, select Load → Full Load.
  • To review the load status, in Last Load Status, select the date.

Here are the steps to perform an Incremental Load of the Kafka physical schema with the Schema Designer:

  • Sign in to Incorta.
  • In the navigation bar, select Schema.
  • In the list of physical schemas, select the Kafka physical schema.
  • In the Schema Designer, in the action bar, select Load → Incremental Load.
  • To review the load status, in Last Load Status, select the date.

Explore the physical schema

With the full or incremental load of the Kafka physical schema complete, you can use the Analyzer to explore the physical schema, create your first insight, and save the insight to a new dashboard tab or an existing dashboard tab.

To open the Analyzer from the physical schema, follow these steps:

  • In the navigation bar, select Schema.
  • In the Schema Manager, in the physical schemas list, select the Kafka physical schema.
  • In the Schema Designer, in the action bar, select Explore.

Additional Considerations

Data type discovery

The Kafka V2 connector may discover some data types as String during the JQ transformation. If the data types of your columns are not accurately discovered, you can adjust them manually by editing the dataset and selecting the correct data type in the Table Editor > Column Properties.
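
Alternatively, you can coerce types in the transformation query itself. The following is a minimal jq sketch using the standard tonumber filter to convert a numeric string into a number; the price field is hypothetical, and you should verify in JQ Play (and against the connector's JQ implementation) that the filter behaves as expected for your messages:

{
  store_id: .store_id,
  # tonumber converts a numeric string such as "15" into the number 15
  price: (.price | tonumber)
}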

Transforming JSON messages

The Kafka V2 connector requires Kafka messages to be valid JSON. By default, the connector discovers all top-level properties, objects, and arrays in the JSON messages as separate columns. However, nested (child) properties in arrays and objects are transformed into a single string. The Kafka V2 connector allows flattening the JSON message using JQ.

The following example shows the default transformation query of a Kafka message.

The Kafka message:

{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "branch": {
    "branch_id": 5,
    "state": "NC"
  },
  "order_lines": [
    {
      "product_id": 10,
      "category": "books",
      "quantity": 1,
      "unit_price": 15,
      "net_price": 30
    },
    {
      "product_id": 20,
      "category": "notebooks",
      "quantity": 5,
      "unit_price": 12,
      "net_price": 25
    }
  ]
}

The default transformation query:

{
  store_id: .store_id,
  store_order_id: .store_order_id,
  status: .status,
  branch: .branch,
  order_lines: .order_lines
}

The output per the default transformation query:

{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "branch": "{\"branch_id\": 5,\"state\": \"NC\"}",
  "order_lines": "[{\"product_id\": 10,\"category\": \"books\",\"quantity\": 1,\"unit_price\": 15,\"net_price\": 30},{\"product_id\": 20,\"category\": \"notebooks\",\"quantity\": 5,\"unit_price\": 12,\"net_price\": 25}]"
}

A customized transformation query:
You can edit the default transformation query and flatten objects and arrays to extract nested properties as separate columns or rows as required. You can use a JQ editor to flatten JSON objects and arrays.

Note

For details about supported JQ features, refer to Jackson JSON Processor.

You can use JQ Play to test queries and outputs.

Example 1:
The following is an example of a transformation query that flattens nested properties in an object to columns.

{
  store_id: .store_id,
  store_order_id: .store_order_id,
  status: .status,
  branch_id: .branch.branch_id,
  state: .branch.state
}

The output per the customized transformation query in Example 1:

{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "branch_id": 5,
  "state": "NC"
}

Example 2:
The following is an example of a transformation query that flattens nested properties in an array to rows.

{
  store_id: .store_id,
  store_order_id: .store_order_id,
  status: .status,
  order_lines: .order_lines[]
} |
{
  store_id: .store_id,
  store_order_id: .store_order_id,
  status: .status,
  product_id: .order_lines.product_id,
  category: .order_lines.category,
  quantity: .order_lines.quantity,
  unit_price: .order_lines.unit_price,
  net_price: .order_lines.net_price
}

The output per the customized transformation query in Example 2:

{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "product_id": 10,
  "category": "books",
  "quantity": 1,
  "unit_price": 15,
  "net_price": 30
}
{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "product_id": 20,
  "category": "notebooks",
  "quantity": 5,
  "unit_price": 12,
  "net_price": 25
}

Example 3:
The following is an example of a transformation query that flattens nested properties in an object to columns and an array to rows.

.order_lines[] as $orderLine |
{
  store_id: .store_id,
  store_order_id: .store_order_id,
  status: .status,
  branch_id: .branch.branch_id,
  state: .branch.state,
  product_id: $orderLine.product_id,
  category: $orderLine.category,
  quantity: $orderLine.quantity,
  unit_price: $orderLine.unit_price,
  net_price: $orderLine.net_price
}

The output per the customized transformation query in Example 3:

{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "branch_id": 5,
  "state": "NC",
  "product_id": 10,
  "category": "books",
  "quantity": 1,
  "unit_price": 15,
  "net_price": 30
}
{
  "store_id": 1,
  "store_order_id": 1000,
  "status": "accepted",
  "branch_id": 5,
  "state": "NC",
  "product_id": 20,
  "category": "notebooks",
  "quantity": 5,
  "unit_price": 12,
  "net_price": 25
}