Using Spark Streaming to merge/upsert data into a Delta Lake with working code

Canadian Data Guy
4 min read · Oct 12, 2022

This blog will discuss how to read from a Spark Structured Streaming source and merge/upsert the data into a Delta Lake table. We will also optimize/cluster the data of the Delta table. At the end, we will show how to start a streaming pipeline that uses the previous target table as its source.

Overall, the process works in the following manner: we read data from a streaming source and use the special foreachBatch function to call a user-defined function that is responsible for all the processing. That function encapsulates the Merge and Optimize against the target Delta table.
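Schematically, the wiring looks like the sketch below. This is only an illustration of the foreachBatch pattern, not the handler used later in the post: streaming_df stands in for any streaming DataFrame (such as the generated_df we build next), and process_micro_batch is a hypothetical placeholder name.

def process_micro_batch(micro_batch_df, batch_id):
    # placeholder: the real handler (shown later) merges the micro-batch into the Delta table and optimizes it
    pass

# foreachBatch invokes the handler once per micro-batch of the stream
query = streaming_df.writeStream.foreachBatch(process_micro_batch).start()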

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc., but for simplicity let's generate a stream using the code below. Feel free to alter numPartitions & rowsPerSecond; these parameters control how much data you want to generate. In the code below, we generate 10,000 rows per second across 100 partitions.

Generate streaming data at your desired rate

generated_df = (
    spark.readStream.format("rate")  # built-in rate source: emits a timestamp and an incrementing value
    .option("numPartitions", 100)
    .option("rowsPerSecond", 10 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5",
        "value%1000000 as hash",
    )
)


Parameters / Variables (Feel free to change as per your needs)

target_table_name = "to_be_merged_into_table"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
join_column_name = "hash"

Create an empty Delta table so data can be merged into it

spark.sql(f"""  DROP TABLE IF EXISTS {target_table_name};""")
.option("checkpointLocation", check_point_location)

Check if data is populated
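One quick way to check is to read the target table back and look at a few rows (a simple count works as well):

# Peek at the target table to confirm the initial write landed
spark.read.table(target_table_name).show(10, truncate=False)
print(spark.read.table(target_table_name).count())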


A user-defined function which does the data processing, Merge…
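A minimal sketch of such a handler is below. The function name upsert_micro_batch, the dropDuplicates step, the separate checkpoint path, the 30-second trigger, and the OPTIMIZE ... ZORDER BY call are illustrative choices rather than the post's exact code; target_table_name, join_column_name, and generated_df come from the snippets above.

from delta.tables import DeltaTable

def upsert_micro_batch(micro_batch_df, batch_id):
    # Keep one row per key so the MERGE never sees duplicate matches
    deduped_df = micro_batch_df.dropDuplicates([join_column_name])

    (
        DeltaTable.forName(spark, target_table_name).alias("t")
        .merge(deduped_df.alias("s"), f"t.{join_column_name} = s.{join_column_name}")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    # Compact small files and cluster the table on the join key after each merge
    spark.sql(f"OPTIMIZE {target_table_name} ZORDER BY ({join_column_name})")

# Start the stream; note the checkpoint is separate from the one used to seed the table
(
    generated_df.writeStream
    .foreachBatch(upsert_micro_batch)
    .option("checkpointLocation", f"/tmp/delta/{target_table_name}/_checkpoints_merge/")
    .trigger(processingTime="30 seconds")
    .start()
)

Deduplicating the micro-batch on the join key first matters because Delta's MERGE raises an error when more than one source row matches the same target row.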



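Finally, the intro promised a streaming pipeline that uses the target table as its source. A minimal sketch, assuming the downstream logic can tolerate the files that MERGE rewrites (hence the ignoreChanges option):

# Read the merged Delta table as a new streaming source
downstream_df = (
    spark.readStream.format("delta")
    .option("ignoreChanges", "true")  # MERGE rewrites files, so old rows may be re-emitted downstream
    .table(target_table_name)
)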
Canadian Data Guy | Data Engineering & Streaming @ Databricks | Ex Amazon/AWS | All Opinions Are My Own