Connectors → Apache Kafka

About the Apache Kafka connector

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. To learn more, please review Concepts → Apache Kafka.

With the Kafka connector, you can create an external data source for a Kafka topic available on a list of one or more Kafka brokers. The Kafka topic must contain messages in valid JavaScript Object Notation (JSON) format.

In order to ingest JSON using a defined schema, the Kafka connector requires an Apache Avro file. An Avro file is a map for how to serialize and flatten semi-structured, schema-less JSON into structured data for related physical schema tables. You can use the Avro Extractor Tool to generate an .avro file for the JSON messages related to a specific Kafka topic.

When enabled, the Kafka connector creates a Kafka Consumer in a Consumer Group that immediately begins ingesting an initial stream of messages from the Kafka topic from the available list of Kafka Brokers.

Important

In this release, the Kafka Consumer in Incorta uses a Java client for Apache Kafka 2.1.

The Kafka connector applies the Avro mapping file, transforms the schema-less, unordered JSON into structured data, encrypts the structured data, and stores the encrypted data as Comma Separated Value (.csv) files in Shared Storage for the given tenant.

Because Kafka is a platform for streaming data, the Kafka connector inherently loads data incrementally. The Kafka connector supports specifying a rewind offset.

The Kafka connector supports the following Incorta specific functionality:

Feature Supported
Chunking
Data Agent
Encryption at Ingest
Incremental Load
Multi-Source
OAuth
Performance Optimized
Remote
Single-Source
Spark Extraction
Webhook Callbacks

Deployment of the Kafka connector may require the following:

About the Kafka topic JSON message

As semi-structured data, JSON is schema-less and unordered. For this reason, the content of the JSON message for a given Kafka topic may vary by design. One message may contain only one JSON object in a nested array whereas another message may contain numerous nested objects within a nested array. In addition, some key-value pairs may be present, absent, and in differing order between messages.

The Kafka connector requires the JSON message to be valid JSON and in one line. In addition, the JSON message itself must contain a key labeled data that contains the actual data you want to store in Incorta.

Although optional, it is strongly recommended to include an additional key-pair that names the root entity in the physical schema table. For example, it is recommended that the JSON message contain a key arbitrarily labeled entity that contains the value of the name of root entity in the physical schema table

Here is an valid example:

{ "entity": "my_table",
  "data": { "name1": "value1",
            "name2": { "name3": "value4",
                       "name5": "value5"
                     }
          }
}
Important

Because of the key-value requirement for a ‘data’ key and for each message to be one line, you may need to make changes to the applications that produce and write JSON messages to a given Kafka topic.

About Apache Avro and the Avro Extractor Tool

Apache Avro is a JSON file itself that defines how to serialize JSON as a schema for a given JSON object. The Avro Extractor Tool allows you to generate an Avro file for the JSON messages related to a specific Kafka topic.

To learn more about Apache Avro, please review Concepts → Apache Avro.

To learn more about the Avro Extractor Tool, please review Tools → Avro Extractor Tool.

Deployment Steps

The Kafka connector ships with Incorta. In certain cases, you may need to make additional configurations to deploy the connector successfully as follows:

Specify a Kafka Consumer Service Name

For an Incorta Cluster with two or Incorta Nodes that each run a Loader Service, you must specify a Kafka Consumer Service Name in the Cluster Management Console.

A Cluster Management Console (CMC) Administrator for your Incorta Cluster must configure the Kafka Consumer Service Name. Changes to this property require that the CMC Administrator restart each Loader Service.

If your Incorta Cluster contains more than two Incorta Nodes each with a Loader Service, then you must specify the Incorta Node and Loader Service to use. If you do not assign a loader service to consume the Kafka messages, Incorta assigns a Loader Service randomly. This can result in unexpected behavior and row duplication.

Here are the steps to specify the required properties for the Server Configurations:

  • As the CMC Administrator, sign in to the CMC.
  • In the Navigation bar, select Clusters.
  • In the cluster list, select a Cluster name.
  • In the canvas tabs, select Cluster Configurations.
  • In the panel tabs, select Server Configurations.
  • In the left pane, select Clustering.
  • In the right pane, tor Kafka Consumer Service Name, enter the <NODE_NAME>.<SERVICE_NAME>.
  • Select Save.
Important

When changing the Kafka loader service consumer from one loader service (A) to another loader service (B), you must restart the current loader service (A) first, then restart the other loader service (B).

