References → Data Ingestion and Loading

About data ingestion

Before Incorta Direct Data Platform™ can analyze your data using its own or integrated tools, it must have access to that data. It must also extract the data, persist it to its shared storage, and load it into its engine memory so that the Analytics Service can access this data.

Data ingestion in Incorta Direct Data Platform™ starts with creating a data source that defines the properties of the connection with your data provider, then creating a physical schema object that contains or references data from this data source, and finally loading and persisting the data.

The following figure summarizes the process of data ingestion in Incorta Direct Data Platform™:

Data sources

Incorta Direct Data Platform™ can connect to external data as well as local data. External data can be a database, one or more files on a cloud storage service, or data in Software as a Service (SaaS) applications. Local data is one or more files uploaded to the shared storage of an Incorta cluster. Using the available connectors, you can create different data sources that define how Incorta Direct Data Platform™ can access your data. A data source defines the properties of the connection between Incorta and your data provider. For a list of the connectors available in Incorta, refer to Connectors → All.

Physical schemas and data discovery

After using suitable connectors to define data sources, you must create physical schema objects to encompass your data or to reference it. During the physical schema object creation, the Loader Service discovers your data and defines the object columns and their properties accordingly.

A physical schema object can be one of the following:

  • Physical schema table
  • Alias
  • Materialized view (MV)
  • Incorta Analyzer table
  • Incorta SQL table

Only a physical schema table directly references one or more data sources to get data. Other objects in a physical schema reference one or more physical schema tables or other objects to get data from them. An alias only refers to the data of another existing physical schema object.

Data loading

Incorta can analyze only data available to its Analytics Service. Thus, after defining data sources and creating physical schema tables that reference data in these data sources, you must load your data to Incorta shared storage and to its Engine memory to start analyzing it. Incorta Direct Data Platform™ stores the extracted data as Apache Parquet files in shared storage (staging) and creates Direct Data Mapping (DDM) files for objects according to the object type and configurations.
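
Because Incorta persists the extracted data as standard Apache Parquet, you can inspect a staged table with any Parquet reader. The following is a minimal PySpark sketch; the shared storage path and the SALES.ORDERS table name are placeholders, not part of any Incorta API:

    # Inspect the staged parquet files of one table (illustrative path only)
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("inspect-staging").getOrCreate()

    # Placeholder shared storage location; substitute your tenant's actual path
    staging_path = "/incorta/shared_storage/tenants/demo/parquet/SALES/ORDERS"

    df = spark.read.parquet(staging_path)
    df.printSchema()
    print("row count:", df.count())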

Load types

Incorta Direct Data Platform™ supports multiple types of loads:

  • Physical schema load and object load
  • On-demand and scheduled load
  • Full load, incremental load, and load from staging

Physical schema load and object load

A load job can be for a single physical schema object or all applicable objects in a physical schema. As a schema manager user, you can use the Schema Designer to start a schema or object load job. You cannot start a load job for an alias as it is merely a reference to data of another existing physical schema object. In addition, the Loader Service will skip aliases during a physical schema load job.

A physical schema load job can be on-demand or scheduled, and incremental, full, or from staging. For a single physical schema object, a load job can only be an on-demand full load or an on-demand load from staging.

Important

Some Data Lake connectors, such as Amazon Web Services (AWS) S3, support remote tables that you can use to access your data without the need to load it to Incorta shared storage or engine memory. The Loader Service does not extract or load data for a remote table. Thus, the Analytics Service cannot query this data unless you create a materialized view (MV) that references the remote table and load the MV data into Incorta. In addition, external visualization tools that Incorta integrates with, such as Power BI and Tableau, can access this remote data through the Incorta SQL Interface (SQLi).
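
As a rough illustration of this workaround, the following hypothetical PySpark MV sketch materializes data from a remote table so that the Analytics Service can query it. The DATALAKE.REMOTE_ORDERS object name and the selected columns are placeholders, and the read and save helpers shown are the ones typically exposed to Incorta PySpark MV scripts, so verify them against your release:

    # Hypothetical PySpark MV script that materializes a remote table.
    # "DATALAKE.REMOTE_ORDERS" is a placeholder object name; read() and save()
    # are the helpers normally provided to Incorta PySpark MV scripts.
    df = read("DATALAKE.REMOTE_ORDERS")

    # Optionally keep only the columns needed for analytics (placeholder columns)
    df = df.select("ORDER_ID", "ORDER_DATE", "AMOUNT")

    save(df)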

On-demand and scheduled loads

You can start a load job on demand for the physical schema or a specific physical schema object. You can also schedule the loading of data for the whole physical schema. While you use the Schema Designer to start on-demand load jobs, you use either the Schema Manager or the Scheduler to schedule physical schema load jobs.

Full load, incremental load, and load from staging

Depending on your data source, the connector you use to access it, and the table data source properties, you can perform different types of load jobs. These load strategies differ in behavior and output.

For a detailed description of the output for each load strategy starting with release 5.1, refer to References → Data Consistency and Availability.

Full load

You can perform a full load for a physical schema or an object. By default, the Loader Service performs a full load for all physical schema tables and MVs. However, for physical schema tables and MVs that have incremental load enabled and full load disabled, the Loader Service may throw errors during the first full load job or skip these objects during subsequent full load jobs.

Note

Typically, a schema developer performs a full load of an object at least once before enabling the Disable Full Load property.

During a full load, the following occurs:

  • The Loader Service extracts data from the data source for each physical schema table or the single specified table according to the table data source properties.

  • The Loader Service creates new source parquet files in the source directory. To save these files, it creates a new parquet version directory with an offset subdirectory.

  • If the Data Loading → Enable Always Compact option is enabled in the Tenant Configurations in the Cluster Management Console (CMC), the Loader Service also creates a compacted version of the object parquet files in the compacted directory. A data deduplication process precedes the compaction process to mark duplicate data that will be removed from the compacted parquet files (see the sketch at the end of this section).

  • For an MV, the Loader Service passes the query of the MV Script to Spark. Spark reads data from the parquet files of the underlying physical schema objects and creates new parquet files for the MV in a new parquet version directory in the source directory. A compacted version of the MV parquet files is also created if compaction is enabled.

    Note

    Spark reads the MV data from the compacted parquet files of the underlying object in the case that the underlying object is a physical schema table or another MV. However, starting with release 5.1.2, an MV can reference columns from Incorta SQL tables or Incorta Analyzer tables in other physical schemas. In this case, Spark will read data from the source parquet files of these Incorta tables as they do not have a compacted version.

  • For Incorta Analyzer tables and Incorta SQL tables, the Loader Service creates full parquet files in the source directory. Prior to release 5.1.2, the Loader Service would create snapshot DDM files for these tables in the ddm directory (also known as snapshot in older releases).

  • For physical schema tables and MVs with performance optimization enabled, the Loader Service loads data to the Engine memory. The Engine then calculates any formula columns, key columns, encrypted columns, or load filters for each object and creates snapshot DDM files. These files are saved to the schemas directory that exists in the ddm directory.

  • If there is a join relationship in which one of the physical schema objects is the child table, the Engine creates a new version of the join DDM files and saves them to the joins directory that exists in the ddm directory.

Important

The described behavior and output are applicable starting with release 5.1 where the Loader Service creates a new version of files. For older releases, a full load job deletes all existing parquet, DDM, and compacted files and creates new ones.
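
To make the deduplication step mentioned earlier in this section concrete, here is a conceptual PySpark sketch that keeps only the latest record per key before writing a compacted copy. It mirrors the idea only; it is not Incorta's internal compaction code, and the ORDER_ID and LOAD_TIME columns as well as the paths are placeholders:

    # Conceptual pre-compaction deduplication: keep the newest record per key.
    # Column names and paths are placeholders; this is not Incorta's internal code.
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

    source = spark.read.parquet("/path/to/source/parquet")

    w = Window.partitionBy("ORDER_ID").orderBy(F.col("LOAD_TIME").desc())
    compacted = (
        source.withColumn("_rn", F.row_number().over(w))
              .filter(F.col("_rn") == 1)
              .drop("_rn")
    )

    compacted.write.mode("overwrite").parquet("/path/to/compacted/parquet")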

Incremental load

You can start an incremental load only for a physical schema. However, the Loader Service incrementally loads only physical schema tables and MVs that have incremental load enabled. For physical schema objects with incremental load disabled or that do not support incremental loads, such as Incorta Analyzer and SQL tables, the Loader Service performs a full load.

During an incremental load, the Loader Service behaves the same as during a full load, except for the following:

  • In the case of a physical schema table, the Loader Service extracts data from the data source according to the update query or the update file rather than the original query or file. As for an MV, the Loader Service passes the MV incremental script, not the original script, to Spark.
  • In the case of both physical schema tables and MVs, the Loader Service creates only a sub-directory (offset directory) for the new parquet files under the object’s latest parquet version directory.
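
The following conceptual PySpark sketch contrasts a full read with an incremental read that picks up only rows changed since the last successful load. The MODIFIED_DATE column, the cutoff value, and the path are placeholders, and the mechanism Incorta uses to pass the last load time to an update query or incremental script is not shown here:

    # Conceptual full read versus incremental read (placeholders throughout)
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("incremental-sketch").getOrCreate()

    orders = spark.read.parquet("/path/to/source/parquet/SALES/ORDERS")

    # Full load: take everything
    full_df = orders

    # Incremental load: take only rows modified after the last successful load
    last_load_time = "2021-01-01 00:00:00"  # placeholder cutoff
    incremental_df = orders.filter(F.col("MODIFIED_DATE") > F.lit(last_load_time))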

Load from staging

Load from staging is the same as a full load but without the extraction and transformation phases. The Loader Service loads the latest existing parquet files for the respective object into the Engine memory, whether these files were loaded before (committed) or not (uncommitted). The Engine then calculates any joins, formula columns, key columns, encrypted columns, or load filters and creates a new version of the DDM files.

Load order and queueing

Physical schema and object queueing is a pre-load phase and is not part of the load job itself. However, it is an important phase where the Engine decides on the order of loading physical schemas and objects according to their dependencies. The Engine queues physical schemas to run in sequence or in parallel depending upon the join relationships and formula columns in the physical schema objects. The Engine queues physical schemas and objects with no preceding dependencies first and then the ones that depend on them, and so on. MVs, Incorta Analyzer Tables, and Incorta SQL Tables wait for their underlying objects to load first.

On the other hand, a schema manager user can specify the order of loading physical schema tables and MVs in a single physical schema. A schema manager user adds tables and MVs to separate load groups and orders these groups as appropriate.

In addition to manually defining the load order of MVs using load groups, release 5.1.2 introduces the automation of load ordering of MVs within a load group. The automated approach includes detecting the MV dependencies within the load group and ordering the load accordingly. This should both relieve the burden of manually defining the MV load ordering and allow for better performance and optimal resource utilization.

The new implementation allows detecting the MV dependencies and virtually dividing MVs within a load group into sub-groups where independent MVs will load first and then their dependent MVs, and so on. You can include the MVs you want in one load group, and leave the load ordering decision to the load job. For MVs with cyclic dependencies, ordering the MV load depends mainly on the alphabetical order of the MV names.
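
The following plain Python sketch illustrates the idea of dividing MVs in one load group into dependency waves, with alphabetical order as the tie-breaker. It is a conceptual illustration only, not Incorta's scheduling code, and the MV names and dependency map are made up:

    # Conceptual grouping of MVs into dependency waves (illustration only).
    # Each MV maps to the set of MVs it depends on within the same load group.
    deps = {
        "MV_SALES_SUMMARY": {"MV_SALES_BASE"},
        "MV_SALES_BASE": set(),
        "MV_RETURNS": set(),
        "MV_DASHBOARD": {"MV_SALES_SUMMARY", "MV_RETURNS"},
    }

    remaining = dict(deps)
    waves = []
    while remaining:
        # MVs whose dependencies have already been placed form the next wave
        ready = sorted(mv for mv, d in remaining.items() if not (d & set(remaining)))
        if not ready:
            # Cyclic dependencies: fall back to alphabetical order
            ready = sorted(remaining)
        waves.append(ready)
        for mv in ready:
            del remaining[mv]

    print(waves)
    # [['MV_RETURNS', 'MV_SALES_BASE'], ['MV_SALES_SUMMARY'], ['MV_DASHBOARD']]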

Load phases

A load job process goes through multiple phases that vary according to the object type, configuration, and load job type. The following are the major load phases:

  • Extraction
  • Transformation (Enrichment)
  • Load (and Post-Load)

Important

Within a single load job, each phase should be completed for all objects before the next phase starts. For example, the extraction of all physical schema tables should be completed before the transformation phase of MVs starts, and the extraction or transformation should be completed before the loading starts.

The following figure summarizes the data loading process:

Extraction

This is the first phase in a full or incremental load job for a physical schema table. Other objects do not go through this phase. During this phase:

  • The Loader Service extracts records from data sources and writes them into parquet files. In a load from staging, no extraction happens; the Loader Service loads the existing parquet files instead.
  • The Loader Service can extract multiple tables in parallel, depending on object dependencies and the availability of threads in the table thread pool. If a physical schema table contains multiple data sets, the Loader Service can also extract these data sets in parallel. Furthermore, if you configure a data set to use chunking (parallel extraction), the Loader Service extracts this data set in multiple threads (see the sketch after this list).
  • At the end of the extraction phase, the Loader Service creates a compacted version of the extracted parquet files without duplicate records. When the Enforce Primary Key Constraint property is enabled for an object in the Table Editor, the compaction process will include performing Primary Key index calculations to mark duplicate records that must be deleted during the compaction to ensure that only unique data records exist.
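
As a loose analogy for chunked (parallel) extraction, the following PySpark sketch splits a JDBC read into parallel partitions by a numeric column. It is not how the Loader Service itself is implemented, and the connection details, table, and column names are placeholders:

    # Parallel (chunked) extraction analogy using Spark's partitioned JDBC read.
    # All connection details and names below are placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("chunked-extraction").getOrCreate()

    df = spark.read.jdbc(
        url="jdbc:postgresql://dbhost:5432/sales",
        table="public.orders",
        column="order_id",         # numeric column used to split the read
        lowerBound=1,
        upperBound=10_000_000,
        numPartitions=8,           # eight chunks are read in parallel
        properties={"user": "report", "password": "secret"},
    )

    df.write.mode("overwrite").parquet("/path/to/source/parquet/SALES/ORDERS")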

Transformation (Enrichment)

This is the first actual phase in a full or incremental load for an MV or SQLi table. The Loader Service submits queries to Spark to process these queries and waits for them to finish. Spark then creates new parquet files in shared storage for these objects.

For MVs that reference physical schema tables or other MVs, Spark reads from the compacted parquet files. If a compaction job is running for an object that an MV references, the Loader Service waits until the compaction is finished before sending queries to Spark.

For MVs that reference Incorta Analyzer tables or Incorta SQL tables, Spark reads data from the original parquet files.

Load and Post-Load

In this phase, the following applies:

  • For physical schema tables and MVs that have performance optimization enabled, the Engine loads data from the source parquet files into its memory.
  • After loading (the post-load process), if a loaded object has one or more key columns, formula columns, load filters, encrypted columns, or joins (as a child table), the Engine calculates them and writes snapshot DDM files.
  • During the post-load process, the Engine also calculates Incorta Analyzer tables and Incorta SQL tables, and then the Loader Service writes full parquet files for these tables.

Important

Because the Loader Service calculates Incorta Analyzer tables and Incorta SQL tables during the Load and Post-Load phase, while Spark processes MVs during the Transformation phase, an MV cannot reference an Incorta Analyzer table or Incorta SQL table that exists in the same physical schema as the MV.

Metadata database commit and synchronization

After loading data to Incorta shared storage and Engine memory, the Loader Service updates the Incorta metadata database with the load job information, including the latest available versions of parquet, DDM, and compacted files, data size, and commit time for each object in the load job. The Loader Service then sends a synchronization signal to the Analytics Service and other follower Loader Services so that they read from these latest versions.

Performance optimization

A schema manager user can enable performance optimization for the physical schema objects to make their data available to the Analytics Service. Enabling performance optimization instructs the Loader Service to load the object files to the Analytics Service Engine memory. However, enabling this feature may affect the load job time. For physical schema objects with performance optimization disabled, the Loader Service only creates parquet files in shared storage but does not load them to the Engine memory.

By default, Incorta Direct Data Platform™ enables Performance Optimization for all physical schema objects, except for an alias to which this property does not apply. As a schema manager user, you can disable or enable this feature per physical schema object in the Table Editor. You can also enable and disable it for one or more objects in the Schema Manager.

Important

You should keep Performance Optimization enabled for Incorta Analyzer tables and Incorta SQL tables so that they remain available to the Analytics Service. Starting with release 5.1.2, you cannot disable Performance Optimization for Incorta Analyzer tables or Incorta SQL tables in the Table Editor.

When you disable performance optimization for an object, its data is evicted from the Analytics Service Engine memory. When you enable it again, you need to perform either a full load or load from staging for the object.

Load job statuses

A load job can be completed successfully or can fail. You can use the following tools to view the status of a load job:

  • Schema Manager: shows the status and time of the last load job, if any, for each physical schema
  • Schema Designer: shows either the time of the last successful load job or the status of the last failed load job for a specific physical schema
  • Load Job Viewer: contains the load statuses and states for each load job for a given physical schema and object

The following are the possible statuses of a load job:

  • In Queue: The physical schema is still in the queue, waiting for the Loader Service to start loading data.
  • Running: The load job is executing. A running load job has different states depending upon the stage or phase of the loading process.
  • Succeeded: The load job completed successfully.
  • Finished With Errors: The load job completed, but with errors in one or more stages or phases of the load process.
  • Failed: The load job failed completely for reasons such as a server error or network error.
  • Interrupted: The load job was interrupted (stopped or canceled) by the user or for any other reason.

5.1.2 enhancements to reduce deduplication resource overhead

The 5.1.2 release introduces enhancements to reduce the overhead of the PK (primary key) index calculation, which reduces memory and disk requirements, enhances performance, and improves CPU utilization.

The new implementation for deduplicating records in Incorta's memory includes the following:

  • Scalable PK index calculation during incremental load jobs
  • Support skipping PK index calculation during full load jobs

Scalable PK index calculation

This release supports the scalability of the PK index calculation process, especially during an incremental load job. The implemented enhancements are as follows:

  • Parallel and scalable handling of parquet files during an incremental load job to enhance CPU utilization
  • Using temporary files when comparing existing and new records to reduce the required disk space
  • Loading only new parquet files into the Incorta engine memory instead of loading new and old files to reduce memory overhead

The scalable PK index calculation feature is enabled by default. The engine.properties file that exists in the Loader Service directory (<installation_path>/IncortaNode/services/<loader_service_directory>/incorta) has new settings to manage and configure it. A System Administrator with root access to the Incorta host machine can edit this file to disable or configure the related settings as required.

The following settings control this feature in the engine.properties file:

  • store.horizontal_scalable_pk_index_update: Enables or disables the scalable PK index feature. Impacted scenarios: full load and incremental load. Type: Boolean. Default value: true.
  • store.horizontal_scalable_pk_index_update_threshold: The maximum number of rows that a single thread handles. Impacted scenario: incremental load. Type: Long. Default value: 10_000_000.
  • store.horizontal_scalable_pk_index_update_below_threshold_threads: The minimum number of threads to use. Impacted scenario: incremental load. Type: Integer. Default value: 4.
  • store.horizontal_scalable_pk_index_max_threads: The maximum number of threads to use. Impacted scenario: incremental load. Type: Integer. Default value: 40.
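
For reference, a sketch of how these entries might appear inside the engine.properties file, using the default values from the list above (the 10_000_000 default is written without separators here):

    # Scalable PK index calculation settings (defaults shown; adjust as required)
    store.horizontal_scalable_pk_index_update=true
    store.horizontal_scalable_pk_index_update_threshold=10000000
    store.horizontal_scalable_pk_index_update_below_threshold_threads=4
    store.horizontal_scalable_pk_index_max_threads=40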

Skip or enforce PK constraint

A user who belongs to a group with the Schema Manager role often defines one or more key columns for a physical schema entity object such as a physical schema table or materialized view. The table data source for this object usually contains unique values for the key column or at least unique records.

In the 5.1.2 release, you have the option to either enforce the calculation of the primary key at the object level or skip this calculation to optimize data load time and performance. This scenario applies to full load jobs only. In incremental load jobs, the Loader Service must compare existing and new data to avoid data duplication when key columns are defined.

When the physical schema table or materialized view has at least one key column, the Table Editor for this object shows the Enforce Primary Key Constraint option to enforce or skip the PK index calculation.

Important

This feature requires enabling the scalable PK index calculation at the engine level, which is the default configuration.

When the Enforce Primary Key Constraint option is enabled, which is the default state, the Loader Service evaluates each value in the key column(s) and enforces record uniqueness. This process requires heavy resource utilization, especially for large datasets.

If you disable this option for an object, the Loader Service loads the source data without checking record uniqueness, which improves load time and performance.

Concurrent schema update and load jobs

Incorta does not recommend running concurrent schema model update jobs and load jobs on the same schema or dependent schemas as this may result in errors or inaccurate data.