How to write your first Spark Stream-Batch Join with working code

Canadian Data Guy
4 min read · Jan 26

When I started learning Spark Streaming, I could not find enough code or material to kick-start my journey and build my confidence. I wrote this blog to fill that gap, so beginners can see how simple streaming is and build their first application.

In this blog, I will explain most things from first principles to deepen your understanding and confidence, and you will walk away with code for your first streaming application.


Scenario:

Let’s assume we have a streaming source with data arriving all the time. We want to enrich that data with attributes from another table (think lookup table or dimension table). To do this, we will stream the data and join it with the lookup table via a Stream-Batch join. The result will be written as a Delta table, which can then be used downstream for analytics or further streaming.
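To make the Stream-Batch join semantics concrete before touching Spark, here is a plain-Python model of the idea (it is not Spark code). Each inner list stands in for one micro-batch arriving on the stream, the dict stands in for the static lookup table, and `target_table` stands in for the append-only Delta target. The zip codes, city names, and helper names are hypothetical sample data chosen for illustration.

```python
from typing import Dict, Iterable, List

# Hypothetical lookup (dimension) table keyed by zip code; in the real
# pipeline this role is played by the static table built from the CSV file.
lookup: Dict[str, Dict[str, str]] = {
    "10001": {"city": "New York", "state_id": "NY"},
    "94105": {"city": "San Francisco", "state_id": "CA"},
}


def join_micro_batch(batch: Iterable[dict], dim: Dict[str, Dict[str, str]]) -> List[dict]:
    """Inner-join one micro-batch of events against the static lookup table."""
    joined = []
    for event in batch:
        attrs = dim.get(event["zip"])
        if attrs is not None:  # inner join: events with no matching zip are dropped
            joined.append({**event, **attrs})
    return joined


# Each inner list stands in for one micro-batch arriving on the stream.
micro_batches = [
    [{"id": 1, "zip": "10001"}, {"id": 2, "zip": "99999"}],
    [{"id": 3, "zip": "94105"}],
]

target_table: List[dict] = []  # stands in for the append-only Delta target
for batch in micro_batches:
    target_table.extend(join_micro_batch(batch, lookup))

print(target_table)
```

The property this models is that the streaming side drives the join: every new micro-batch is matched against the lookup table, while a change to the lookup table alone does not re-emit rows that were already written. In Spark, stream-static joins support inner joins and left outer joins with the stream on the left; keeping unmatched events instead of skipping them would roughly mimic the left outer variant, where Spark fills the lookup columns with nulls.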

Imports & Parameters

from pyspark.sql import functions as F
from faker import Faker
import uuid

# Define the schema name and where its tables should be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"


# Download this file from https://simplemaps.com/data/us-zips, place it at a location of your choice, and change the value of the variable below
static_table_csv_file = "/FileStore/jitesh.soni/data/us_zip_code_and_its_attributes.csv"

# Static table specification
static_table_name = "static_zip_codes"


# Target streaming table specification
target_table_name = "joined_datasets"

# I recommend keeping the checkpoint next to the Delta table so that you never have to wonder where the checkpoint is
checkpoint_location = f"{schema_storage_location}/{target_table_name}/_checkpoints/"

Create Target Database
  • The code below creates a schema (database) with a comment and a storage location for its tables
create_schema_sql = f"""
CREATE SCHEMA IF NOT EXISTS {schema_name}
COMMENT 'This is {schema_name} schema'
LOCATION '{schema_storage_location}'
WITH DBPROPERTIES ( Owner='Jitesh');
"""
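print(f"create_schema_sql: {create_schema_sql}")

# Execute the statement; `spark` is the SparkSession that Databricks notebooks provide
spark.sql(create_schema_sql)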
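Because the schema has a LOCATION, a managed table such as `joined_datasets` typically lands in a subdirectory named after the table (an assumption that holds for a Hive-metastore schema; check your catalog setup). Keeping the checkpoint inside that directory, as the parameter comments recommend, means the table and its stream state travel together. The hypothetical helper below derives both paths with `posixpath.join`, which also avoids the doubled slash the f-string above produces when `schema_storage_location` already ends in `/`.

```python
import posixpath
from typing import Tuple


def table_and_checkpoint_paths(storage_root: str, table_name: str) -> Tuple[str, str]:
    """Return (table_path, checkpoint_path), nesting the checkpoint inside the table directory.

    Hypothetical helper: assumes the table lives at <storage_root>/<table_name>.
    """
    table_path = posixpath.join(storage_root, table_name)
    # Trailing slash kept to match the style of checkpoint_location in the parameters
    checkpoint_path = posixpath.join(table_path, "_checkpoints") + "/"
    return table_path, checkpoint_path


table_path, checkpoint_path = table_and_checkpoint_paths(
    "/tmp/CHOOSE_A_PERMANENT_LOCATION/", "joined_datasets"
)
print(table_path)
print(checkpoint_path)
```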

Generate a Static (Lookup) Dataset
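One detail worth checking when the lookup data comes from the zip-code CSV: zip codes such as `00501` have leading zeros, and if Spark is asked to infer the schema, the `zip` column may come back as an integer and lose them. Reading it as a string (for example, reading the CSV with a header but without `inferSchema`, which leaves every column as a string) keeps the join key intact. The plain-Python sketch below shows the idea on a tiny inline sample; the column names `zip`, `city`, and `state_id` are assumptions about the file's layout, the real file has more columns, and `load_zip_lookup` is a hypothetical helper.

```python
import csv
import io


def load_zip_lookup(csv_file, columns=("city", "state_id")) -> dict:
    """Build {zip: {column: value}} from a CSV, treating zip codes as 5-character strings."""
    lookup = {}
    for row in csv.DictReader(csv_file):
        # zfill restores leading zeros if an upstream step already turned "00501" into "501"
        zip_code = row["zip"].zfill(5)
        lookup[zip_code] = {col: row[col] for col in columns}
    return lookup


# Tiny inline sample standing in for the downloaded file (illustrative rows only).
sample = io.StringIO(
    '"zip","city","state_id"\n'
    '"00501","Holtsville","NY"\n'
    '"10001","New York","NY"\n'
)
zip_lookup = load_zip_lookup(sample)
print(zip_lookup["00501"])
```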

Canadian Data Guy

https://canadiandataguy.com | Data Engineering & Streaming @ Databricks | Ex Amazon/AWS | All Opinions Are My Own