Table Handling

Reading Data into a DataFrame

Load a Spark DataFrame from a CSV file in the Fabric Files section

df = spark.read.load('Files/mydata.csv', format='csv', header=True)
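
To sanity-check the load, print the inferred schema and preview a few rows (display is the Fabric notebook's built-in renderer):

df.printSchema()
display(df.limit(10))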

Load a TSV file from the Files section

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .option("inferSchema","true") \
    .load(f"Files/dw_source/{filename}.txt")

Load from Delta Table using SQL

df = spark.sql("""
SELECT
  first_name, last_name
FROM
  customer
WHERE
  customer_type = 1
""")

Save a DataFrame as a Delta table

Write a DataFrame to a table in managed storage

# Save the dataframe as a delta table
df.write.format("delta").saveAsTable("mytable")

Write a DataFrame to a table in the Files folder

df.write.format("delta").saveAsTable("myexternaltable", path="Files/myexternaltable")

Write a DataFrame to a table in external blob storage

df.write.format("delta").saveAsTable("myexternaltable", path="abfss://my_store_url..../myexternaltable")

Save a DataFrame as a Delta table (no metastore entry)

# default save (fails if data already exists at the path)
delta_path = "Files/mydatatable"
df.write.format("delta").save(delta_path)

# force overwrite
new_df.write.format("delta").mode("overwrite").save(delta_path)

# append to existing data
new_rows_df.write.format("delta").mode("append").save(delta_path)
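
Because there is no metastore entry, the table is read back by path:

df = spark.read.format("delta").load(delta_path)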

Create table without writing data

Create a Fabric table with a defined schema using Python

from delta.tables import DeltaTable

DeltaTable.create(spark) \
  .tableName("products") \
  .addColumn("Productid", "INT") \
  .addColumn("ProductName", "STRING") \
  .addColumn("Category", "STRING") \
  .addColumn("Price", "FLOAT") \
  .execute()
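
The new table is created empty; rows can then be inserted, for example via spark.sql (the sample values are hypothetical):

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")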

Create a Fabric table with a defined schema using SQL

%%sql

CREATE TABLE salesorders
(
    OrderId INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA

Create a table reference to an external Delta table. The schema is read from the external location, so it does not need to be specified.

%%sql

CREATE TABLE MyExternalTable
USING DELTA
LOCATION 'Files/mydata'
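
Once registered, the reference is queried like any other table:

df = spark.sql("SELECT * FROM MyExternalTable LIMIT 10")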

CRUD Operations

SQL Syntax

Using SQL from the Python API

spark.sql("UPDATE products SET Price = 2.49 WHERE ProductId = 1")

Using SQL with the %%sql magic

%%sql

UPDATE products
SET Price = 2.49
WHERE ProductId = 1;

Delta API

Update a table in the Files folder

from delta.tables import DeltaTable

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
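
Deletes use the same DeltaTable object; a minimal sketch (the condition is only an illustration):

# Delete rows that match a condition
deltaTable.delete(condition = "Price <= 0")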

Describe the history of a table (versions available for time travel)

%%sql

-- metastore table
DESCRIBE HISTORY products

-- external table in Files folder
DESCRIBE HISTORY 'Files/mytable'
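
The same history is available from Python via the Delta API:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "Files/mytable")
display(deltaTable.history())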

Retrieve a historical version of a table (time travel)

# by version number
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# by timestamp
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)