Using Spark with Incorta

Apache Spark is an open-source, general-purpose cluster-computing framework. Spark excels at iterative computation and includes numerous libraries for statistical analysis, graph computation, machine learning, deep learning, and SQL-based data access. It supports batch applications, streaming data, machine learning algorithms, and concurrent queries, and combining these processing types in production data pipelines is one of its most common use cases.

Spark can run computations both in memory and on disk, and it provides APIs in Java, Scala, R, and Python. It runs natively in an Apache Hadoop cluster and can access numerous data sources such as Apache Cassandra, Hive tables, Parquet files, and JDBC/ODBC databases. A Spark cluster can be tuned for highly parallel computational workloads, allowing analytics applications and data scientists to process large data sets quickly and interactively.
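
To make the data source support concrete, the short PySpark sketch below reads a Parquet directory and a JDBC table. This is generic Spark usage, not anything Incorta-specific; the file path, JDBC URL, table name, and credentials are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("source-examples").getOrCreate()

# Read a directory of Parquet files (the path is a placeholder).
parquet_df = spark.read.parquet("/data/sales/")

# Read a table over JDBC (URL, table, and credentials are placeholders).
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://db-host:5432/sales")
           .option("dbtable", "public.orders")
           .option("user", "reader")
           .option("password", "secret")
           .load())

parquet_df.show(5)
jdbc_df.show(5)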

As a general-purpose computation framework, Apache Spark includes libraries for many data sources and data transformations. Incorta ships with a tightly packaged Spark integration, so you can seamlessly pair Incorta with Apache Spark to unlock advanced uses of the platform.

Use Cases for Spark with Incorta

When you enable Spark, there are two primary use cases:

  1. Create Incorta “Materialized Views.”
  2. Extend the capabilities of the Incorta SQL Interface.

To understand these two use cases, it helps to understand how Spark integrates with the Incorta architecture.

A single Spark cluster, either existing or bundled (standalone) with Incorta, is used in the integration. In the reference architecture above, the same Spark cluster is depicted in two places to illustrate the two use cases. You can either run the standalone Spark cluster that installs with Incorta (on the same server as other Incorta processes or on a dedicated server for Spark), or you can configure Incorta to integrate with a pre-existing Spark cluster as long as it matches the bundled version number (as of Incorta 4.3, Spark 2.3.1). The two primary ways in which Spark is used in conjunction with the Incorta platform are called out below.
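
If you integrate with an existing cluster, one quick sanity check is to confirm that the cluster reports the same Spark version that Incorta bundles. The snippet below is a generic PySpark check, not an Incorta API; the master URL is a placeholder for your standalone cluster's address.

# Generic sanity check: confirm an existing standalone cluster runs the same
# Spark version that Incorta bundles (2.3.1 as of Incorta 4.3).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://spark-master-host:7077")   # placeholder master URL
         .appName("incorta-version-check")
         .getOrCreate())

print(spark.version)   # expect "2.3.1" to match the bundled version
spark.stop()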

How Spark Works with Incorta “Materialized Views”

The Loader Service integrates with Spark to create Materialized Views. You can use Python and Spark scripting to handle many use cases, for example:

  • Create bridge tables to resolve many-to-many relationships.
  • Create an aggregated table from a detail table and use the aggregated table to join as a parent table. For example, you have a line table and a distribution table, and the distribution table is at a lower grain than the line table. To keep the line table as the base table of the dashboard query while still getting an aggregated metric from the distribution table, create a materialized view over the distribution table, aggregated to the line table's key, and join it to the line table as a parent (see the sketch after this list).
  • Convert Oracle PL/SQL procedures where multi-pass logic is required.
  • Create predictive analytics using Spark ML and NumPy.
  • Perform data quality analysis.

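As a sketch of the aggregated-table pattern described in the list above, the snippet below uses the same read() and save() helpers that appear in the Materialized View script later in this section, but the "FIN" schema, the "INVOICE_DISTRIBUTIONS" table, and the column names are hypothetical.

# Hypothetical sketch: roll a lower-grain distribution table up to the line
# grain so the resulting Materialized View can join to the line table as a parent.
dist = read("FIN.INVOICE_DISTRIBUTIONS")
dist.registerTempTable("distributions")

# One row per line_id, so the MV joins to the line table on line_id.
aggregated = sqlContext.sql("""
    SELECT line_id,
           SUM(amount) AS total_amount,
           COUNT(*)    AS distribution_count
    FROM distributions
    GROUP BY line_id
""")
save(aggregated)
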
The following sample Python script shows how you can use PySpark to define an Incorta Materialized View. The script reads all of the records from the "SALES" and "PRODUCTS" tables in the Incorta "SALES" schema, and it also reads the Materialized View itself, "SALES_PRODUCTS" in the "SALES_MV" schema. Each table is read into a PySpark DataFrame. Once you register the DataFrames as temporary tables, SparkSQL can use them to select only the sales records added since the Materialized View was last updated.

s = read("SALES.SALES")
s.registerTempTable("sales")
p = read("SALES.PRODUCTS")
p.registerTempTable("products")
# Read the Materialized View itself so we can find when it was last updated.
mv = read("SALES_MV.SALES_PRODUCTS")
mv.registerTempTable("mv")
# Select only the sales records added since the Materialized View was last updated.
selectedSales = sqlContext.sql("""
    SELECT p.name, s.*
    FROM products p
    INNER JOIN sales s ON p.id = s.product_id
    WHERE s.last_updated > (SELECT MAX(mv.last_updated) FROM mv)
""")
save(selectedSales)

Extending the Capabilities of the Incorta SQL Interface

The Analytics Service integrates with Spark, specifically the SparkSQL libraries in the Spark framework, for two purposes: to hand off complex SQL queries submitted through the SQL interface that the Analytics Service's In-Memory Direct Data Mapping engine cannot handle, and to query Parquet files directly.
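
As a rough sketch of what this looks like from a client's point of view, the snippet below submits an aggregate query with a window function through the SQL interface from Python. It assumes the interface is exposed on a PostgreSQL-compatible endpoint; the host, port, credentials, and the SALES schema columns are placeholders, and whether the query is answered by the in-memory engine or handed off to SparkSQL is decided by Incorta, not by the client.

# Minimal sketch: querying Incorta through its SQL interface from Python.
# Assumes a PostgreSQL-compatible endpoint; host, port, credentials, and the
# SALES schema/columns are placeholders for your environment.
import psycopg2

conn = psycopg2.connect(
    host="incorta-host", port=5436,          # placeholder endpoint
    dbname="tenant_name", user="analyst", password="secret"
)

# A query with a window function; Incorta decides whether its in-memory
# engine can answer it or whether it is handed off to SparkSQL.
query = """
    SELECT product_id,
           SUM(amount) AS revenue,
           RANK() OVER (ORDER BY SUM(amount) DESC) AS revenue_rank
    FROM SALES.SALES
    GROUP BY product_id
"""

with conn.cursor() as cur:
    cur.execute(query)
    for row in cur.fetchall():
        print(row)

conn.close()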