Using PySpark to Read and Flatten JSON Data with an Enforced Schema
In this post we’re going to read a directory of JSON files and enforce a schema on load to make sure each file has all of the columns that we’re expecting.
Our input directory contains the sensor readings we want to read in, stored as one JSON file per day.
In [0]:
IN_DIR = '/mnt/data/'
dbutils.fs.ls(IN_DIR)
Out[1]: [FileInfo(path='dbfs:/mnt/data/snx001_2020-01-01.json', name='snx001_2020-01-01.json', size=271360),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-02.json', name='snx001_2020-01-02.json', size=271293),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-03.json', name='snx001_2020-01-03.json', size=271379),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-04.json', name='snx001_2020-01-04.json', size=271330),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-05.json', name='snx001_2020-01-05.json', size=271356),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-06.json', name='snx001_2020-01-06.json', size=271328),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-07.json', name='snx001_2020-01-07.json', size=271359),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-08.json', name='snx001_2020-01-08.json', size=271355),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-09.json', name='snx001_2020-01-09.json', size=271399),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-10.json', name='snx001_2020-01-10.json', size=271403)]
Our sensor data is stored in the format:
{
    "sensorName": "snx001",
    "sensorDate": "2020-01-01",
    "sensorReadings": [
        {
            "sensorChannel": 1,
            "sensorReading": 3.7465084060850105,
            "datetime": "2020-01-01 00:00:00"
        },
        {
            "sensorChannel": 2,
            "sensorReading": 10.543041369293153,
            "datetime": "2020-01-01 00:00:00"
        }
        ...
    ]
}
Next we’re going to define a schema to enforce when we read the data.
This way, any fields that are missing from a file are filled in with null values, and any fields that aren’t part of the schema are dropped.
In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
sensor_schema = StructType(fields=[
    StructField('sensorName', StringType(), False),
    StructField('sensorDate', StringType(), True),
    StructField(
        'sensorReadings', ArrayType(
            StructType([
                StructField('sensorChannel', IntegerType(), False),
                StructField('sensorReading', DoubleType(), True),
                StructField('datetime', StringType(), True)
            ])
        )
    )
])
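Before pointing this at the whole directory, a quick sanity check can help. The snippet below is only a sketch (the sample record and the sample_json name are made up for illustration): it reads a single in-memory JSON string that's missing sensorDate using the same schema, so that missing field should come back as null.

# Hypothetical one-record check: "sensorDate" is deliberately missing here.
sample_json = '{"sensorName": "snx001", "sensorReadings": [{"sensorChannel": 1, "sensorReading": 3.75, "datetime": "2020-01-01 00:00:00"}]}'

# spark.read.json also accepts an RDD of JSON strings, so we can apply the same schema.
check_df = spark.read.json(spark.sparkContext.parallelize([sample_json]), schema=sensor_schema)
check_df.show()  # sensorDate should appear as null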
Now we can use our schema to read the JSON files in our directory.
In [2]:
data_df = spark.read.json(IN_DIR + '*.json', schema=sensor_schema)
In [3]:
data_df.show()
+----------+----------+--------------------+
|sensorName|sensorDate| sensorReadings|
+----------+----------+--------------------+
| snx001|2020-01-10|[[1, 4.0787694356...|
| snx001|2020-01-09|[[1, 4.8970115788...|
| snx001|2020-01-03|[[1, 3.9999783849...|
| snx001|2020-01-01|[[1, 5.9464029459...|
| snx001|2020-01-07|[[1, 3.8237213375...|
| snx001|2020-01-05|[[1, 4.5293829342...|
| snx001|2020-01-08|[[1, 6.5363553381...|
| snx001|2020-01-04|[[1, 4.0468524404...|
| snx001|2020-01-06|[[1, 5.1075114236...|
| snx001|2020-01-02|[[1, 3.4558939509...|
+----------+----------+--------------------+
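Because we passed sensor_schema explicitly, Spark doesn't infer anything from the files; printSchema is a quick way to confirm the nested structure it's using for sensorReadings:

data_df.printSchema()  # sensorReadings should appear as an array of struct<sensorChannel:int, sensorReading:double, datetime:string>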
The data we want is nested inside "sensorReadings", so we can use explode to turn each element of the array into its own row and then expand the struct into separate columns.
In [4]:
from pyspark.sql.functions import explode
data_df = data_df.select(
    "sensorName",
    explode("sensorReadings").alias("sensorReadingsExplode")
).select("sensorName", "sensorReadingsExplode.*")
data_df.show()
+----------+-------------+------------------+--------------------+
|sensorName|sensorChannel| sensorReading| datetime|
+----------+-------------+------------------+--------------------+
| snx001| 1| 4.078769435609662|2020-01-10T00:00:00Z|
| snx001| 2|10.751988380788951|2020-01-10T00:00:00Z|
| snx001| 1| 6.437546649615771|2020-01-10T00:01:00Z|
| snx001| 2| 9.490029697398635|2020-01-10T00:01:00Z|
| snx001| 1| 6.097560703497864|2020-01-10T00:02:00Z|
| snx001| 2| 9.630233393260067|2020-01-10T00:02:00Z|
| snx001| 1| 4.846386234313744|2020-01-10T00:03:00Z|
| snx001| 2| 10.34636991122751|2020-01-10T00:03:00Z|
| snx001| 1| 4.820133008912831|2020-01-10T00:04:00Z|
| snx001| 2|10.873462194665743|2020-01-10T00:04:00Z|
| snx001| 1|3.3968669621831964|2020-01-10T00:05:00Z|
| snx001| 2| 8.458263097266412|2020-01-10T00:05:00Z|
| snx001| 1| 4.450543706075642|2020-01-10T00:06:00Z|
| snx001| 2|10.813013331419645|2020-01-10T00:06:00Z|
| snx001| 1| 5.107672984936202|2020-01-10T00:07:00Z|
| snx001| 2| 10.15456942322968|2020-01-10T00:07:00Z|
| snx001| 1| 4.784140060483603|2020-01-10T00:08:00Z|
| snx001| 2| 9.532214386858834|2020-01-10T00:08:00Z|
| snx001| 1| 4.626221099761967|2020-01-10T00:09:00Z|
| snx001| 2|10.315756654144062|2020-01-10T00:09:00Z|
+----------+-------------+------------------+--------------------+
only showing top 20 rows
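One behaviour of explode worth noting: rows where sensorReadings is null or an empty array are dropped entirely. That's fine for the files above, but if some days could arrive with no readings, explode_outer keeps those rows with null sub-columns instead. A sketch, assuming we had kept the pre-explode DataFrame around (called raw_df here, a hypothetical name):

from pyspark.sql.functions import explode_outer

# raw_df is the DataFrame as read from JSON, before the explode step above.
flat_df = raw_df.select(
    "sensorName",
    explode_outer("sensorReadings").alias("sensorReadingsExplode")
).select("sensorName", "sensorReadingsExplode.*")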
Now we’re ready to save our combined, flattened data to Parquet.
In [5]:
data_df.write.parquet(IN_DIR + 'snx001.parquet')
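If the notebook gets re-run, the write above will fail because the output path already exists (the default save mode is errorifexists). A re-run-friendly variant (choosing overwrite here is our assumption, not part of the original post) plus a quick read-back check:

# Overwrite any previous output so the notebook can be re-run safely.
data_df.write.mode('overwrite').parquet(IN_DIR + 'snx001.parquet')

# Read the Parquet back to confirm the flattened schema survived the round trip.
spark.read.parquet(IN_DIR + 'snx001.parquet').printSchema()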