Skip to content

Commit

Permalink
Add new Spark job sample
Browse files Browse the repository at this point in the history
  • Loading branch information
garystafford committed Sep 23, 2022
1 parent 9d386c5 commit 8e8f4f1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
1 change: 1 addition & 0 deletions apache_spark_examples/spark_batch_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Note: Expects (4) environment variables: BOOTSTRAP_SERVERS, TOPIC_PURCHASES, SASL_USERNAME, SASL_PASSWORD

import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
Expand Down
5 changes: 3 additions & 2 deletions apache_spark_examples/spark_streaming_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author: Gary A. Stafford
# Date: 2022-09-02
# Note: Expects (4) environment variables: BOOTSTRAP_SERVERS, TOPIC_PURCHASES, SASL_USERNAME, SASL_PASSWORD
# Note: Expects a min. of two environment variables: BOOTSTRAP_SERVERS, TOPIC_PURCHASES
# Optionally: AUTH_METHOD, SASL_USERNAME, SASL_PASSWORD

import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType, BooleanType
from pyspark.sql.window import Window


def main():
Expand Down
93 changes: 93 additions & 0 deletions apache_spark_examples/spark_streaming_kafka_running_totals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Purpose: Reads a stream of messages from a Kafka topic and
# writes a stream of running sales totals to console (stdout)
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author: Gary A. Stafford
# Date: 2022-09-23
# Note: Requires two environment variables: BOOTSTRAP_SERVERS, TOPIC_PURCHASES
# Optional: AUTH_METHOD (set to "sasl_scram" to enable SASL/SCRAM), plus SASL_USERNAME and SASL_PASSWORD

import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType, BooleanType


def main():
    """Create the Spark session and run the running-totals streaming job.

    Builds (or reuses) a SparkSession named
    "kafka-streaming-query-running-totals", sets the log level to INFO,
    opens the Kafka stream and hands it to summarize_sales().
    """
    session_builder = SparkSession.builder.appName(
        "kafka-streaming-query-running-totals")
    spark = session_builder.getOrCreate()
    spark.sparkContext.setLogLevel("INFO")

    # Source stream first, then the aggregation/console sink on top of it.
    summarize_sales(read_from_kafka(spark))


def read_from_kafka(spark):
    """Open a streaming DataFrame on the Kafka purchases topic.

    Connection settings come from environment variables:

    * BOOTSTRAP_SERVERS (required): Kafka bootstrap servers.
    * TOPIC_PURCHASES (required): topic to subscribe to.
    * AUTH_METHOD (optional): when equal to "sasl_scram", the connection
      uses SASL_SSL with SCRAM-SHA-512, and SASL_USERNAME / SASL_PASSWORD
      become required.

    The stream always starts from the earliest available offsets.

    :param spark: active SparkSession used to create the stream reader.
    :return: streaming DataFrame produced by the "kafka" source.
    :raises ValueError: if a required environment variable is unset or empty.
    """
    # Validate the configuration before touching Spark, so a missing variable
    # is reported by name instead of surfacing as None inside a Kafka option.
    options = {
        "kafka.bootstrap.servers": _require_env("BOOTSTRAP_SERVERS"),
        "subscribe": _require_env("TOPIC_PURCHASES"),
        "startingOffsets": "earliest",
    }

    if os.environ.get("AUTH_METHOD") == "sasl_scram":
        # Without this check, absent credentials would be formatted into the
        # JAAS config as the literal text "None".
        username = _require_env("SASL_USERNAME")
        password = _require_env("SASL_PASSWORD")
        jaas_template = \
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{0}\" password=\"{1}\";"
        options["kafka.security.protocol"] = "SASL_SSL"
        options["kafka.sasl.mechanism"] = "SCRAM-SHA-512"
        options["kafka.sasl.jaas.config"] = jaas_template.format(
            username, password)

    df_sales = spark \
        .readStream \
        .format("kafka") \
        .options(**options) \
        .load()

    return df_sales


def _require_env(name):
    """Return environment variable *name*; raise ValueError if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise ValueError(
            "Required environment variable {0} is not set".format(name))
    return value


def summarize_sales(df_sales):
    """Stream per-product running totals to the console until terminated.

    The Kafka message value is cast to a string, parsed as JSON with the
    purchase schema below, grouped by product_id and aggregated. Results are
    ordered by total sales (descending) and written to the console sink in
    "complete" output mode with a "2 minute" processing-time trigger, showing
    up to 10 untruncated rows. The call blocks on awaitTermination().

    :param df_sales: streaming DataFrame whose "value" column holds the
        JSON-encoded purchase message.
    """
    # (field name, Spark type, nullable) for each attribute of a purchase.
    purchase_fields = [
        ("transaction_time", TimestampType(), False),
        ("transaction_id", IntegerType(), False),
        ("product_id", StringType(), False),
        ("price", FloatType(), False),
        ("quantity", IntegerType(), False),
        ("is_member", BooleanType(), True),
        ("member_discount", FloatType(), True),
        ("add_supplements", BooleanType(), True),
        ("supplement_price", FloatType(), True),
        ("total_purchase", FloatType(), False),
    ]
    schema = StructType([
        StructField(field_name, field_type, nullable)
        for field_name, field_type, nullable in purchase_fields
    ])

    # Decode the Kafka payload into typed purchase columns.
    purchases = (
        df_sales
        .selectExpr("CAST(value AS STRING)")
        .select(F.from_json("value", schema=schema).alias("data"))
        .select("data.*")
    )

    # NOTE(review): "drinks" is count(quantity), i.e. the number of purchase
    # rows per product, not the summed quantity -- confirm this is intended.
    totals = (
        purchases
        .groupBy("product_id")
        .agg(F.sum("total_purchase"), F.count("quantity"))
        .orderBy(F.col("sum(total_purchase)").desc())
        .select(
            "product_id",
            F.format_number("sum(total_purchase)", 2).alias("sales"),
            F.format_number("count(quantity)", 0).alias("drinks"),
        )
        .coalesce(1)
    )

    ds_sales = (
        totals
        .writeStream
        .queryName("streaming_to_console")
        .trigger(processingTime="2 minute")
        .outputMode("complete")
        .format("console")
        .option("numRows", 10)
        .option("truncate", False)
        .start()
    )

    ds_sales.awaitTermination()


# Run the streaming job only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

0 comments on commit 8e8f4f1

Please sign in to comment.