A CMC Administrator must restart the Loader Services in the Incorta cluster.


Steps to connect Apache Kafka and Incorta

To connect Apache Kafka and Incorta, here are the high level steps, tools, and procedures:

Create an Apache Avro file

You must create an Avro file for an external data source that uses the Apache Kafka connector. To learn how to create an Avro file with the Avro Extractor Tool, please review Tools → Avro Extractor Tool.

Create an external data source

Here are the steps to create a external data source with the Kafka connector:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Data.
  • In the Action bar, select + NewAdd Data Source.
  • In the Choose a Data Source dialog, in Streaming, select Kafka.
  • In the New Data Source dialog, specify the applicable connector properties.
  • To test, select Test Connection.
  • Select Ok to save your changes.

Kafka connector properties

Note: Security Protocol

As of Release 5.1, you can now specify the Security Protocol as a property of the external data source in the New Data Source dialog. This means that you no longer need to define this protocol in the kafka-consumer.properties file or in a custom properties file.

Here are the properties for the Kafka connector:

Property Control Description
Data Source Name text box Enter the name of the data source
Topic text box Enter the name of the Kafka topic
Broker List text box Enter a comma-separated list of Kafka Brokers as follows:
<IPv4\_ADDRESS>:<PORT>
The default port for a Kafka Broker is 9092.
Message Type Field text box Optional.
If you created an .avro file using the same input parameter for the avroExtractor.jar command, then specify the same value for this property.
Trim messageType after dash toggle Optional.
If you created an .avro file using the same input parameter for the avroExtractor.jar command, then enable this property.
Kafka Version drop down list Select a supported Apache Kafka version. Blank defaults to the most current Kafka version.
Enable Kafka Consumer toggle When enabled, the Loader Service defined as the Kafka Consumer will start ingesting messages and write encrypted .csv files to to the KAFKA directory in the Tenant’s shared directory.
Date Expiration input number This property is not functional.
Offset rewind to text box Optional.
Using the yyyy-mm-dd hh:mm:ss.SSS zzz timestamp format, specify the time to rewind the consumer to. This may cause the consumer to retrieve old messages that might have been previously retrieved.
Mapping File upload Select Choose File and the select a local .avro file to upload to Incorta.
Security Protocol drop down list Select from the available options:
  • SASL PLAINTEXT
  • SSL
  • (none)
    SSL (Secure Socket Layer) uses private-key certificate pairs. SASL (Simple Authentication Security Layer) is a framework that provides developers of applications and shared libraries with mechanisms for authentication, data integrity-checking, and encryption.
  • SASL →
    SASL Mechanism For Client Connection
    drop down list Available with SASL PLAINTEXT. Select PLAIN or GSSAPI. SASL and PLAIN uses a simple username and password for authentication. SASL and GSSAPI uses Kerberos with Active Directory. Kerberos requires that all hosts resolve with a fully-qualified domain names (FQDNs).
    SASL → JAAS Config text box Specify the JASS (Java Authentication and Authorization Service) configuration, for example:
    org.apache.kafka.common.security.plain.PlainLoginModule required
    SASL → JAAS Username text box Specify the JASS username for the configuration
    SASL → JAAS Password text box Specify the JASS password for the configuration
    SSL → SSL Protocols drop down list Select from the available options:
  • TLS v1
  • TLS v1.1
  • TLS v1.2
  • SSL → Endpoint Identification Algorithm textbox Specify the algorithm for host name validation. An empty string disables host name verification, which is the default. Specify https to enable host name verification.
    SSL → Keystore File button To upload a Keystore file, select “Choose File”. In the Finder or File Explorer, select your Keystore file such as
    kafka.client.keystore.jks
    SSL → Keystore password text box Specify the keystore password
    SSL → Truststore File button To upload a truststore file, select “Choose File”. In the Finder or File Explorer, select your truststore file such as
    kafka.client.truststore.jks
    SSL → Trustore password text box Specify the truststore password. Without a password, the truststore remains available, but there are no integrity checks.
    Custom Properties File button (+) To upload a .csv file, select +. Then select Upload a csv file. In the Finder or File Explorer, select your custom properties file in .csv format.

    Custom properties file

    In previous releases, it was necessary to create a kafka-consumer.properties file that affected the Kafka consumer group for the given Incorta Cluster. Starting in Release 5.1, you can now create and upload a custom properties file for each external data source that uses the Kafka connector.

    The properties file is in CSV format and must contain the following header: Property,Type,Value

    Here is an example:

    Property,Type,Value
    kafka.logger.class,string,MyClass
    kafka.logger.logWarning,boolean,true
    kafka.logger.logInfo,boolean,false
    kafka.auto.commit.interval.ms,numeric,10000
    kafka.request.timeout.ms,numeric,605000
    kafka.max.poll.interval.ms,numeric,600000

    To learn more about the available Consumer Configurations properties, please review Apache Kafka 2.1 Consumer Configs documentation.

    Create a physical schema with the Schema Wizard

    Here are the steps to create a Kafka physical schema with the Schema Wizard:

    • Sign in to the Incorta Direct Data Platform™.
    • In the Navigation bar, select Schema.
    • In the Action bar, select + New → Schema Wizard
    • In (1) Choose a Source, specify the following:

      • For Enter a name, enter the physical schema name.
      • For Select a Datasource, select the Kafka external data source.
      • Optionally create a description.
    • In the Schema Wizard footer, select Next.
    • In (2) Manage Tables, in the Data Panel, first select the name of the Data Source, and then check the Select All checkbox.
    • In the Schema Wizard footer, select Next.
    • In (3) Finalize, in the Schema Wizard footer, select Create Schema.

    Create a physical schema with the Schema Designer

    Here are the steps to create a Kafka physical schema using the Schema Designer:

    • Sign in to the Incorta Direct Data Platform™.
    • In the Navigation bar, select Schema.
    • In the Action bar, select + New → Create Schema.
    • In Name, specify the physical schema name, and select Save.
    • In the Schema Designer, in Start adding tables to your schema, select Kafka.
    • In the Data Source dialog, specify the Kafka table data source properties.
    • Select Add.
    • In the Table Editor, in the Table Summary section, enter the table name.
    • To save your changes, select Done in the Action Bar.

    Kafka table data source properties

    For a physical schema table, you can define the following Kafka specific data source properties as follows:

    Property Control Description
    Type drop down list Default is Kafka
    Data Source drop down list Select the Kafka external data source
    Query Text box Select the text box. In the Edit Query dialog, specify a SELECT statement for the table as defined in the .avro file.
    Callback toggle Enables the Callback URL field
    Callback URL text box This property appears when the Callback toggle is enabled. Specify the URL.

    Example of Query:
    SELECT id, dt, item, quantity, price FROM tblFruitSales.tblFruitSales
    Important

    The Kafka connector natively supports incremental loads for streaming messages from Kafka. There are no configurations to enable an Update query other than specifying one or more key columns in the physical schema table. You can schedule an incremental load for a Kafka physical schema table in a physical schema as frequently as every 5 minutes.

    View the physical schema diagram with the Schema Diagram Viewer

    Here are the steps to view the physical schema diagram using the Schema Diagram Viewer:

    • Sign in to the Incorta Direct Data Platform™.
    • In the Navigation bar, select Schema.
    • In the list of schemas, select the Kafka physical schema.
    • In the Schema Designer, in the Action bar, select Diagram.

    Load the physical schema

    Here are the steps to perform a Full Load of the Kafka physical schema using the Schema Designer:

    • Sign in to the Incorta Direct Data Platform™.
    • In the Navigation bar, select Schema.
    • In the list of physical schemas, select the Kafka physical schema.
    • In the Schema Designer, in the Action bar, select Load → Load Now → Full.
    • To review the load status, in Last Load Status, select the date.

    Here are the steps to perform an Incremental Load of the Kafka physical schema with the Schema Designer:

    • Sign in to the Incorta Direct Data Platform™.
    • In the Navigation bar, select Schema.
    • In the list of physical schemas, select the Kafka physical schema.
    • In the Schema Designer, in the Action bar, select Load → Load Now → Incremental.
    • To review the load status, in Last Load Status, select the date.

    Explore the physical schema

    With the full or incremental load of the Kafka physical schema complete, you can use the Analyzer to explore the physical schema, create your first insight, and save the insight to a new dashboard tab or an existing dashboard tab.

    To open the Analyzer from the physical schema, follow these steps:

    • In the Navigation bar, select Schema.
    • In the Schema Manager, in the List view, select the Kafka physical schema.
    • In the Schema Designer, in the Action bar, select Explore Data.

    Known issues

    By default, the Kafka connector converts Boolean and Date data types to String. You can use a formula column in a physical schema table to covert a String to a specific data type using one or more built-in functions.