Using a Delta table as a streaming source
When a Delta table is used as a streaming source, rows appended to the table are read into the streaming dataframe as they arrive.
💡
When a Delta table is used as a streaming source, only append operations can be made to the underlying table. Other data modifications cause an error unless you specify options such as ignoreChanges, as in the example below.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("Files/delta/internetorders")

# Now you can process the streaming data in the dataframe.
# Note that .show() doesn't work on a streaming dataframe;
# to inspect the data, write the stream to a sink (see below).
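For example, to verify that rows are arriving, you can write the stream to the console sink. This is a minimal sketch; the query name console_query is illustrative:
# Write the streaming dataframe to the console so you can inspect the rows
console_query = stream_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Stop the console query when you've seen enough output
console_query.stop()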
Using a Delta table as a streaming sink
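A stream of data can also be written to a Delta table. The following sketch reads a stream of JSON device-status files and writes it to the Files/delta/devicetable location queried in the next section; the input folder, schema, and checkpoint path are assumptions for illustration:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON files from a folder as they arrive
# (the folder path and two-column schema are assumed for this example)
inputPath = 'Files/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema) \
    .option("maxFilesPerTrigger", 1) \
    .json(inputPath)

# Write the stream to a Delta table; the checkpoint location (assumed here)
# lets the stream recover its progress after a failure
table_path = 'Files/delta/devicetable'
checkpoint_path = 'Files/delta/checkpoint'
delta_stream = stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .start(table_path)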
Query the table the data is streamed into
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION 'Files/delta/devicetable';
SELECT device, status
FROM DeviceTable;
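Because the stream is continuously appending rows to the folder the table is defined over, re-running the SELECT statement returns the latest data each time.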
Stop the stream
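# Stop the streaming query to avoid unnecessary processing costs;
# rows already written remain in the Delta table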
delta_stream.stop()