Sources
All Structured Streaming queries start with defining a source, some place from which to read a stream of data (such as Apache Kafka). When you load a source (which we'll show shortly), you'll get back a streaming DataFrame, which is a Spark DataFrame that can be incrementally processed.
Built-in production and development data sources
Structured Streaming supports the following production sources out-of-the-box:
- File source - Reads files written to a directory as a stream of data. By default, files are processed in order of their modification time (set latestFirst to reverse this). Supported file formats are text, CSV, JSON, ORC, and Parquet. Streaming from a file source lets you keep your structured data in cloud storage, which is typically cheaper than storing it in another type of location, such as a relational database.
- Kafka source - Reads data from a Kafka-compatible broker, such as a Confluent cluster or Azure Event Hubs.
Note
Some Spark vendors support additional production sources. See your vendor's documentation.
Structured Streaming also supports the following non-production sources for testing purposes:
- Socket source: Reads UTF-8 text data from a socket connection into a streaming DataFrame. Never use this in production, since it isn't fault-tolerant[1].
- Rate source: Generates data at the specified number of rows per second into a streaming DataFrame to test performance. This source is useful when load testing your jobs, since it allows you to easily generate thousands of rows per second.
- Rate source per micro-batch: Generates data at the specified number of rows per micro-batch into a streaming DataFrame for performance testing. Unlike the rate data source, this data source provides a consistent set of input rows per micro-batch regardless of query execution (such as query lagging or trigger configuration).
Each of these sources supports many options. See Source reference.
Tip
You can also use the file source as a testing source, rather than the socket source. To do this, create static DataFrames via spark.createDataFrame and write them to a specific directory on your system. Then, read those files using a Structured Streaming job with the file source.
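A minimal Scala sketch of this approach, assuming a local SparkSession and a temporary directory created just for the test. The directory, the people table name, and the sample rows are hypothetical, and Parquet is an arbitrary choice among the supported file formats. The memory sink and processAllAvailable() are used only so the run finishes deterministically.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Local session for experimenting; "local[2]" is an assumption, not a requirement
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("FileSourceAsTestSource")
  .getOrCreate()

// Hypothetical scratch directory that plays the role of the input directory
val inputDir = Files.createTempDirectory("stream-input").toString

// 1. Build a static DataFrame and write it as files. The directory already
//    exists, so use append mode instead of the default ErrorIfExists mode.
val staticDF = spark.createDataFrame(Seq(("alice", 30), ("bob", 25))).toDF("name", "age")
staticDF.write.mode("append").parquet(inputDir)

// 2. Read the same directory as a stream; streaming file sources need an explicit schema
val schema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
val streamDF = spark.readStream.schema(schema).parquet(inputDir)

// 3. Drain every file currently in the directory into an in-memory table
val query = streamDF.writeStream
  .format("memory")
  .queryName("people") // hypothetical table name
  .outputMode("append")
  .start()
query.processAllAvailable()

val rowCount = spark.table("people").count()
query.stop()
```

To simulate new data arriving, write more static DataFrames into the same directory while the query runs; each new file is picked up by a later micro-batch.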
Source reference
The following sections list the options supported by each source type, along with their supported values.
File source
The name for the file source format is one of the following: parquet, orc, json, csv, or text. Some options are supported for every file type, and each file format also has its own format-specific options. For the format-specific options, see the docs for Parquet, ORC, JSON, CSV, and text files.
Supported Options
Option Name | Information | Default | Required? |
---|---|---|---|
path | The path to the input directory. Glob paths are supported, but multiple comma-separated globs are not supported. | None | Yes |
maxFilesPerTrigger | The maximum number of new files to be considered in every trigger. | None (no limit) | No |
latestFirst | Whether to process the latest new files first. This is useful when there is a large backlog of files. | false | No |
fileNameOnly | Whether to check new files based only on the filename instead of on the full path. With this set to true, the following files are considered the same file: file:///dataset.txt and s3://a/dataset.txt. | false | No |
maxFileAge | The maximum age of a file before it is ignored. For the first batch, all files are considered valid. If latestFirst is set to true and maxFilesPerTrigger is set, maxFileAge is ignored. The max age is measured relative to the timestamp of the latest file, not the current system time. | 7 days | No |
cleanSource | Whether to clean up files after processing. Available options are archive, delete, and off. The delete option deletes files permanently. The archive option moves files to sourceArchiveDir; if the source file is /a/data.txt and the archive directory is /archive, the file is moved to /archive/a/data.txt. | off | No |
sourceArchiveDir | The archive directory for cleaned-up files. It cannot be a sub-directory of path; if it were, archived files would be considered new and processed over and over again. | None | Only if cleanSource is set to archive. |
Example
The following Scala and Java code shows an example for the CSV source. Note that while the path option is common to all file sources, the sep option is specific to the CSV file source.
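```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark: SparkSession = ... // your existing SparkSession
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .format("csv")
  .schema(userSchema) // Specify schema of the csv files
  .option("path", "/path/to/directory")
  .option("sep", ";") // CSV-specific option: fields are separated by semicolons
  .load()
```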
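```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = ...; // your existing SparkSession
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .format("csv")
  .schema(userSchema) // Specify schema of the csv files
  .option("path", "/path/to/directory")
  .option("sep", ";") // CSV-specific option: fields are separated by semicolons
  .load();
```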
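The generic options from the table above can be combined with any file format. The following Scala sketch reads JSON files with a per-trigger file limit, newest-first ordering, and archiving of processed files. The temporary landing and archive directories stand in for real locations and are assumptions; they are created up front so the snippet runs as-is.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder()
  .master("local[2]") // local session for experimenting (assumption)
  .appName("FileSourceOptions")
  .getOrCreate()

// Hypothetical landing and archive directories. The archive directory is a
// sibling of the input directory, not a sub-directory of it.
val landingDir = Files.createTempDirectory("landing").toString
val archiveDir = Files.createTempDirectory("archive").toString

val userSchema = new StructType().add("name", "string").add("age", "integer")

val jsonDF = spark.readStream
  .format("json")
  .schema(userSchema)
  .option("path", landingDir)
  .option("maxFilesPerTrigger", 10)  // at most 10 new files per micro-batch
  .option("latestFirst", "true")     // process the newest files first
  .option("cleanSource", "archive")  // move files out of the input directory once processed
  .option("sourceArchiveDir", archiveDir)
  .load()
```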
Kafka source
The Kafka source is named kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for the supported options and examples.
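For orientation, a minimal Scala sketch of subscribing to a single topic. It assumes the spark-sql-kafka-0-10 connector is on the classpath and uses a hypothetical broker address (localhost:9092) and topic (events); the Kafka Integration Guide remains the reference for the full option list.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]") // local session for experimenting (assumption)
  .appName("KafkaSourceExample")
  .getOrCreate()

// Broker address and topic name are hypothetical
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest") // streaming queries default to "latest"
  .load()

// Kafka keys and values arrive as binary columns; cast them to read them as text
val messages = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```

Besides key and value, each row carries the topic, partition, offset, timestamp, and timestampType of the record.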
Socket source
The socket source is named socket. The listening server socket is at the driver.
Supported Options
Option Name | Information | Default | Required? |
---|---|---|---|
host | The host to connect to, such as localhost. | None | Yes |
port | The port to connect to, such as 9999. | None | Yes |
Example
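A minimal Scala sketch, assuming a text server is already listening on localhost:9999 (for example, one started locally with nc -lk 9999; the port and the tool are assumptions for testing). Each line received becomes a row in a single string column named value; the word split is a hypothetical transformation added only to have something to observe.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]") // local session for experimenting (assumption)
  .appName("SocketSourceExample")
  .getOrCreate()
import spark.implicits._

// Reads lines from the text server on localhost:9999 after a query is started (test use only)
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each received line into words
val words = socketDF.as[String].flatMap(_.split(" "))
```

To see the words, start a query such as words.writeStream.format("console").start() while the server is running.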
Rate source
The name for the rate source format is rate. Each output row contains a timestamp and a value, where timestamp is of timestamp data type and contains the time of message dispatch, and value is of long data type and contains the message count, starting from 0 as the first row.
Supported Options
Option Name | Information | Default | Required? |
---|---|---|---|
rowsPerSecond | How many rows should be generated per second. | 1 | No |
rampUpTime | How long to ramp up before the generating speed becomes rowsPerSecond. Granularities finer than seconds are truncated to integer seconds. This option is not supported with the continuous mode trigger. | 0 | No |
numPartitions | The number of partitions for the generated rows. The source tries its best to reach rowsPerSecond, but the query may be resource constrained; you can tweak numPartitions to help reach the desired speed. | Spark's default parallelism[2] | No |
Example
Not available in R.
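A minimal Scala sketch of the rate source. The rate of 100 rows per second, the 5-second ramp-up, and the four partitions are arbitrary values chosen for illustration, and the isEven column is a hypothetical stand-in for the logic you want to load-test.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[2]") // local session for experimenting (assumption)
  .appName("RateSourceExample")
  .getOrCreate()

// Generate 100 rows per second, reaching full speed after a 5-second ramp-up
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 100)
  .option("rampUpTime", "5s")
  .option("numPartitions", 4)
  .load()

// A trivial transformation to put load on: flag even message counts
val flagged = rateDF.withColumn("isEven", col("value") % 2 === 0)
```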
Rate source per micro-batch
The name for the rate source per micro-batch format is rate-micro-batch. Each output row contains a timestamp and a value, where timestamp is of timestamp data type and contains the time of message dispatch, and value is of long data type and contains the message count, starting from 0 as the first row. For example, with rowsPerBatch set to 1000, batch 0 produces values 0~999, batch 1 produces values 1000~1999, and so on. Every record produced has a different message count value and a different generated timestamp, even across partitions.
Supported Options
Option Name | Information | Default | Required? |
---|---|---|---|
rowsPerBatch | How many rows should be generated per micro-batch. | 0 | No |
numPartitions | The number of partitions for the generated rows. | Spark's default parallelism[2] | No |
startTimestamp | The starting value of the generated time. | 0 | No |
advanceMillisPerBatch | The number of milliseconds by which the generated time advances on each micro-batch. | 1000 | No |
Example
Not available in R.
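A minimal Scala sketch of the rate source per micro-batch. With rowsPerBatch set to 1000, batch 0 should contain values 0 to 999, batch 1 values 1000 to 1999, and so on, as described above. The partition count is an arbitrary choice for illustration, and the sketch assumes a Spark version that ships rate-micro-batch (Spark 3.3 or later).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]") // local session for experimenting (assumption)
  .appName("RateMicroBatchSourceExample")
  .getOrCreate()

// Every micro-batch gets exactly 1000 rows, regardless of how long the batch
// takes or how the trigger is configured. Generated time starts at 0 and
// advances by 1000 ms on each micro-batch.
val microBatchDF = spark.readStream
  .format("rate-micro-batch")
  .option("rowsPerBatch", 1000)
  .option("numPartitions", 2)
  .option("startTimestamp", 0)
  .option("advanceMillisPerBatch", 1000)
  .load()
```

Because the input per batch is fixed, two runs of the same query produce the same rows batch for batch, which makes results comparable across runs.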
[1] A source is fault-tolerant if it can replay data in the case of failure. The socket source doesn't persist the data it receives, so it can't replay data. The file source and Kafka source both support replay, so they are considered fault-tolerant.
[2] Default parallelism refers to SparkContext.defaultParallelism, which is set by spark.default.parallelism. If that property isn't set, the value depends on the cluster manager; see the spark.default.parallelism entry in the Spark configuration documentation.