Merge Multiple Spark Streams Into A Delta Table with working code

Canadian Data Guy
Oct 13, 2022

This blog discusses how to read from multiple Spark streams and merge (upsert) their data into a single Delta table. We will also optimize the Delta table and cluster its data.

Overall, the process works as follows:

  1. Read data from one or more streaming sources.
  2. Use the special function foreachBatch to call a user-defined function that is responsible for all of the processing of each micro-batch.
  3. The user-defined function runs MERGE and OPTIMIZE against the target Delta table.
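As a rough illustration of step 3, here is a minimal sketch of such a user-defined function. It is not the author's code: the table name `merge_demo_target`, the merge key (`hash`, `country`), the Z-order column and the "OPTIMIZE every 10 batches" cadence are hypothetical choices, and it assumes the target Delta table already exists with the same columns as the stream. It registers each micro-batch as a temp view and runs SQL `MERGE INTO` through `batch_df.sparkSession` (PySpark 3.3+), so the view and the MERGE share the same session.

```python
from typing import Sequence

# Hypothetical settings for illustration; adjust to your environment.
TARGET_TABLE = "merge_demo_target"   # assumed existing Delta table
KEY_COLUMNS = ("hash", "country")    # assumed merge key
ZORDER_COLUMN = "hash"               # assumed clustering column
OPTIMIZE_EVERY_N_BATCHES = 10        # compact every N micro-batches, not every one


def build_merge_sql(target_table: str, source_view: str, key_columns: Sequence[str]) -> str:
    """Build a Delta MERGE that upserts every row of source_view into target_table."""
    condition = " AND ".join(f"t.{c} = s.{c}" for c in key_columns)
    return (
        f"MERGE INTO {target_table} AS t "
        f"USING {source_view} AS s "
        f"ON {condition} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def upsert_to_delta(batch_df, batch_id: int) -> None:
    """foreachBatch callback: MERGE the micro-batch, then periodically OPTIMIZE."""
    view_name = "updates_batch"
    # Register the micro-batch so SQL can see it; the view lives in batch_df's session.
    batch_df.createOrReplaceTempView(view_name)
    session = batch_df.sparkSession  # property available in PySpark 3.3+
    session.sql(build_merge_sql(TARGET_TABLE, view_name, KEY_COLUMNS))
    # OPTIMIZE rewrites files, so running it on every micro-batch is usually wasteful.
    # ZORDER BY assumes Delta Lake 2.0+ or Databricks.
    if batch_id % OPTIMIZE_EVERY_N_BATCHES == 0:
        session.sql(f"OPTIMIZE {TARGET_TABLE} ZORDER BY ({ZORDER_COLUMN})")
```

You would attach it to a stream with `.writeStream.foreachBatch(upsert_to_delta)`. If a single micro-batch can contain several rows with the same key, deduplicate it first (for example with `batch_df.dropDuplicates(list(KEY_COLUMNS))`), because Delta's MERGE fails when multiple source rows match the same target row.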

Architecture

First, we need some input data to merge. The stream could come from Kafka, Kinesis, S3, and so on.

However, for simplicity we will use .format("rate") to generate a stream. Feel free to alter numPartitions and rowsPerSecond; these parameters control how much data is generated. In the code below, each stream generates 1,000 rows per second across 100 partitions.
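To make these settings concrete, the back-of-the-envelope calculation below (plain Python, no Spark) works out what they imply on average. It assumes the rate source's `value` counter starts at 0 and advances at exactly `rowsPerSecond`, ignoring any ramp-up; the variable names are hypothetical.

```python
NUM_PARTITIONS = 100
ROWS_PER_SECOND = 1 * 1000
NUM_STREAMS = 2            # one stream each for USA and Canada
HASH_MODULUS = 1_000_000   # from "value%1000000 as hash"

# Average load per partition and across both streams.
rows_per_partition_per_second = ROWS_PER_SECOND / NUM_PARTITIONS
combined_rows_per_second = ROWS_PER_SECOND * NUM_STREAMS

# value counts up from 0, so hash values first repeat once value reaches HASH_MODULUS.
seconds_until_hash_repeats = HASH_MODULUS / ROWS_PER_SECOND

print(f"{rows_per_partition_per_second:.0f} rows/s per partition")
print(f"{combined_rows_per_second} rows/s across both streams")
print(f"hash values start repeating after ~{seconds_until_hash_repeats / 60:.1f} minutes")
```

The last number matters for the merge: if it is keyed on `hash` (per country), every incoming key is new for roughly the first 16 to 17 minutes, so the MERGE only inserts; after that, incoming rows start matching existing keys and the MERGE begins updating them.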

For the purpose of this blog, we will build two Spark streams, one for each country: Canada and the USA.

USA’s Stream

generated_streaming_usa_df = (
    spark.readStream
    .format("rate")                     # built-in synthetic source: emits (timestamp, value)
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5",
        "value",
        "value%1000000 as hash",        # repeats every 1,000,000 values
        "'USA' AS country",             # constant tag identifying this stream
        "current_timestamp() as ingestion_timestamp",
    )
)
#display(generated_streaming_usa_df)
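To see what each generated row looks like, the plain-Python function below mirrors the `selectExpr` above for a single `value`, without Spark. The function name is hypothetical, and it assumes Spark's `md5` returns the lowercase hex digest of the string's UTF-8 bytes, as `hashlib.md5(...).hexdigest()` does. Note how `hash` wraps: the values 5 and 1,000,005 share the same hash but have different `md5` values.

```python
import hashlib
from datetime import datetime, timezone


def simulate_generated_row(value: int, country: str) -> dict:
    """Plain-Python mirror of the selectExpr columns for one rate-source value."""
    return {
        # md5(CAST(value AS STRING)): hex digest of the decimal string
        "md5": hashlib.md5(str(value).encode("utf-8")).hexdigest(),
        "value": value,
        # value % 1000000: wraps around every million values
        "hash": value % 1_000_000,
        "country": country,
        # current_timestamp(): approximated here with the local UTC clock
        "ingestion_timestamp": datetime.now(timezone.utc),
    }


for v in (5, 1_000_005):
    row = simulate_generated_row(v, "USA")
    print(row["value"], row["hash"], row["md5"])
```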

Canada’s Stream

generated_streaming_canada_df = (
    spark.readStream
    .format("rate")                     # built-in synthetic source: emits (timestamp, value)
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5",
        "value",
        "value%1000000 as hash",        # repeats every 1,000,000 values
        "'Canada' AS country",          # constant tag identifying this stream
        "current_timestamp() as ingestion_timestamp",
    )
)
#display(generated_streaming_canada_df)
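The two stream definitions differ only in the country literal, so one option is to generate them from a helper and start one foreachBatch query per country. The sketch below is an assumption-laden illustration, not the author's code: the helper names, the 30-second trigger, and the checkpoint base path are hypothetical, and `batch_fn` stands for whatever foreachBatch callback you use. Each streaming query needs its own checkpoint location, which is why the path includes the country.

```python
from typing import Callable, List


def rate_stream_columns(country: str) -> List[str]:
    """selectExpr list shared by every country's stream; only the literal changes."""
    if "'" in country:
        # Keep the SQL string literal simple rather than attempting to escape quotes.
        raise ValueError("country must not contain a single quote")
    return [
        "md5( CAST (value AS STRING) ) as md5",
        "value",
        "value%1000000 as hash",
        f"'{country}' AS country",
        "current_timestamp() as ingestion_timestamp",
    ]


def make_rate_stream(spark, country: str, num_partitions: int = 100, rows_per_second: int = 1000):
    """Build the same synthetic rate stream as above for any country."""
    return (
        spark.readStream
        .format("rate")
        .option("numPartitions", num_partitions)
        .option("rowsPerSecond", rows_per_second)
        .load()
        .selectExpr(*rate_stream_columns(country))
    )


def start_upsert_query(streaming_df, country: str, batch_fn: Callable, checkpoint_base: str):
    """Start one streaming query per country, each with its own checkpoint location."""
    return (
        streaming_df.writeStream
        .queryName(f"upsert_{country.lower()}")
        .option("checkpointLocation", f"{checkpoint_base}/{country.lower()}")
        .trigger(processingTime="30 seconds")  # hypothetical trigger interval
        .foreachBatch(batch_fn)
        .start()
    )


# Example usage (hypothetical callback and path; requires an active SparkSession):
# queries = [
#     start_upsert_query(make_rate_stream(spark, c), c, my_upsert_fn, "/tmp/checkpoints/merge_demo")
#     for c in ("USA", "Canada")
# ]
```

Because both queries write to the same Delta table concurrently, their commits may conflict under Delta's optimistic concurrency control; scoping each MERGE condition to its own country (and partitioning the target table by `country`) is one way to reduce such conflicts.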

Parameters / Variables (feel free to change them to suit your needs)


Canadian Data Guy

https://canadiandataguy.com | Data Engineering & Streaming @ Databricks | Ex Amazon/AWS | All Opinions Are My Own