Connectors → Apache Kafka V2
About Apache Kafka
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
About the Apache Kafka V2 connector
With the new version of the Apache Kafka connector, you can create an external data source for one or more Kafka topics. The Kafka topics must contain messages in a valid JavaScript Object Notation (JSON) format.
The Apache Kafka V2 connector is available starting with the 2024.1.7 and 2024.7.0 releases.
The new connector provides multiple enhancements over the older connector version, including:
- An enhanced and streamlined user experience
- Support for multiple topics per data source
- Automatic discovery; no need to generate an Avro schema
- Transformation of JSON messages (flattening objects and arrays)
- Enhanced performance on Cloud installations and higher throughput
- Batch-mode message consumption during the load job
- Native support for Data Agent
- Multi-loader support (no need to define the Loader Service for the Kafka Consumer Service)
The Apache Kafka V2 connector supports the following Incorta-specific functionalities:
Feature | Supported |
---|---|
Chunking | ✔ |
Data Agent | ✔ |
Encryption at Ingest | ✔ |
Incremental Load | ✔ |
Multi-Source | ✔ |
OAuth | |
Performance Optimized | ✔ |
Remote | |
Single-Source | ✔ |
Spark Extraction | |
Webhook Callbacks | ✔ |
The Apache Kafka V2 connector supports incremental loads for streaming messages based on the extraction time of the last successful load job. It is recommended to set message.timestamp.type in the topic configurations to LogAppendTime to prevent any missing records when loading data incrementally. If the configuration is set to the default CreateTime value, ensure you configure the extraction.recovery.period.seconds property in the Extra Options. This setting introduces a delay period after extraction, allowing the connector to capture any messages created but not yet appended to Kafka.
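For reference, the following is a minimal sketch of the two relevant settings; the topic-level line is applied with your usual Kafka admin tooling, and the 10-second recovery value is illustrative only:

```properties
# Topic-level Kafka configuration: use broker log-append time so incremental
# loads key off the time the record was appended to the log.
message.timestamp.type=LogAppendTime

# Alternative if the topic must keep CreateTime: add this key to the
# connector's Extra Options to wait for messages created but not yet
# appended to Kafka before the extraction cut-off.
extraction.recovery.period.seconds=10
```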
Steps to connect Apache Kafka and Incorta
To connect Apache Kafka and Incorta, here are the high-level steps, tools, and procedures:
- Create an external data source
- Create a physical schema with the Schema Wizard
- or, Create a physical schema with the Schema Designer
- Load the physical schema
- Explore the physical schema
Create an external data source
Here are the steps to create an external data source with the Apache Kafka V2 connector:
- Sign in to Incorta.
- In the Navigation bar, select Data.
- In the Action bar, select + New → Add Data Source.
- In the Choose a Data Source dialog, in Streaming Data, select Kafka V2.
- In the New Data Source dialog, specify the applicable connector properties.
- To test, select Test Connection.
- Select Ok to save your changes.
Kafka V2 connector properties
Here are the properties of the Kafka V2 connector:
Property | Control | Description |
---|---|---|
Data Source Name | text box | Enter the name of the data source |
Topics List | text box | Enter a comma-separated list of topics from which the data source reads. You can also use the asterisk wildcard character (*) to include topics with a specific name pattern. |
Extract Metadata of Kafka Messages | toggle | Enable this option to extract and include metadata from Kafka messages, which provides additional context and information about each message. Metadata includes message topic, partition, offset, timestamp, timestamp type, headers, and key. |
Consumer Properties | text box | A line-separated list of connection properties in the format propertyName=propertyValue. You must provide at least the IP addresses and ports of the bootstrap servers in the following format: bootstrap.servers=host1:port1,host2:port2,... For a complete list of properties, refer to the Kafka online documentation > Configuration > Consumer Configs. A sample entry is shown after this table. Important: Do not modify the following consumer properties: ● client.id ● key.deserializer ● value.deserializer ● enable.auto.commit ● auto.offset.reset ● max.poll.records. Changing these properties will cause the connector to fail. |
Authentication Type | dropdown list | Select the authentication type. Additional options might be required when selecting the authentication type. Available options are: ● Use Consumer Properties (if you have provided the required configurations in the consumer properties) ● SSL ● PLAIN SASL ● PLAIN SASL_SSL |
SSL > Enabled Protocol | dropdown list | The protocol to be used for TLS communication. Available options are: ● TLS v1 ● TLS v1.1 ● TLS v1.2 |
SSL > Endpoint Identification Algorithm | text box | Specify the algorithm for the hostname validation. An empty string disables hostname verification, which is the default. Enter https to enable hostname verification. |
SSL > Trust Store File | button | To upload a trust store file, select Choose File. In the Finder or File Explorer, select your trust store file, such as kafka.client.truststore.jks |
SSL > Trust Store Password | text box | Specify the password of the trust store file. Without a password, the trust store file remains available, but without integrity checks. |
SSL > Key Store File | button | To upload a key store file, select Choose File. In the Finder or File Explorer, select your key store file, such as kafka.client.keystore.jks. |
SSL > Key Store Password | text box | Specify the password of the key store file |
SSL > Key Password | text box | Specify the password of the private key in the key store file |
PLAIN SASL/PLAIN SASL_SSL > Username | text box | Enter the username for authenticating the connection with Kafka |
PLAIN SASL/PLAIN SASL_SSL > Password | text box | Enter the password for authenticating the connection with Kafka |
Extra Options | text box | Enter a line-separated list of additional options for the connector configurations in the form of key-value pairs. |
Use Data Agent | toggle | Enable using a data agent to securely ingest data that resides behind a firewall. For more information, please review Tools → Data Agent and Tools → Data Manager. |
Data Agent | dropdown list | Enable Use Data Agent to configure this property. Select from the data agents created in the tenant, if any. |
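For example, a minimal Consumer Properties entry might look like the following sketch; the broker host names and ports are placeholders, and the optional keys are standard Kafka consumer settings that you should include only if your cluster requires them:

```properties
# Required: the bootstrap brokers the connector connects to.
bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092

# Optional standard consumer settings (examples only).
security.protocol=PLAINTEXT
session.timeout.ms=45000

# Do not set client.id, key.deserializer, value.deserializer,
# enable.auto.commit, auto.offset.reset, or max.poll.records here;
# the connector manages these and overriding them causes it to fail.
```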
Supported extra options
The following table lists the extra options supported by the Kafka V2 connector. A sample Extra Options entry follows the table.
Option Key | Description |
---|---|
extract.messages.after | Capture messages created or appended after a specific timestamp during a full load. Enter the timestamp in the following format: yyyy-MM-dd HH:mm:ss.SSS zzz. Example: extract.messages.after=2024-12-23 15:30:00.000 +0200 |
discovery.sample.size | The number of messages the connector uses during schema discovery to automatically determine the column names and data types. Default: 50 messages. Example: discovery.sample.size=100 |
extraction.fetch.size | The number of messages to extract from Kafka in each batch. Adjust this value based on your data volume and processing requirements. Default: 500. Example: extraction.fetch.size=1000 |
extraction.consumers.max | The maximum number of concurrent consumers that can extract data from the source. Default: 50. Example: extraction.consumers.max=60 |
extraction.failures.allowed.rate | The maximum percentage of rejected rows allowed during data extraction. If the percentage of rejected rows exceeds this threshold, the extraction process fails. Default: 100 (the extraction will not fail even if all rows are rejected). Example: To fail the extraction if rejected rows exceed 5% of the rows, set extraction.failures.allowed.rate=5 |
extraction.timezone | Specify the Kafka timezone ID if Incorta's timezone differs from Kafka's timezone. Example: extraction.timezone=PST |
extraction.recovery.period.seconds | When the topic message.timestamp.type is set to CreateTime, the load job must capture all messages created before the extraction start time. To ensure this, the extraction.recovery.period.seconds property defines the delay period after the extraction starts to capture any messages created but not yet appended to Kafka. The default value is 20 seconds. For example, if the expected delay between message creation and its arrival in Kafka is typically less than 10 seconds, you can set extraction.recovery.period.seconds=10 and still avoid potential data loss. It is recommended to change the topic configuration message.timestamp.type from CreateTime to LogAppendTime. For more details, refer to the Kafka documentation. |
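Taken together, an Extra Options entry is a line-separated list of these keys. The following is a minimal sketch; the values are illustrative only and should be tuned to your data volume:

```properties
# Extract in larger batches and cap the number of concurrent consumers.
extraction.fetch.size=1000
extraction.consumers.max=20
# Fail the load if more than 5% of extracted rows are rejected.
extraction.failures.allowed.rate=5
# Wait 10 seconds after extraction starts for late-appended messages
# (relevant only when topics use CreateTime timestamps).
extraction.recovery.period.seconds=10
```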
Create a physical schema with the Schema Wizard
Here are the steps to create a physical schema with the Schema Wizard:
- Sign in to Incorta.
- In the navigation bar, select Schema.
- In the action bar, select + New → Schema Wizard.
- In (1) Choose a Source, specify the following:
- For Enter a name, enter the physical schema name.
- For Select a Datasource, select the Kafka external data source.
- Optionally add a description.
- In the Schema Wizard footer, select Next.
- In (2) Manage Tables, in the Data Panel, select the topics you want to use to create tables. Select the Select All checkbox to select all the topics.
- In the Schema Wizard footer, select Next.
- In (3) Finalize, in the Schema Wizard footer, select Create Schema.
The Schema Wizard uses the default transformation query to discover Kafka messages and create related tables. You can edit the table’s dataset in the Table Editor and use a custom query.
Create a physical schema with the Schema Designer
Here are the steps to create a Kafka physical schema using the Schema Designer:
- Sign in to Incorta.
- In the Navigation bar, select Schema.
- In the action bar, select + New → Create Schema.
- In Name, specify the physical schema name, and select Save.
- In the Schema Designer, in the action bar, select + New → Table.
- In the Table Data Source dialog, specify the following:
- For Type, select Streaming.
- For Data Source, select the Kafka external data source.
- Specify the Kafka table dataset properties.
- Select Add.
- In the Table Editor, in the Table Summary section, enter the table name.
- To save your changes, select Done in the action bar, and then in the Schema Designer, in the action bar, select Save Changes.
Kafka table dataset properties
For a physical schema table, you can configure the following Kafka-specific dataset properties:
Property | Control | Description |
---|---|---|
Topic Name | dropdown list | Select the topic you want to use from the list specified in the external data source. |
Transformation Query | text box | Select the text box or the Edit icon to launch the Edit Query dialog, and then enter the transformation query to apply to the JSON message. |
Incremental | toggle | Turn on this toggle to enable the incremental loading of the table |
Chunking Strategy | dropdown list | Select the chunking strategy: ● One Consumer Per Partition ● Utilize all Consumers ● No Chunking |
Extract Metadata of Kafka Messages | toggle | Enable this option to extract and include metadata from Kafka messages, which provides additional context and information about each message. Metadata includes message topic, partition, offset, timestamp, timestamp type, headers, and key. |
Callback | toggle | Turn this toggle on to enable post-extraction callback |
Callback URL | text box | This property appears when the Callback toggle is enabled. Specify the callback URL. |
View the physical schema diagram with the Schema Diagram Viewer
Here are the steps to view the physical schema diagram using the Schema Diagram Viewer:
- Sign in to Incorta.
- In the navigation bar, select Schema.
- In the list of physical schemas, select the Kafka physical schema.
- In the Schema Designer, in the summary section, select Diagram.
Load the physical schema
Here are the steps to perform a full load of the Kafka physical schema using the Schema Designer:
- Sign in to Incorta.
- In the navigation bar, select Schema.
- In the list of physical schemas, select the Kafka physical schema.
- In the Schema Designer, in the action bar, select Load → Full Load.
- To review the load status, in Last Load Status, select the date.
Here are the steps to perform an Incremental Load of the Kafka physical schema with the Schema Designer:
- Sign in to Incorta.
- In the navigation bar, select Schema.
- In the list of physical schemas, select the Kafka physical schema.
- In the Schema Designer, in the action bar, select Load → Incremental Load.
- To review the load status, in Last Load Status, select the date.
Explore the physical schema
With the full or incremental load of the Kafka physical schema complete, you can use the Analyzer to explore the physical schema, create your first insight, and save the insight to a new dashboard tab or an existing dashboard tab.
To open the Analyzer from the physical schema, follow these steps:
- In the navigation bar, select Schema.
- In the Schema Manager, in the physical schemas list, select the Kafka physical schema.
- In the Schema Designer, in the action bar, select Explore.
Additional Considerations
Data type discovery
The Kafka V2 connector may discover some data types as String during the JQ transformation. If the data types of your columns are not accurately discovered, you can adjust them manually by editing the dataset and selecting the correct data type in the Table Editor > Column Properties.
Transforming JSON messages
The Kafka V2 connector requires messages to be in valid JSON format. By default, the connector discovers all top-level properties, objects, and arrays in the JSON messages as separate columns. However, nested (child) properties in arrays and objects are transformed into a single string. The Kafka V2 connector allows flattening the JSON message using JQ.
The following example shows the default transformation query of a Kafka message.
The Kafka message:
{
  "store_id": 1, "store_order_id": 1000, "status": "accepted",
  "branch": {"branch_id": 5, "state": "NC"},
  "order_lines": [
    {"product_id": 10, "category": "books", "quantity": 1, "unit_price": 15, "net_price": 30},
    {"product_id": 20, "category": "notebooks", "quantity": 5, "unit_price": 12, "net_price": 25}
  ]
}
The default transformation query:
{store_id: .store_id, store_order_id: .store_order_id, status: .status, branch: .branch, order_lines: .order_lines}
The output per the default transformation query:
{
  "store_id": 1, "store_order_id": 1000, "status": "accepted",
  "branch": "{\"branch_id\": 5, \"state\": \"NC\"}",
  "order_lines": "[{\"product_id\": 10, \"category\": \"books\", \"quantity\": 1, \"unit_price\": 15, \"net_price\": 30}, {\"product_id\": 20, \"category\": \"notebooks\", \"quantity\": 5, \"unit_price\": 12, \"net_price\": 25}]"
}
A customized transformation query:
You can edit the default transformation query and flatten objects and arrays to extract nested properties as separate columns or rows as required. You can use a JQ editor to flatten JSON objects and arrays.
For details about supported JQ features, refer to Jackson JSON Processor.
You can use jq play (jqplay.org) to test queries and outputs.
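If you have the jq command-line tool installed, you can also test a flattening filter locally before entering it in the Edit Query dialog. The following is a minimal sketch, assuming the sample message shown earlier is saved locally as message.json (the file name is illustrative); note that the connector's Jackson-based JQ implementation may not support every feature of the jq CLI:

```sh
# Apply a flattening filter to the sample message and print the result.
jq '{store_id: .store_id, status: .status, branch_id: .branch.branch_id}' message.json
```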
Example 1:
The following is an example of a transformation query that flattens nested properties in an object to columns.
{store_id: .store_id, store_order_id: .store_order_id, status: .status, branch_id: .branch.branch_id, state: .branch.state}
The output per the customized transformation query in Example 1:
{"store_id": 1, "store_order_id": 1000, "status": "accepted", "branch_id": 5, "state": "NC"}
Example 2:
The following is an example of a transformation query that flattens nested properties in an array to rows.
{store_id: .store_id, store_order_id: .store_order_id, status: .status, order_lines: .order_lines[]}
| {store_id: .store_id, store_order_id: .store_order_id, status: .status, product_id: .order_lines.product_id, category: .order_lines.category, quantity: .order_lines.quantity, unit_price: .order_lines.unit_price, net_price: .order_lines.net_price}
The output per the customized transformation query in Example 2:
{"store_id": 1, "store_order_id": 1000, "status": "accepted", "product_id": 10, "category": "books", "quantity": 1, "unit_price": 15, "net_price": 30}
{"store_id": 1, "store_order_id": 1000, "status": "accepted", "product_id": 20, "category": "notebooks", "quantity": 5, "unit_price": 12, "net_price": 25}
Example 3:
The following is an example of a transformation query that flattens nested properties in an object to columns and an array to rows.
.order_lines[] as $orderLine
| {store_id: .store_id, store_order_id: .store_order_id, status: .status, branch_id: .branch.branch_id, state: .branch.state, product_id: $orderLine.product_id, category: $orderLine.category, quantity: $orderLine.quantity, unit_price: $orderLine.unit_price, net_price: $orderLine.net_price}
The output per the customized transformation query in Example 3:
{"store_id": 1, "store_order_id": 1000, "status": "accepted", "branch_id": 5, "state": "NC", "product_id": 10, "category": "books", "quantity": 1, "unit_price": 15, "net_price": 30}
{"store_id": 1, "store_order_id": 1000, "status": "accepted", "branch_id": 5, "state": "NC", "product_id": 20, "category": "notebooks", "quantity": 5, "unit_price": 12, "net_price": 25}