Table Handling
Reading Data into a DataFrame
Load a Spark DataFrame from a CSV file in the Fabric Files section
df = spark.read.load('Files/mydata.csv', format='csv', header=True)
Load a TSV file from the Files section
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .load(f"Files/dw_source/{filename}.txt")
Load from a Delta table using SQL
df = spark.sql("""
    SELECT
        first_name, last_name
    FROM
        customer
    WHERE
        customer_type = 1
""")
Save a DataFrame as a Delta table
Write DataFrame to a table in managed storage
# Save the dataframe as a delta table
df.write.format("delta").saveAsTable("mytable")
Write DataFrame to a table in the Files folder
df.write.format("delta").saveAsTable("myexternaltable", path="Files/myexternaltable")
Write DataFrame to a table in external blob storage
df.write.format("delta").saveAsTable("myexternaltable", path="abfss://my_store_url..../myexternaltable")
Save a DataFrame as a Delta table (no metastore entry)
# default save (will not overwrite)
delta_path = "Files/mydatatable"
df.write.format("delta").save(delta_path)
# force overwrite
new_df.write.format("delta").mode("overwrite").save(delta_path)
# append to existing data
new_rows_df.write.format("delta").mode("append").save(delta_path)
Create table without writing data
Create a Fabric table with defined schema using Python
from delta.tables import *
DeltaTable.create(spark) \
    .tableName("products") \
    .addColumn("Productid", "INT") \
    .addColumn("ProductName", "STRING") \
    .addColumn("Category", "STRING") \
    .addColumn("Price", "FLOAT") \
    .execute()
Create a Fabric table with defined schema using SQL
%%sql
CREATE TABLE salesorders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA
Create a table reference to an external Delta table. The schema is read from the external location, so it does not need to be specified.
%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION 'Files/mydata'
CRUD Operations
SQL Syntax
Using SQL in the Python API
spark.sql("UPDATE products SET Price = 2.49 WHERE ProductId = 1")
Using SQL with magic
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Delta API
Update table in Files folder
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
Describe the history of a table (time travel)
%%sql
-- metastore table
DESCRIBE HISTORY products
-- external table in Files folder
DESCRIBE HISTORY 'Files/mytable'
Retrieve historical version of table
# by version number
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
# by timestamp
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)