Using PySpark to Read and Flatten JSON data with an enforced schema

29th January 2021|In Azure, Python, Spark|By Ben Keen

In this post we’re going to read a directory of JSON files and enforce a schema on load to make sure each file has all of the columns that we’re expecting.

In our input directory we have a set of JSON files containing the sensor readings that we want to read in.

These are stored as daily JSON files.

In [0]:
IN_DIR = '/mnt/data/'
dbutils.fs.ls(IN_DIR)
Out[1]: [FileInfo(path='dbfs:/mnt/data/snx001_2020-01-01.json', name='snx001_2020-01-01.json', size=271360), 
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-02.json', name='snx001_2020-01-02.json', size=271293),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-03.json', name='snx001_2020-01-03.json', size=271379),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-04.json', name='snx001_2020-01-04.json', size=271330),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-05.json', name='snx001_2020-01-05.json', size=271356),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-06.json', name='snx001_2020-01-06.json', size=271328),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-07.json', name='snx001_2020-01-07.json', size=271359),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-08.json', name='snx001_2020-01-08.json', size=271355),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-09.json', name='snx001_2020-01-09.json', size=271399),
FileInfo(path='dbfs:/mnt/data/snx001_2020-01-10.json', name='snx001_2020-01-10.json', size=271403)]
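
Before defining a schema, it can help to peek at one of the raw files. On Databricks, dbutils.fs.head gives a quick look at the first bytes of a file; a minimal check along these lines (the path is just one of the files listed above):

# Preview the start of a single raw JSON file
dbutils.fs.head(IN_DIR + 'snx001_2020-01-01.json', 500)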

Our sensor data is stored in the format:

{
    "sensorName": "snx001",
    "sensorDate": "2020-01-01",
    "sensorReadings": [
        {
            "sensorChannel": 1,
            "sensorReading": 3.7465084060850105,
            "datetime": "2020-01-01 00:00:00"
        },
        {
            "sensorChannel": 2,
            "sensorReading": 10.543041369293153,
            "datetime": "2020-01-01 00:00:00"
        }
        ...
    ]
}

Next we’re going to define a schema to enforce when reading our data.

This way, any fields that are missing from a file will be filled in with null values, and fields that aren’t part of the schema won’t be included.

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

sensor_schema = StructType(fields=[
    StructField('sensorName', StringType(), False),
    StructField('sensorDate', StringType(), True),
    StructField(
        'sensorReadings', ArrayType(
            StructType([
                StructField('sensorChannel', IntegerType(), False),
                StructField('sensorReading', DoubleType(), True),
                StructField('datetime', StringType(), True)
            ])
        )
    )
])
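
As a quick sanity check, you can push a single JSON string with a missing field through this schema; spark.read.json also accepts an RDD of JSON strings, so a sketch like the following shows the omitted sensorDate coming back as null:

# Hypothetical one-record check: 'sensorDate' is omitted, so it should appear as null
sample = ['{"sensorName": "snx999", "sensorReadings": [{"sensorChannel": 1, "sensorReading": 1.23, "datetime": "2020-01-01 00:00:00"}]}']
spark.read.json(spark.sparkContext.parallelize(sample), schema=sensor_schema).show()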

Now we can use our schema to read the JSON files in our directory.

In [2]:
data_df = spark.read.json(IN_DIR + '*.json', schema=sensor_schema)
In [3]:
data_df.show()
+----------+----------+--------------------+
|sensorName|sensorDate|      sensorReadings|
+----------+----------+--------------------+
|    snx001|2020-01-10|[[1, 4.0787694356...|
|    snx001|2020-01-09|[[1, 4.8970115788...|
|    snx001|2020-01-03|[[1, 3.9999783849...|
|    snx001|2020-01-01|[[1, 5.9464029459...|
|    snx001|2020-01-07|[[1, 3.8237213375...|
|    snx001|2020-01-05|[[1, 4.5293829342...|
|    snx001|2020-01-08|[[1, 6.5363553381...|
|    snx001|2020-01-04|[[1, 4.0468524404...|
|    snx001|2020-01-06|[[1, 5.1075114236...| 
|    snx001|2020-01-02|[[1, 3.4558939509...|
+----------+----------+--------------------+
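
To confirm the enforced schema was applied (rather than an inferred one), printSchema is a quick check:

# Should mirror sensor_schema: sensorReadings as an array of structs
data_df.printSchema()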

The data we want is nested in "sensorReadings", so we use explode to turn each element of that array into its own row, then select the struct’s fields as top-level columns.

In [4]:
from pyspark.sql.functions import explode

data_df = data_df.select(
    "sensorName",
    explode("sensorReadings").alias("sensorReadingsExplode")
).select("sensorName", "sensorReadingsExplode.*")

data_df.show()
+----------+-------------+------------------+--------------------+
|sensorName|sensorChannel|     sensorReading|            datetime|
+----------+-------------+------------------+--------------------+
|    snx001|            1| 4.078769435609662|2020-01-10T00:00:00Z|
|    snx001|            2|10.751988380788951|2020-01-10T00:00:00Z|
|    snx001|            1| 6.437546649615771|2020-01-10T00:01:00Z|
|    snx001|            2| 9.490029697398635|2020-01-10T00:01:00Z|
|    snx001|            1| 6.097560703497864|2020-01-10T00:02:00Z|
|    snx001|            2| 9.630233393260067|2020-01-10T00:02:00Z|
|    snx001|            1| 4.846386234313744|2020-01-10T00:03:00Z|
|    snx001|            2| 10.34636991122751|2020-01-10T00:03:00Z|
|    snx001|            1| 4.820133008912831|2020-01-10T00:04:00Z|
|    snx001|            2|10.873462194665743|2020-01-10T00:04:00Z|
|    snx001|            1|3.3968669621831964|2020-01-10T00:05:00Z|
|    snx001|            2| 8.458263097266412|2020-01-10T00:05:00Z|
|    snx001|            1| 4.450543706075642|2020-01-10T00:06:00Z|
|    snx001|            2|10.813013331419645|2020-01-10T00:06:00Z|
|    snx001|            1| 5.107672984936202|2020-01-10T00:07:00Z|
|    snx001|            2| 10.15456942322968|2020-01-10T00:07:00Z|
|    snx001|            1| 4.784140060483603|2020-01-10T00:08:00Z|
|    snx001|            2| 9.532214386858834|2020-01-10T00:08:00Z|
|    snx001|            1| 4.626221099761967|2020-01-10T00:09:00Z|
|    snx001|            2|10.315756654144062|2020-01-10T00:09:00Z|
+----------+-------------+------------------+--------------------+
only showing top 20 rows
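
One thing to note: the schema keeps datetime as a plain string. If you want a proper timestamp column for downstream work, to_timestamp can convert it; a small sketch, assuming your datetime strings parse with Spark's default format (pass an explicit format such as "yyyy-MM-dd'T'HH:mm:ssXXX" if they don't):

from pyspark.sql.functions import to_timestamp

# Convert the string column to an actual TimestampType column
data_df = data_df.withColumn("datetime", to_timestamp("datetime"))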

Now we’re ready to save our combined, flattened data as Parquet.

In [5]:
data_df.write.parquet(IN_DIR + 'snx001.parquet')
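
If you want to double-check the write, reading the Parquet back and counting rows is a cheap verification:

# Read the flattened data back to confirm it round-trips
spark.read.parquet(IN_DIR + 'snx001.parquet').count()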