Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Here I will be talking about and demonstrating structured streaming. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. This stream data can be files in HDFS or cloud storage like S3, message in Kafka topic, continuous data read from a TCP socket etc.
In this post, we will be using spark streaming as a batch job. By batch job, I mean, a scheduled recurrent processing of data. The question arises why on earth will you use spark streaming for batch processing? Well, there could several use cases like you want to leverage the spark streaming triggers. One can use the inherent features of Spark Streaming like checkpointing. Also, quickly build a job ensuring required level of fault tolerance.
Okay, lets get straight into the use case we want to tackle here.
The Problem Statement
We want a batch job that runs on a schedule. Every time the job runs, it looks for any new files at a location and process the new file. The batch should process each file only once. In the processing, we will simply read and save the file in a different location based on the file’s original location.
Let’s divide the problem statement in context of Spark Streaming:
Input Source: The source of the data is file.
Trigger: The stream will trigger on any new file in a location
Sink: The data from the stream is written out as a file onto different location on the basis of original source path. So sink is again File.
If you want to understand more about the various sources, triggers and sink visit the documentation here
The code and demo will be all created using Databricks Unified Data Analytics Platform. The platform provides notebooks powered by Apache Spark.
For user data, we will use a table. We will create a table and insert values in the table. I am using DBFS to store the table data, you can choose HDFS or S3 or any other distributed file system.
CREATE TABLE STREAMING_DATA( RECORD_ID INTEGER, A_NUMBER DOUBLE, A_TEXT STRING ) LOCATION '/dbfs/mnt/test_data/streaming_db/streaming_data'
You can find the complete setup notebook here.
We will insert new records in the above table and this will give the behaviour of new files landing at the location /dbfs/mnt/test_data/streaming_db/streaming_data
Spark Streaming Job
There will be a very simple spark structured streaming job that will look for csv files at the location /dbfs/mnt/test_data/streaming_db/streaming_data
The file_stream_df is the streaming dataframe. This dataframe is the unbounded table that will hold the new data as soon as new file lands atomically to the location. The stream schema defines each line in the csv as one single column.
We want to treat each file uniquely and want to be able to place it in a separate location. So here we are going to use
foreachBatch sink. The
foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query.
foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query.
For our usecase, to write the input file to a different location, based on the original source path, the function looks like below:
We can very well parallellise the writing of the dataframe into different location. But we are interested in proving that the foreachBatch sink can help us achieve the needed result. Leave a comment if you want me to share more on that.
We can start the stream, by calling the writeStream on the streaming dataframe we created earlier. But, we want the spark streaming to behave like any other batch process. So will trigger the stream only once and it will stream all the files data at the input source to the foreachBatch output sink. As soon as all the data streaming is complete, the stream closes. The code to achieve all of that is below:
file_stream_df.writeStream .foreachBatch(foreach_batch_function) .trigger(once=True) .start()
As you can see the foreachBatch is passed the handler, foreach_batch_function, we created earlier. Also, note the trigger once parameter. Now this simple spark streaming application that run once and process all the files at the location can be scheduled like any other batch process.
Well, although our spark job achieves the batch behaviour we desired, it is missing a key aspect of spark streaming. Every time the scheduler triggers the spark application, it will process the same files again. In order to avoid this, we will make use of checkpointing.
The use of checkpointing in spark streaming to recover from failures and unintentional shutdowns of the streams. One can recover the previous progress and state of a previous query and continue where it left off. In our case, process the new files and ignore the old, processed ones. We will configure the writeStream with a checkpoint location, and the it will save all the progress information at that location using write-ahead logs. So our file writeStream query will look like below:
file_stream_df.writeStream.foreachBatch(foreach_batch_function) .option("checkpointLocation", "/dbfs/mnt/checkpoint_location") .trigger(once=True) .start()
Please leave a comment, if you found this helpful or would like to read more on spark streaming. Thanks for reading! 🙂