How to write your first Spark Stream-Batch Join with working code
When I started learning about Spark Streaming, I could not find enough code or material to kick-start my journey and build my confidence. I wrote this blog to fill that gap and to help beginners see how simple streaming can be while they build their first application.
In this post, I explain most things from first principles to build your understanding and confidence, and you walk away with the code for your first streaming application.
Scenario:
Let’s assume we have a streaming source with data arriving all the time. We want to enrich that data with more attributes from another table (think of a lookup or dimension table). To do this, we will stream the data and join it with the lookup table via a stream-batch join. The result will be written to a Delta table, which can be used downstream for analytics or further streaming.
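Before we get to the real code, here is a minimal sketch of the pattern (the table and column names events, zip_code_dimension, and zip are placeholders, not the names used later in this post): a streaming DataFrame is joined with an ordinary batch DataFrame, and the result is itself a streaming DataFrame.
# Illustrative sketch of a stream-batch join (placeholder names)
streaming_df = spark.readStream.table("events")      # streaming source (placeholder)
lookup_df = spark.table("zip_code_dimension")        # static/batch lookup table (placeholder)
enriched_df = streaming_df.join(lookup_df, on="zip", how="left")  # stream-batch join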
Imports & Parameters
from pyspark.sql import functions as F
from faker import Faker
import uuid
# Define the schema name and where its tables should be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"
# Please download this file from https://simplemaps.com/data/us-zips, place it at a location of your choice, and then update the value of the variable below
static_table_csv_file = "/FileStore/jitesh.soni/data/us_zip_code_and_its_attributes.csv"
# Static table specification
static_table_name = "static_zip_codes"
# Target Streaming Table specification
target_table_name = "joined_datasets"
# I recommend keeping the checkpoint next to the Delta table so that you do not have to keep track of where the checkpoint is
checkpoint_location = f"{schema_storage_location}/{target_table_name}/_checkpoints/"
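To see why checkpoint_location matters, here is a sketch of how it is eventually passed to the streaming writer (enriched_df stands in for the joined streaming DataFrame; the actual write is shown later in the post):
# Sketch only: checkpoint_location is handed to the streaming writer, so the
# checkpoint lives right next to the target Delta table
(
    enriched_df.writeStream
    .option("checkpointLocation", checkpoint_location)
    .toTable(f"{schema_name}.{target_table_name}")
)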
Create Target Database
- The code below creates a schema/database with a comment and a storage location for its tables
create_schema_sql = f"""
CREATE SCHEMA IF NOT EXISTS {schema_name}
COMMENT 'This is {schema_name} schema'
LOCATION '{schema_storage_location}'
WITH DBPROPERTIES ( Owner='Jitesh');
"""
print(f"create_schema_sql: {create_schema_sql}")