Introduction to PySpark Part 1 – Creating DataFrames and Reading Data from Files
This is the first part in a series I’m putting together as an introduction to PySpark. I find myself looking things up a lot for reference, so I’ve put this together as a kind of cheat sheet for myself, and hopefully it helps others too.
The other parts of this blog post series can be found here:
- Part 1 – Creating DataFrames and Reading Data from Files
- Part 2 – Selecting, Filtering and Sorting Data
- Part 3 – Adding, Updating and Removing Columns
- Part 4 – Summarising Data
- Part 5 – Aggregating Data
The entire table of contents across these posts is here:
- Part 1 – Creating DataFrames and Reading Data from Files
- Creating a DataFrame
- Reading CSV Files as DataFrames
- Reading JSON Files as DataFrames
- Reading Parquet Files as DataFrames
- Reading Data from a SQL DB
- Part 2 – Selecting, Filtering and Sorting Data
- Selecting Columns
- Filtering Rows
- Chaining Conditions
- String Matching
- Matching against a list
- Not (~)
- Filtering on Nulls
- Dropping Duplicates
- Sorting Data
- Part 3 – Adding, Updating and Removing Columns
- Adding Columns
- Summing Two Columns
- expr for Cumulative Sum
- Other Arithmetic Operators
- String Concatenation
- Updating Columns
- Renaming Columns
- Casting Columns
- When…Otherwise
- UDFs
- Filling Nulls
- Removing Columns
- Part 4 – Summarising Data
- Summary Statistics (Describe)
- Getting Column max, min, mean, stdev
- Getting Column quantiles
- Counting Rows and Distinct values
- Part 5 – Aggregating Data
- GroupBy
- Pivot
- Windowing
- Time Series Aggregation
- Aggregation
- Windowing
- Joining DataFrames
- Concatenating DataFrames using Union
Creating a DataFrame
Data processing in Spark is done using a data structure known as a DataFrame, and PySpark is the Python API provided for interacting with these data structures. Those familiar with pandas and R will be comfortable working with DataFrames, though there are some differences. For example, unlike a pandas DataFrame, there is no row index alongside the columns that you can use to select particular rows as Series objects. However, like a pandas DataFrame, we can index on column names and each column must be of a single data type.
DataFrames are tabular structures, and processing on them is carried out in parallel across the worker nodes of the Spark cluster.
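Throughout these posts I’m working in Databricks, where a SparkSession is already available as the spark variable. If you’re running PySpark locally, here’s a minimal sketch of getting an equivalent spark object (the app name is arbitrary):
from pyspark.sql import SparkSession

# In Databricks a SparkSession is already provided as `spark`;
# locally we can create (or reuse) one like this
spark = SparkSession.builder.appName('pyspark-intro').getOrCreate()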
Let’s look at creating a DataFrame from some data we’ve defined. Let’s say we have some weather data:
weather_data = [
{'Town': 'London', 'Temperature': 14, 'Humidity': 0.6, 'Wind': 8, 'Precipitation': 0.0},
{'Town': 'Orlando', 'Temperature': 26, 'Humidity': 0.65, 'Wind': 10, 'Precipitation': 0.6},
{'Town': 'Cairo', 'Temperature': 23, 'Humidity': 0.37, 'Wind': 11, 'Precipitation': 0.0},
{'Town': 'Rio De Janeiro', 'Temperature': 32, 'Humidity': 0.76, 'Wind': 17, 'Precipitation': 0.9},
]
We could create a DataFrame by passing a list of PySpark Row objects to the spark.createDataFrame function, in which each of the row’s attributes is passed in as a keyword argument (kwargs) to the Row object.
Let’s take a look:
from pyspark.sql import Row
weather_df = spark.createDataFrame(
map(lambda x: Row(**x), weather_data)
)
display(weather_df)
Town | Temperature | Humidity | Wind | Precipitation |
---|---|---|---|---|
London | 14 | 0.6 | 8 | 0.0 |
Orlando | 26 | 0.65 | 10 | 0.6 |
Cairo | 23 | 0.37 | 11 | 0.0 |
Rio De Janeiro | 32 | 0.76 | 17 | 0.9 |
Alternatively, we can use pandas as an intermediary, where we first convert to a pandas DataFrame and pass it to the same spark.createDataFrame function to create our Spark DataFrame.
Let’s take a look:
import pandas as pd
weather_df = spark.createDataFrame(
pd.DataFrame(weather_data)
)
display(weather_df)
Town | Temperature | Humidity | Wind | Precipitation |
---|---|---|---|---|
London | 14 | 0.6 | 8 | 0.0 |
Orlando | 26 | 0.65 | 10 | 0.6 |
Cairo | 23 | 0.37 | 11 | 0.0 |
Rio De Janeiro | 32 | 0.76 | 17 | 0.9 |
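If we want to pin down the column types (and their order) at creation time, we can also pass an explicit schema to spark.createDataFrame. Here’s a minimal sketch, reshaping the same weather_data into tuples so the values line up with the schema fields:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define the column names and types up front
weather_schema = StructType([
    StructField('Town', StringType()),
    StructField('Temperature', IntegerType()),
    StructField('Humidity', DoubleType()),
    StructField('Wind', IntegerType()),
    StructField('Precipitation', DoubleType()),
])

# Reshape the dictionaries into tuples in the same order as the schema fields
weather_rows = [
    (d['Town'], d['Temperature'], d['Humidity'], d['Wind'], d['Precipitation'])
    for d in weather_data
]

weather_df = spark.createDataFrame(weather_rows, schema=weather_schema)
weather_df.printSchema()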
Reading CSV Files as DataFrames
We can read CSV files by using spark.read.csv and passing the file path in as an argument. If the CSV files have a header supplied, you’ll also need to pass in the header=True flag.
populations = spark.read.csv('/mnt/tmp/city_population.csv', header=True)
display(populations)
Population | Year | City |
---|---|---|
6.9 | 1991 | London |
7.2 | 2001 | London |
8.2 | 2011 | London |
2.5 | 2001 | Manchester |
2.7 | 2011 | Manchester |
By default, all 3 columns here are read as strings and the delimiter is assumed to be a comma. There are two ways around this. The first is to pass in some options when we read our CSV:
population_df = spark.read.options(header=True, inferSchema=True, delimiter=',').csv('/mnt/tmp/city_population.csv')
Now our Population is a DoubleType, the Year is an IntegerType and the City is a StringType.
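We can confirm what Spark has inferred with printSchema; the commented output below is what we’d expect to see rather than captured output:
population_df.printSchema()
# root
#  |-- Population: double (nullable = true)
#  |-- Year: integer (nullable = true)
#  |-- City: string (nullable = true)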
The second way, if we want to define the types ourselves and ensure they are exactly as we expect, is to provide a schema:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
population_schema = StructType(fields=[
StructField('Population', DoubleType()),
StructField('Year', IntegerType()),
StructField('City', StringType()),
])
population_df = spark.read.csv('/mnt/tmp/city_population.csv', header=True, schema=population_schema)
display(population_df)
Population | Year | City |
---|---|---|
6.9 | 1991 | London |
7.2 | 2001 | London |
8.2 | 2011 | London |
2.5 | 2001 | Manchester |
2.7 | 2011 | Manchester |
Reading multiple CSVs from the same directory
We can read multiple CSVs from the same directory by providing the directory name or a wildcard using an asterisk.
Let’s say we have two CSV files in a data directory we want to read, one with data from 2013 and one with data from 2014. I’m using Databricks here, so I’ll use the Databricks file system utility dbutils.fs to list my directory:
dbutils.fs.ls('/mnt/tmp/ae')
[FileInfo(path='dbfs:/mnt/tmp/ae/ae_2013.csv', name='ae_2013.csv', size=1816),
FileInfo(path='dbfs:/mnt/tmp/ae/ae_2014.csv', name='ae_2014.csv', size=1816)
]
I can read both files in by pointing at the directory with a wildcard:
ae_attendance = spark.read.csv('/mnt/tmp/ae/*', header=True)
display(ae_attendance.limit(10))
Date | Total Attendance | Total Attendence > 4 hours |
---|---|---|
W/E 06/01/2013 | 412,216 | 28,702 |
W/E 13/01/2013 | 389,236 | 20,628 |
W/E 20/01/2013 | 360,739 | 15,279 |
W/E 27/01/2013 | 388,036 | 20,031 |
W/E 03/02/2013 | 423,114 | 24,538 |
W/E 10/02/2013 | 415,039 | 21,682 |
W/E 17/02/2013 | 409,586 | 24,150 |
W/E 24/02/2013 | 400,726 | 21,980 |
W/E 03/03/2013 | 423,610 | 27,622 |
W/E 10/03/2013 | 430,769 | 31,483 |
We can see that we have the data from 2013, and if we look at the last few rows we can see that we also have the 2014 data:
import pprint
pprint.pprint(ae_attendance.tail(num=10))
[Row(Date='W/E 26/10/2014', Total Attendance='427,291', Total Attendence > 4 hours='26,789'),
Row(Date='W/E 02/11/2014', Total Attendance='417,460', Total Attendence > 4 hours='26,212'),
Row(Date='W/E 09/11/2014', Total Attendance='418,413', Total Attendence > 4 hours='27,364'),
Row(Date='W/E 16/11/2014', Total Attendance='429,287', Total Attendence > 4 hours='30,547'),
Row(Date='W/E 23/11/2014', Total Attendance='430,386', Total Attendence > 4 hours='26,324'),
Row(Date='W/E 30/11/2014', Total Attendance='433,100', Total Attendence > 4 hours='28,007'),
Row(Date='W/E 07/12/2014', Total Attendance='436,377', Total Attendence > 4 hours='35,912'),
Row(Date='W/E 14/12/2014', Total Attendance='440,447', Total Attendence > 4 hours='44,859'),
Row(Date='W/E 21/12/2014', Total Attendance='446,501', Total Attendence > 4 hours='49,825'),
Row(Date='W/E 28/12/2014', Total Attendance='403,314', Total Attendence > 4 hours='38,279')
]
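As an aside, instead of the wildcard we could point spark.read.csv at the directory itself; Spark will read every CSV it finds under that path. A quick sketch using the same directory as above:
# Passing the directory rather than a wildcard also picks up every file inside it
ae_attendance = spark.read.csv('/mnt/tmp/ae/', header=True)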
Reading JSON Files as DataFrames
JSON files can be read using spark.read.json. If you have your data in the format:
[
  {"id": 1, "name": "Ben"},
  {"id": 2, "name": "Alex"}
]
You’ll also need to provide the multiline=True option, otherwise Spark will try to read the lines containing [ and ] in as records as well.
staff_details = spark.read.options(multiline=True).json('/mnt/tmp/staff_details.json')
By default the columns are read in alphabetical order (in a later post we’ll look at selecting columns, which can be used to re-order the columns):
display(staff_details.limit(10))
age | first_name | gender | id | last_name | profession |
---|---|---|---|---|---|
22 | Andrea | Female | 1 | Jacobs | VP Quality Control |
20 | Donald | Male | 2 | Davis | Geologist IV |
29 | Philip | Male | 3 | Harper | Director of Sales |
60 | Aaron | Male | 4 | Hunter | Financial Analyst |
31 | Judy | Female | 5 | King | Marketing Manager |
21 | Nicholas | Male | 6 | Wood | Teacher |
34 | Roger | Male | 7 | Warren | Librarian |
64 | Alice | Female | 8 | Cook | Software Test Engineer IV |
57 | Dennis | Male | 9 | Ortiz | Internal Auditor |
44 | Brenda | Female | 10 | Elliott | Assistant Media Planner |
If you have nested JSON structures or you want to provide your own schema, you can do so – I have a dedicated blog post for this: Using PySpark to Read and Flatten JSON data with an enforced schema.
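For a flat file like the one above, supplying your own schema is just a case of chaining .schema() into the read. A minimal sketch, with the field names assumed to match the staff_details file:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Field names assumed to match the staff_details.json file above
staff_schema = StructType([
    StructField('id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('age', IntegerType()),
    StructField('profession', StringType()),
])

staff_details = (
    spark.read
    .options(multiline=True)
    .schema(staff_schema)
    .json('/mnt/tmp/staff_details.json')
)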
Reading Parquet Files as DataFrames
Parquet files are a columnar storage format, and on large datasets they can provide significant space savings as well as improved scan and deserialisation times. Parquet is often the default choice for intermediate datasets when working with big data, where data is usually needed and queried column-wise rather than row-wise as it might be with a CSV.
With Parquet files you can choose to read in only the columns you need, and there are flexible compression options to suit your needs (there’s a short sketch of both after the example below).
Let’s look at reading from a Parquet File using spark.read.parquet:
pokemon = spark.read.parquet('/mnt/tmp/pokemon.parquet')
display(pokemon.limit(10))
# | Name | Type1 | Type2 | HP | Attack | Defense | Sp.Atk | Sp.Def | Speed | Generation | Legendary |
---|---|---|---|---|---|---|---|---|---|---|---|
1 | Bulbasaur | Grass | Poison | 45 | 49 | 49 | 65 | null | 45 | 1 | false |
2 | Ivysaur | Grass | Poison | 60 | 62 | 63 | 80 | 80.0 | 60 | 1 | false |
3 | Venusaur | Grass | Poison | 80 | 82 | 83 | 100 | 100.0 | 80 | 1 | false |
3 | VenusaurMega Venusaur | Grass | Poison | 80 | 100 | 123 | 122 | 120.0 | 80 | 1 | false |
4 | Charmander | Fire | null | 39 | 52 | 43 | 60 | 50.0 | 65 | 1 | false |
5 | Charmeleon | Fire | null | 58 | 64 | 58 | 80 | 65.0 | 80 | 1 | false |
6 | Charizard | Fire | Flying | 78 | 84 | 78 | 109 | 85.0 | 100 | 1 | false |
6 | CharizardMega Charizard X | Fire | Dragon | 78 | 130 | 111 | 130 | 85.0 | 100 | 1 | false |
6 | CharizardMega Charizard Y | Fire | Flying | 78 | 104 | 78 | 159 | 115.0 | 100 | 1 | false |
7 | Squirtle | Water | null | 44 | 48 | 65 | 50 | 64.0 | 43 | 1 | false |
Unlike CSV files, we don’t need to specify that we want to infer the schema here, as it is stored in the metadata of the Parquet file.
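As a quick illustration of the column-wise benefit mentioned earlier, we can select just the columns we need and Spark will only read those from the Parquet file, and when writing we can pick a compression codec. A small sketch – the column names come from the Pokémon table above, but the output path is hypothetical:
# Only the selected columns are read from disk thanks to Parquet's columnar layout
pokemon_names = spark.read.parquet('/mnt/tmp/pokemon.parquet').select('Name', 'Type1')

# Writing back out with an explicit compression codec (the output path is hypothetical)
pokemon_names.write.option('compression', 'snappy').parquet('/mnt/tmp/pokemon_names.parquet')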
Reading Data from a SQL DB
If you want to read data from a SQL DB you can do so using a JDBC connection.
If you want to connect to an Azure SQL DB, you’ll need to install the Maven library "com.microsoft.azure:spark-mssql-connector_<version>".
I will often include this as part of a CD pipeline: installing the Databricks CLI, connecting it to my Databricks cluster and installing the library with the line:
databricks libraries install --cluster-id $(DATABRICKS-CLUSTER-ID) --maven-coordinates "com.microsoft.azure:spark-mssql-connector_2.12_3.0:1.0.0-alpha"
I also store my database keys in the Databricks secrets utility, but for now let’s take a look at an example without the secrets utility to keep it simple (there’s a short sketch using secrets at the end of this post):
JDBC_URL = "jdbc:sqlserver://{}.database.windows.net:{};database={};"
sql_server = 'REDACTED'
sql_port = 1433
sql_db = 'REDACTED'
jdbc_url = JDBC_URL.format(sql_server, sql_port, sql_db)
sql_user = 'REDACTED'
sql_password = 'REDACTED!'
product_df = (
spark.read
.format("jdbc")
.option("url", jdbc_url)
.option("dbtable", 'dbo.Products')
.option("user", sql_user)
.option("password", sql_password)
.load()
)
display(product_df.limit(5))
Id | Name | Sku | CategoryId | SubCategoryId | BrandId | IsActive | ImageUri |
---|---|---|---|---|---|---|---|
1 | Forza Horizon 4 | XB358977 | 1 | 9 | 65 | true | |
2 | Red Dead Redemption 2 | XB360914 | 2 | 1 | 121 | true | |
3 | Overwatch | XB360915 | 2 | 1 | 124 | true | |
4 | PlayerUnknown’s Battlegrounds | XB361190 | 1 | 9 | 36 | true | |
5 | DOOM Eternal | XB359948 | 1 | 9 | 1 | true |
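For completeness, here’s what the same read might look like with the credentials pulled from the Databricks secrets utility rather than hard-coded – the secret scope and key names below are hypothetical:
# Hypothetical secret scope and key names – replace with your own
sql_user = dbutils.secrets.get(scope='my-keyvault-scope', key='sql-username')
sql_password = dbutils.secrets.get(scope='my-keyvault-scope', key='sql-password')

product_df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", 'dbo.Products')
    .option("user", sql_user)
    .option("password", sql_password)
    .load()
)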