Create from a list of hard-coded rows

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Inferred Schema
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# Explicit Schema
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
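
The same schema can also be built programmatically with a StructType instead of the DDL string above (a minimal sketch; the column types simply mirror the tuples in the previous example):

from pyspark.sql.types import (StructType, StructField, LongType, DoubleType,
                               StringType, DateType, TimestampType)

explicit_schema = StructType([
    StructField('a', LongType()),
    StructField('b', DoubleType()),
    StructField('c', StringType()),
    StructField('d', DateType()),
    StructField('e', TimestampType()),
])
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0))
], schema=explicit_schema)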

Create from a Pandas DataFrame

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
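
If the pandas conversion is slow, Spark 3.x can use Apache Arrow for the transfer; the configuration key below is assumed to be available in your Spark version:

# Enable Arrow-based conversion between pandas and Spark (Spark 3.x setting)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.createDataFrame(pandas_df)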

DataFrame Inspection

# Show the first row in the default format
df.show(1)

# Show a row vertically
df.show(1, vertical=True)

# List columns
df.columns

# Show schema
df.printSchema()

# Summarize selected columns (count, mean, stddev, min, max)
df.select("a", "b", "c").describe().show()

# Show a single column
df.select(df.c).show()

# Derive a new column within the DataFrame
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c)).show()

# Apply a filter
df.filter(df.a == 1).show()
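
These operations compose; an illustrative chain (reusing the same df and the upper import from above):

# Filter, derive a new column, then project only the columns of interest
df.filter(df.a > 1).withColumn('upper_c', upper(df.c)).select('a', 'upper_c').show()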

Use a UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every element, operating on a whole pandas Series per batch
    return series + 1

df.select(pandas_plus_one(df.a)).show()
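
For comparison, a plain row-at-a-time Python UDF can express the same logic (a minimal sketch; the name plus_one is just for illustration, and pandas UDFs are generally preferred for performance):

from pyspark.sql.functions import udf

@udf('long')
def plus_one(x):
    # Called once per row; guard against NULL input
    return x + 1 if x is not None else None

df.select(plus_one(df.a)).show()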

# mapInPandas functions receive an iterator of pandas DataFrames (one per batch)
# and yield pandas DataFrames; this one keeps only the rows where a == 1
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

Grouping Data

# Create a hard-coded DataFrame
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

# Group by color and average all numeric columns (the SQL GROUP BY equivalent)
df.groupby('color').avg().show()
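
Instead of averaging every numeric column, specific aggregations can be requested with agg() (an illustrative sketch using built-in functions):

# Average v1 and sum v2 per color
from pyspark.sql import functions as F
df.groupby('color').agg(F.avg('v1'), F.sum('v2')).show()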

# Apply a pandas function to each group: subtract the per-group mean of v1 from each row
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

Consolidate Distributed Data from Worker Nodes to the Driver Node

# Collect all rows onto the driver node (may throw an out-of-memory error if the data set is very large)
# rows will be a Python list of Row objects, not a Spark DataFrame
rows = df.collect()

# Collect into a pandas DataFrame on the driver node (may throw an out-of-memory error if the data set is very large)
df3 = df.toPandas()

# Collect a smaller set to avoid an out-of-memory error
df.take(1)   # first row(s) as a list of Row objects
df.tail(1)   # last row(s) as a list of Row objects
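
A safer pattern when only a sample is needed on the driver is to reduce the data on the cluster first (an illustrative sketch):

# Limit on the executors before converting, so only a few rows reach the driver
small_pdf = df.limit(5).toPandas()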