Overview
SQL developers who are familiar with slowly changing dimensions (SCD) and MERGE statements may wonder how to implement the same logic on big data platforms, given that databases and storage systems in the Hadoop ecosystem are not designed or optimised for record-level updates and inserts.
In this post, I'm going to demonstrate how to implement the same logic as a SQL MERGE statement by using Spark. All these operations are done in memory after reading the source and target data. The code is written in Python, but you can implement similar logic in Scala or R too.
Objective
Source data:
https://api.kontext.tech/resource/536f7c3f-f8af-5c56-ae58-9892207bf3cf
Target data (existing data, key is column id):
https://api.kontext.tech/resource/010285ea-35d2-5722-be97-5970c5898576
The purpose is to merge the source data into the target data set following a FULL Merge pattern.
https://api.kontext.tech/resource/2179c9a4-cb16-50c3-9789-8aa52c3420a7
Step by step
Import the required packages and create the Spark context
Use the code below to import the required packages and to create a SparkContext and a SQLContext object.
from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime

appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs
quiet_logs(sc)
In this example, Spark runs in local mode; you can change the master to yarn or any other supported value. I also hide the INFO logs by setting the log level to ERROR.
Create the target data frame
Use the following code to create a Spark data frame. In reality, you would read this data from HDFS, Hive or another storage system.
# Target data set
data_target = [
    Row(1, "Hello!", False, False,
        datetime.strptime('2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False,
        datetime.strptime('2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(sc.parallelize(data_target), schema_target)
df_target.show()
df_target.printSchema()
Create source data frame
Use a similar approach to create a data frame that holds the source data.
# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]
schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(sc.parallelize(data_source), schema_source)
df_source.show()
df_source.printSchema()
Now, we can do a full join with these two data frames.
Implement full join between source and target data frames
As shown in the following code snippet, the fullouter join type is used and the join keys are the id and end_date columns. A new column, action, is also added to work out which action needs to be applied to each record.
There are four possibilities for the actions:
- UPSERT: attributes have changed in the source and the existing records need to be expired and new records need to be inserted.
- DELETE: business keys no longer exist in source table and the records in target table need to be deleted logically.
- INSERT: new business keys exist in source that need to be inserted into the target table directly.
- NOACTION: the attributes have not changed, or the records in the target table are not current.
high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge - Added effective and end date
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL Merge, join on key column and also high date column to make only join to the latest records
df_merge = df_target.join(df_source_new,
                          (df_source_new.src_id == df_target.id) &
                          (df_source_new.src_end_date == df_target.end_date),
                          how='fullouter')

# Derive new column to indicate the action
df_merge = df_merge.withColumn('action',
                               when(df_merge.attr != df_merge.src_attr, 'UPSERT')
                               .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
                               .when(df_merge.id.isNull(), 'INSERT')
                               .otherwise('NOACTION'))
df_merge.show()
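To make the action rules easier to follow, here is a minimal plain-Python sketch of the same join and decision logic, independent of Spark. The helper names full_outer_join and derive_action are hypothetical (they are not part of the post's code), rows are modelled as dictionaries, and attributes are assumed to be non-null. The check order mirrors the when chain above: in Spark, comparing attr with a missing src_attr yields NULL, so the UPSERT branch only fires when both sides of the join exist.

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)


def full_outer_join(target, source):
    """Pair target and source rows on (id, end_date); a missing side is None."""
    src_by_key = {(s["src_id"], s["src_end_date"]): s for s in source}
    pairs, matched = [], set()
    for t in target:
        key = (t["id"], t["end_date"])
        s = src_by_key.get(key)
        if s is not None:
            matched.add(key)
        pairs.append((t, s))
    # Source rows that matched no target row
    pairs += [(None, s) for k, s in src_by_key.items() if k not in matched]
    return pairs


def derive_action(t, s):
    """Mirror the when(...) chain; assumes attr values are never null."""
    if t is not None and s is not None and t["attr"] != s["src_attr"]:
        return "UPSERT"
    if s is None and t["is_current"]:
        return "DELETE"
    if t is None:
        return "INSERT"
    return "NOACTION"


target = [
    {"id": 1, "attr": "Hello!", "is_current": False, "end_date": date(2018, 12, 31)},
    {"id": 1, "attr": "Hello World!", "is_current": True, "end_date": HIGH_DATE},
    {"id": 2, "attr": "Hello Spark!", "is_current": True, "end_date": HIGH_DATE},
    {"id": 3, "attr": "Hello Old World!", "is_current": True, "end_date": HIGH_DATE},
]
source = [
    {"src_id": 1, "src_attr": "Hello World!", "src_end_date": HIGH_DATE},
    {"src_id": 2, "src_attr": "Hello PySpark!", "src_end_date": HIGH_DATE},
    {"src_id": 4, "src_attr": "Hello Scala!", "src_end_date": HIGH_DATE},
]

actions = [derive_action(t, s) for t, s in full_outer_join(target, source)]
print(actions)
```

The historical record for id 1 ends on 2018-12-31, so it matches nothing in the source and, because it is not current, falls through to NOACTION rather than DELETE.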
Implement the SCD type 2 actions
Now we can implement all the actions by generating different data frames:
# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current', 'is_deleted', 'start_date', 'end_date']

# For records that needs no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)

# For records that needs insert only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# For records that needs to be deleted
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# For records that needs to be expired and then inserted
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)
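The UPSERT case is the only one that turns a single joined record into two output records, so it may help to see it in isolation. The following is a plain-Python sketch, not Spark code: expire_and_insert is a hypothetical helper, and the load date of 2019-03-01 is an assumed value chosen for illustration (the post uses today's date). The expired row ends one day before the new row starts, which is what date_sub(src_start_date, 1) does above.

```python
from datetime import date, timedelta

HIGH_DATE = date(9999, 12, 31)


def expire_and_insert(target_row, src_attr, load_date):
    """Return (expired_row, new_row) for a record whose attribute changed."""
    # Close the existing version the day before the new version starts
    expired = dict(target_row, is_current=False, is_deleted=False,
                   end_date=load_date - timedelta(days=1))
    # Open a new current version carrying the source attribute
    new = dict(target_row, attr=src_attr, is_current=True, is_deleted=False,
               start_date=load_date, end_date=HIGH_DATE)
    return expired, new


current = {"id": 2, "attr": "Hello Spark!", "is_current": True, "is_deleted": False,
           "start_date": date(2019, 2, 1), "end_date": HIGH_DATE}

expired, new = expire_and_insert(current, "Hello PySpark!", date(2019, 3, 1))
print(expired["end_date"], new["start_date"])  # 2019-02-28 2019-03-01
```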
Union the data frames
Finally, we can union the data frames into one.
# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(df_merge_p3) \
    .unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.
# ...
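Before overwriting the existing data, it can be worth checking that the merged result still satisfies the basic SCD type 2 invariant: each business key has at most one current record. This check is not part of the original post; the sketch below shows the idea in plain Python over dictionaries, with keys_with_multiple_current as a hypothetical helper name. The sample rows follow the merged result of the example data, reduced to the columns the check needs.

```python
from collections import Counter


def keys_with_multiple_current(rows):
    """Return the ids that have more than one record flagged as current."""
    counts = Counter(r["id"] for r in rows if r["is_current"])
    return sorted(k for k, n in counts.items() if n > 1)


merged = [
    {"id": 1, "attr": "Hello!", "is_current": False},
    {"id": 1, "attr": "Hello World!", "is_current": True},
    {"id": 2, "attr": "Hello Spark!", "is_current": False},
    {"id": 2, "attr": "Hello PySpark!", "is_current": True},
    {"id": 3, "attr": "Hello Old World!", "is_current": False},
    {"id": 4, "attr": "Hello Scala!", "is_current": True},
]

# A deliberately broken variant: the old version of id 2 was never expired
broken = [dict(r, is_current=True) if r["attr"] == "Hello Spark!" else r
          for r in merged]

print(keys_with_multiple_current(merged))  # []
print(keys_with_multiple_current(broken))  # [2]
```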
Complete code example
from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime

appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs
quiet_logs(sc)

# Target data set
data_target = [
    Row(1, "Hello!", False, False,
        datetime.strptime('2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False,
        datetime.strptime('2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(sc.parallelize(data_target), schema_target)
df_target.show()
df_target.printSchema()

# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]
schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(sc.parallelize(data_source), schema_source)
df_source.show()
df_source.printSchema()

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge - Added effective and end date
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL Merge, join on key column and also high date column to make only join to the latest records
df_merge = df_target.join(df_source_new,
                          (df_source_new.src_id == df_target.id) &
                          (df_source_new.src_end_date == df_target.end_date),
                          how='fullouter')

# Derive new column to indicate the action
df_merge = df_merge.withColumn('action',
                               when(df_merge.attr != df_merge.src_attr, 'UPSERT')
                               .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
                               .when(df_merge.id.isNull(), 'INSERT')
                               .otherwise('NOACTION'))
df_merge.show()

# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current', 'is_deleted', 'start_date', 'end_date']

# For records that needs no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)

# For records that needs insert only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# For records that needs to be deleted
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# For records that needs to be expired and then inserted
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)

# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(df_merge_p3) \
    .unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.
# ...
Other considerations
In this demo, I read all the target data into memory. In the real world, you would not do this, for performance reasons. A better approach is to partition your data properly. For example, if the target data is stored in Parquet format, you can partition the data by end_date. That way, you only need to read the active partition into memory to merge with the source data, and you can overwrite just that single partition to save I/O. You will achieve much better performance with this approach.
Refer to this document for more details:
https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files
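To illustrate why partitioning by end_date helps, here is a small plain-Python sketch that groups the example target rows by end_date and picks out the active partition. It only models the idea: partition_by_end_date is a hypothetical helper, and in PySpark the layout itself would typically come from writing the data frame with partitionBy('end_date'). One thing the sketch makes visible is that records expired by the merge receive a new end_date, so they land in a different partition from the active one.

```python
from collections import defaultdict
from datetime import date

HIGH_DATE = date(9999, 12, 31)


def partition_by_end_date(rows):
    """Group rows by end_date, the way a partitioned data set lays them out."""
    parts = defaultdict(list)
    for r in rows:
        parts[r["end_date"]].append(r)
    return dict(parts)


target = [
    {"id": 1, "attr": "Hello!", "end_date": date(2018, 12, 31)},
    {"id": 1, "attr": "Hello World!", "end_date": HIGH_DATE},
    {"id": 2, "attr": "Hello Spark!", "end_date": HIGH_DATE},
    {"id": 3, "attr": "Hello Old World!", "end_date": HIGH_DATE},
]

partitions = partition_by_end_date(target)

# Only the active partition has to be read for the merge
active = partitions[HIGH_DATE]
print(len(active), "of", len(target), "rows are in the active partition")
```

With a long history, the share of rows outside the active partition grows with every load, which is where the saving comes from.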