Create from a list of hard-coded rows
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
# Inferred Schema
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# Explicit Schema
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
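The DDL string above is the most compact way to declare a schema; an equivalent, more programmatic option is a StructType built from StructField objects. A minimal sketch, assuming the same column names and types as the example above:
from pyspark.sql.types import (StructType, StructField, LongType,
                               DoubleType, StringType, DateType, TimestampType)
# Build the same schema explicitly instead of with a DDL string
explicit_schema = StructType([
    StructField('a', LongType()),
    StructField('b', DoubleType()),
    StructField('c', StringType()),
    StructField('d', DateType()),
    StructField('e', TimestampType())
])
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0))
], schema=explicit_schema)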
Create from a Pandas DataFrame
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
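Conversions between Spark and pandas can often be sped up with Apache Arrow. This is an optional setting and the exact key depends on your Spark version (older releases used spark.sql.execution.arrow.enabled); a sketch for Spark 3.x:
# Enable Apache Arrow for Spark <-> pandas conversions (optional optimization)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
df = spark.createDataFrame(pandas_df)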
DataFrame Inspection
# Show 1st row in default format
df.show(1)
# Show a row vertically
df.show(1, vertical=True)
# List columns
df.columns
# Show schema
df.printSchema()
# Summarize dataframe
df.select("a", "b", "c").describe().show()
# Show a single column
df.select(df.c).show()
# Derive a new column within the DataFrame
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c)).show()
# Apply a filter
df.filter(df.a == 1).show()
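These operations compose; for example, a filter on multiple conditions combined with a column selection might look like the following sketch (column names taken from the DataFrame above):
# Combine conditions with & / | and select only the columns of interest
df.filter((df.a > 1) & (df.b < 5.)).select('a', 'c').show()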
Use Pandas UDFs and mapInPandas
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to each value using the vectorized pandas Series API
    return series + 1
df.select(pandas_plus_one(df.a)).show()
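For comparison, a row-at-a-time Python UDF can do the same thing, though it is generally slower than the vectorized pandas UDF; a minimal sketch (plus_one is an illustrative name, not part of the original example):
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# Row-at-a-time UDF applied to column a
plus_one = udf(lambda x: x + 1, LongType())
df.select(plus_one(df.a)).show()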
# Apply a function over an iterator of pandas DataFrames with mapInPandas
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        # Keep only the rows where column 'a' equals 1
        yield pandas_df[pandas_df.a == 1]
df.mapInPandas(pandas_filter_func, schema=df.schema).show()
Grouping Data
# Create a hard-coded DataFrame
df = spark.createDataFrame([
['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
# Group by color and average the numeric columns
df.groupby('color').avg().show()
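groupby can also take explicit aggregations when different columns need different functions; a sketch using the functions module:
from pyspark.sql import functions as F
# Average v1 and take the maximum v2 within each color group
df.groupby('color').agg(F.avg('v1'), F.max('v2')).show()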
# Incorporate a UDF into the process via applyInPandas (grouped map)
def plus_mean(pandas_df):
    # Subtract the group mean of v1 from each row's v1
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
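A related grouped operation is pivoting one column's values into output columns; a sketch with the color/fruit DataFrame above:
# Pivot fruit values into columns, summing v1 within each color/fruit cell
df.groupby('color').pivot('fruit').sum('v1').show()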
Consolidate Distributed Data from Worker Nodes to the Driver Node
# Collect all rows to the driver node (may throw an out-of-memory error if the data set is very large)
# df2 will be a list of Row objects, not a Spark DataFrame
df2 = df.collect()
# Collect into a Pandas DataFrame on the driver node (may also run out of memory if the data set is very large)
df3 = df.toPandas()
# Collect a smaller subset to avoid out-of-memory errors
df.take(1)
df.tail(1)
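If only a small sample is needed on the driver, limiting before converting is a common pattern; a sketch:
# Take only the first two rows before converting, to keep driver memory usage small
small_pdf = df.limit(2).toPandas()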