Spark Best Practices
This section describes the best practices for using Spark for materialized views in Incorta.
In order to use materialized views with Incorta, you need to have the Spark server installed. By default, an Incorta Node in the Incorta Direct Data Platform bundles Apache Spark as part of its installation. As a result, you have two options:
- Use Incorta’s bundled Spark. In this case, Apache Spark is installed on the same machine as Incorta, which is not recommended. If you start the bundled Spark with Incorta’s launch scripts (startSpark.sh/stopSpark.sh), a Spark cluster starts on the same machine as Incorta.
- Use a separate Spark installation.
Whether you use the bundled Spark or a separate Spark installation, you can direct multiple Incorta instances to use that same Spark installation. The following table lists the pros and cons of this configuration option:
||Pros||Cons||
|Only one Spark instance to maintain.|The host machine must have enough resources to handle all the incoming requests.|
|Running Spark and Incorta on the same machine makes it easy to satisfy the need to share the same disk.|There is a high risk of excessive load when too many requests arrive for the available machine resources. This can lead to out-of-memory errors and system crashes. Resolving such issues may require reconfiguring Spark and Incorta to run on multiple or bigger machines.|
It is important to become familiar with some Spark-related configuration parameters. The default values of these parameters can be found in the spark-env.sh file, in the following directory:
However, it is highly recommended to change these parameters at the MV level instead, in the Data Source definition window of Incorta’s MV table definition page, as shown in the following image:
To set a property, first uncomment it in the spark-env.sh file, unless it is listed in the Pre-Configured Properties table. Then click Add Property and give the property a specific value to override its default.
The following are sample MV properties you can add. These values are examples only; adjust them to suit your workload:
- spark.driver.memory: 8G
- spark.executor.memory: 20G
- spark.executor.cores: 2 (use 1 to 5 at most)
- spark.cores.max: 4
- spark.sql.shuffle.partitions: 500
The following tables contain lists of all the available parameters (along with their description) to use as a reference when configuring specific values:
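||Environment variable||Description||Default||
|SPARK_CONF_DIR|Alternate conf dir.|${SPARK_HOME}/conf|
|SPARK_LOG_DIR|Where log files are stored.|${SPARK_HOME}/logs|
|SPARK_PID_DIR|Where the pid file is stored.|/tmp|
|SPARK_IDENT_STRING|A string representing this instance of Spark.|$USER|
|SPARK_NICENESS|The scheduling priority for daemons.|0|
|SPARK_NO_DAEMONIZE|Run the proposed command in the foreground. It will not output a PID file.|N/A|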
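Because a spark-env.sh property takes effect only once it is uncommented, it can help to list which variables are currently active and which are still commented out. The following is a minimal sketch; the helper name scan_spark_env is hypothetical and not part of Incorta or Spark, and it only recognizes simple NAME=value or export NAME=value lines. It does not evaluate shell logic, variable expansion, or trailing comments.

```python
import re

# Matches "NAME=value" or "export NAME=value", optionally commented out with "#".
_ASSIGNMENT = re.compile(r"^(#\s*)?(?:export\s+)?([A-Za-z_][A-Za-z0-9_]*)=(.*)$")


def scan_spark_env(text: str) -> dict:
    """Split spark-env.sh variable assignments into active and commented-out ones.

    Hypothetical helper: values are returned as written, minus surrounding
    whitespace and quotes; shell expansion is not performed.
    """
    active, commented = {}, {}
    for raw_line in text.splitlines():
        match = _ASSIGNMENT.match(raw_line.strip())
        if match is None:
            continue  # blank lines, prose comments, shell logic
        hash_prefix, name, value = match.groups()
        target = commented if hash_prefix else active
        target[name] = value.strip().strip('"').strip("'")
    return {"active": active, "commented": commented}


if __name__ == "__main__":
    sample = """# - SPARK_CONF_DIR      Alternate conf dir.
# SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=/tmp/spark-pids
SPARK_NICENESS=0
"""
    print(scan_spark_env(sample))
```

In practice you would pass the contents of your own spark-env.sh, for example scan_spark_env(open(path).read()), where path is wherever that file lives in your installation.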
The following are recommended best practices for configuring a Spark environment for running MV jobs in Incorta:
- Balance the resources assigned to SQL Apps and MVs. If you are not using SQL Apps, assign zero cores to them and disable them.
- When Spark and Incorta run on the same host, split the resources so that both the materialized views and Incorta have enough to run.
- On the Incorta machine, make sure enough resources remain for the Spark MV driver process.
- Maximize overall throughput by making sure MV jobs can run in parallel. This typically means giving each job just enough cores to run. For executor memory, take garbage collection into account and do not allocate more than the job needs.
- Tune the jobs that require more resources first.
- First make sure the job can finish, then make it run faster.
- Tune compaction as well. Avoid spilling to disk, because spilling increases compaction time.
- Balance the number of executors against the number of cores per executor. Keep in mind that memory is not shared across executors; only the cores within a single executor share its memory.
- Set the number of shuffle partitions (spark.sql.shuffle.partitions). We typically match it to the number of cores. If the Spark job keeps running out of memory, try increasing it above 2000 (for example, 2001) so the job can finish: more partitions means less data per task, and beyond 2000 partitions Spark also switches to a more compact format for tracking shuffle output.
- Use SQL as the language when the MV consists of a single query. For more complex use cases, select Python.
- Rather than denormalizing dimensional attributes into the MV itself, create an MV that contains only ID and metric fields, and join the ID fields to the keys of the parent tables to resolve the dimensional attributes.
- An MV can support incremental logic if a proper key is defined on it.
- You can use ? inside the MV SQL. It stands for the timestamp of the MV's last successful transformation, similar to how ? is used in regular Incorta SQL queries. The value is stored as a long representing a timestamp. For example:
Select * FROM incorta_metadata.DASHBOARD WHERE CREATIONDATE > ?
The most accurate way to run an incremental materialized view is to select the maximum date from the MV itself and use it to pick up only the new data.
- In an incremental MV, make sure that the field order, field names (which are case-sensitive), and field data types are the same in the full and incremental parts of the code. If the data types differ, use the CAST function. For example, to convert a timestamp to a date: CAST(thets AS DATE) AS thedate
- Another way to implement incremental logic in an MV, when you always want to refresh the last 30 days of data, is to filter the source table by date:
```python
# Keep only the GL journal lines whose effective date falls in the last 30 days
from datetime import datetime, timedelta

from pyspark.sql.functions import lit

df_GL_JE_LINES = read("EBS_GL.GL_JE_LINES")
d = datetime.today() - timedelta(days=30)
df_GL_JE_LINES = df_GL_JE_LINES.filter(df_GL_JE_LINES.EFFECTIVE_DATE > lit(d))

# Save the filtered rows as the MV
save(df_GL_JE_LINES)
```
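The requirement that the full and incremental parts agree on field order, case-sensitive names, and data types can be checked before saving by comparing the DataFrame.dtypes lists of the two results. The following is a minimal sketch under that assumption; schema_mismatches is a hypothetical helper, written in plain Python so it runs without Spark. Inside a PySpark session you would pass full_df.dtypes and incremental_df.dtypes, where both DataFrame names are placeholders for your own variables.

```python
def schema_mismatches(full_dtypes, incremental_dtypes):
    """Compare two lists of (name, type) pairs, such as DataFrame.dtypes.

    Hypothetical helper. Columns are compared position by position, and names
    are compared case-sensitively, because the full and incremental results
    must agree on field order, field names, and field data types.
    Returns a list of human-readable problems; an empty list means they match.
    """
    problems = []
    if len(full_dtypes) != len(incremental_dtypes):
        problems.append(
            f"column count: {len(full_dtypes)} vs {len(incremental_dtypes)}"
        )
    for position, (full_col, incr_col) in enumerate(zip(full_dtypes, incremental_dtypes)):
        full_name, full_type = full_col
        incr_name, incr_type = incr_col
        if full_name != incr_name:
            problems.append(f"position {position}: name {full_name!r} vs {incr_name!r}")
        if full_type != incr_type:
            problems.append(f"position {position}: type {full_type!r} vs {incr_type!r}")
    return problems


if __name__ == "__main__":
    full = [("ID", "bigint"), ("thedate", "date")]
    incremental = [("id", "bigint"), ("thedate", "timestamp")]
    for problem in schema_mismatches(full, incremental):
        print(problem)
```

A type mismatch reported here, such as date versus timestamp, is the case the CAST advice above addresses.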
Here are some sample scripts:
- The following script creates an MV from DataFrames joined with SQL. The base tables are in an Incorta schema called TEST and must already be loaded:
```python
# Read the base tables from the TEST schema and register them as temporary views
df_customers = read("TEST.customers")
df_customers.createOrReplaceTempView("customers_table")
df_submitters = read("TEST.submitters")
df_submitters.createOrReplaceTempView("submitters_table")

# Join submitters to customers, preferring the revenue-group values when present
df = spark.sql("""
    SELECT DISTINCT
        a.submitter_id,
        a.submitter_name,
        a.submitter_type,
        coalesce(upper(b.revenuegroupname), upper(a.customer_name)) AS submitter_finance_name,
        coalesce(b.revenuegroupnumber, a.customer_agn) AS submitter_financials_customer_id,
        a.vendor_id,
        a.vendor_name
    FROM submitters_table a
    LEFT OUTER JOIN customers_table b
        ON a.customer_agn = b.customerid
    WHERE lower(a.source_name) = 'test'
""")

# Save the result as the MV
save(df)
```
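- The following is an example of an incremental MV. It reads only the transaction lines modified in the last 7 days and joins them to the ITEMS table: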
```python
# Incremental MV example
from datetime import datetime, timedelta

from pyspark.sql.functions import lit

df_TRANSACTION_LINES_SOURCE = read("transactions.TRANSACTION_LINES_SOURCE")
df_ITEMS = read("items.ITEMS")

# Filtering by 7 days
d = datetime.today() - timedelta(days=7)
df_TRANSACTION_LINES_SOURCE = df_TRANSACTION_LINES_SOURCE.filter(
    df_TRANSACTION_LINES_SOURCE.DATE_MODIFIED > lit(d)
)

# Register the DataFrames as temporary views for Spark SQL
df_TRANSACTION_LINES_SOURCE.createOrReplaceTempView("TRANSACTION_LINES_SOURCE")
df_ITEMS.createOrReplaceTempView("ITEMS")

df1 = spark.sql("""
    SELECT
        ts.ACCOUNT_ID,
        ts.AMOUNT,
        ts.AMOUNT_TAXED,
        ts.COMPLETE_0,
        CAST(ts.DATE_ARRIVE AS TIMESTAMP) + INTERVAL '8' HOUR AS DATE_ARRIVE,
        COALESCE(ts.BRAND_ID, it.BRAND_ID) AS CALCULATED_BRAND_ID
    FROM TRANSACTION_LINES_SOURCE AS ts
    LEFT OUTER JOIN ITEMS it
        ON ts.ITEM_ID = it.ITEM_ID
    WHERE 1=1
""")

# Save the result as the MV
save(df1)
```
The following are sample MV properties you can add to improve performance. With these values, spark.cores.max (2) divided by spark.executor.cores (2) allows at most one executor, so the MV uses about 25G of executor memory plus 4G for the driver. Tweak each value based on how Spark is configured and how much RAM is free:
- spark.driver.memory: 4G
- spark.executor.memory: 25G
- spark.executor.cores: 2
- spark.cores.max: 2
- spark.sql.shuffle.partitions: 200
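Before adding MV properties, it can help to estimate how much memory they request so you can compare it with the free RAM. The following is a rough sketch, assuming Spark standalone mode, where spark.cores.max caps the total cores for the application, so at most spark.cores.max // spark.executor.cores executors can start. It ignores JVM and off-heap memory overhead, and the helper names to_gib and estimate_mv_resources are hypothetical, not part of Incorta or Spark.

```python
import re

# GiB per Spark memory-size unit (JVM-style suffixes k, m, g, t)
_GIB_PER_UNIT = {"k": 1 / 1024**2, "m": 1 / 1024, "g": 1.0, "t": 1024.0}


def to_gib(size: str) -> float:
    """Convert a Spark memory string such as '25G' or '512m' to GiB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", size.strip(), flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unrecognized memory size: {size!r}")
    amount, unit = match.groups()
    return int(amount) * _GIB_PER_UNIT[unit.lower()]


def estimate_mv_resources(props: dict) -> dict:
    """Rough upper bound on executors and memory requested by MV properties.

    Assumes standalone mode and that the cluster has enough free cores;
    overhead memory is not included.
    """
    executor_cores = int(props["spark.executor.cores"])
    cores_max = int(props["spark.cores.max"])
    # spark.cores.max limits total cores, so this is the most executors that can start
    executors = cores_max // executor_cores
    executor_total = executors * to_gib(props["spark.executor.memory"])
    driver = to_gib(props["spark.driver.memory"])
    return {
        "executors": executors,
        "executor_memory_gib": executor_total,
        "total_memory_gib": executor_total + driver,
    }


if __name__ == "__main__":
    sample = {
        "spark.driver.memory": "4G",
        "spark.executor.memory": "25G",
        "spark.executor.cores": "2",
        "spark.cores.max": "2",
    }
    print(estimate_mv_resources(sample))
```

Because memory is allocated per executor rather than per core, raising spark.executor.cores while keeping spark.cores.max fixed reduces the number of executors and therefore the total executor memory requested.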