Parallel task processing from a message queue using Azure Kubernetes Service and python

Parallel task processing from a message queue using Azure Kubernetes Service and python

In this post, we’re going to have a look at using Azure Kubernetes Service to scale out the processing of tasks from a message queue using Azure Kubernetes Service (AKS).

We will read in some weather data that has temperature values at a 1-minute granularity, e.g.:

Datetime Temperature
2017-12-01 00:00:00 16.7
2017-12-01 00:01:00 16.8
2017-12-01 00:02:00 16.8

We have 6 CSV files with temperatures at different locations stored in Azure Blob Storage:

  • temperature1.csv
  • temperature2.csv
  • temperature3.csv
  • temperature4.csv
  • temperature5.csv
  • temperature6.csv

And we will be downloading these CSV files aggregating them up to hourly granularity and taking the mean, standard deviation, minimum and maximum temperatures for each hour and storing it back to Azure Blob Storage.

We will split out this processing across nodes on a kubernetes cluster in Azure Kubernetes Service. The files to accompany this blog post can be found here. Some of the commands in this post assume you have cloned this repository and are in the root of this repository.

Create Resource Group

We’ll start by creating a resource group for all of our resources, I’m in West Europe so I’ll use that location:

az group create --name myAKSResourceGroup --location westeurope

We should have an output that looks like:

{
  "id": "/subscriptions/7dfe4fd7-24b7-466f-90d0-467628c1d8b2/resourceGroups/myAKSResourceGroup",
  "location": "westeurope",
  "managedBy": null,
  "name": "myAKSResourceGroup",
  "properties": {
    "provisioningState": "Succeeded"
  },
  "tags": null
}

Create Cluster

We’ll now create our Azure Kubernetes Service (AKS) cluster. We will create a 3 node cluster:

az aks create --resource-group myAKSResourceGroup --name redisQueueCluster --node-count 3 --generate-ssh-keys

This will take a few minutes and you’ll get an output with information about your cluster.

Set kubectl credentials

Kubectl is the kubernetes command line interface, if we have the azure command line tools or (gcloud tools for anyone that’s used GCP as well) installed then we’ll already have kubectl.

To set up the credentials for kubectl, we can run:

az aks get-credentials --resource-group myAKSResourceGroup --name redisQueueCluster

To check this has worked and to view the nodes in our cluster, we’ll then run:

kubectl get nodes

And we should see an output that resembles the following:

NAME                       STATUS    ROLES     AGE       VERSION
aks-nodepool1-26984785-0   Ready     agent     2m       v1.9.11
aks-nodepool1-26984785-1   Ready     agent     2m       v1.9.11
aks-nodepool1-26984785-2   Ready     agent     2m       v1.9.11

Create redis service

We’ll create the redis service in the kubernetes cluster (note that your workers could just as easily read from an external redis instance or Azure Redis Cache).

We’ll first create a redis pod (process running redis). We’ll use our redis-pod.yaml file to set this up, the file contents look like:

apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    app: redis
spec:
  containers:
    - name: master
      image: redis
      env:
        - name: MASTER
          value: "true"
      ports:
        - containerPort: 6379

And we run the command:

kubectl create -f ./redis-pod.yaml

We’ll then create a redis service (endpoint to make the pod accessible to other pods/externally). We’ll make this pod exposed externally using our cloud provider’s load balance, so that we can add tasks from our local machine, but if you were adding tasks from within the cluster, there’s no need to expose this pod externally. The redis-service.yaml file looks like:

apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: redis
  type: LoadBalancer

We run the command:

kubectl create -f ./redis-service.yaml

To check on the status of our redis service and to get the external IP of our redis service:

kubectl get service redis

Which should output something that resembles the following:

NAME      TYPE           CLUSTER-IP    EXTERNAL-IP    PORT(S)          AGE
redis     LoadBalancer   10.0.145.70   40.68.23.150   6379:30233/TCP   1m

Add tasks to redis queue

We can add tasks to our redis queue using the EXTERNAL-IP from the output above.

We’ll create a task queue temperature_job with the filename of each of the CSVs we want to analyse.

We can do this by running a short python script (remember to update the host to your external IP address):

import redis

host = '40.68.23.150'
port = 6379

r = redis.Redis(host=host, port=port)

files = ['temperature{}.csv'.format(i) for i in range(1, 7)]

# Push filenames onto queue
r.rpush('temperature_job', *files)

# View items in queue
print(r.lrange('temperature_job', 0, -1))

Create Azure Container Registry

Our redis-master container comes from an openly available redis image.

However, we’ll often want to use our own private containers that are not available on a public container registry, particularly when doing our processing. To do this, we can create our own Azure Container Registry.

We can do this using the following command:

az acr create --resource-group myAKSResourceGroup --name redisQueueContainerRegistry --sku Basic

Note that the container registry name must be unique. Once complete, we will see output with some information about our newly created container registry instance.

To log in to our newly created container registry so that we can push our own local images to it, we can run:

az acr login --name redisQueueContainerRegistry

And we should see the following output:

Login Succeeded

Allow Azure Kubernetes Service access to Azure Container Registry

In order for our AKS instance to pull images from our container registry, we need to allow it access to do so. We can run the following shell script to allow us to do this:

AKS_RESOURCE_GROUP=myAKSResourceGroup
AKS_CLUSTER_NAME=redisQueueCluster
ACR_RESOURCE_GROUP=myAKSResourceGroup
ACR_NAME=redisQueueContainerRegistry

# Get the id of the service principal configured for AKS 
CLIENT_ID=$(az aks show --resource-group $AKS_RESOURCE_GROUP --name $AKS_CLUSTER_NAME --query "servicePrincipalProfile.clientId" --output tsv)

# Get the ACR registry resource id 
ACR_ID=$(az acr show --name $ACR_NAME --resource-group $ACR_RESOURCE_GROUP --query "id" --output tsv) 

# Create role assignment
az role assignment create --assignee $CLIENT_ID --role Reader --scope $ACR_ID

Once complete, we’ll see an output confirming this operation of type Microsoft.Authorization/roleAssignments.

Upload CSVs to Azure Blob Storage

Create Storage Account

We’ll be creating an azure storage account to store our CSV files, we’ll run the following command to do this:

az storage account create \
    --name weatherfiles \
    --resource-group myAKSResourceGroup \
    --location westeurope \
    --sku Standard_LRS \
    --encryption blob

Once complete, we’ll see an output confirming so with details about our newly created storage account.

Get Account Keys

To get the credentials for our storage account:

az storage account keys list --account-name weatherfiles --resource-group myAKSResourceGroup

You should see an output that looks like:

[
  {
    "keyName": "key1",
    "permissions": "Full",
    "value": "<key1_value>"
  },
  {
    "keyName": "key2",
    "permissions": "Full",
    "value": "<key2_value>"
  }
]

We’ll want to store these credentials in environment variables:

export AZURE_STORAGE_ACCOUNT=weatherfiles
export AZURE_STORAGE_ACCESS_KEY=<key>

We’ll also need to create a json store these credentials azure_credentials.json, we’ll be using this later to import this key into your docker container without having to add it to the docker image, create this file as follows:

{
    "storage_account": "weatherfiles",
    "account_key": "<your account key>"
}

Create a container

We can now create a container for our CSV files:

az storage container create --name temperaturefiles

When created, we should see the output

{
  "created": true
}

Upload files to blob

To upload our CSV files we can run the following shell script:

for filename in temperature_csvs/*.csv; do
    az storage blob upload \
        --container-name temperaturefiles \
        --name $(basename $filename) \
        --file $filename
done

Creating worker to consume tasks and aggregate CSV files

We will be using the redis worker rediswq.py file that can be found in the kubernetes docs here and is also available in the repository for this blog post in order to read tasks from our redis queue. This provides helper functions for using redis as a queue including, for example, leasing items on the queue.

We’ll then define a python file to download our CSV files, this python file is available here but in this post, we’ll go through step-by-step.

We first define our imports, we’ll need the redis, azure-storage, numpy and pandas external packages installed and we’ll be importing from our rediswq.py file.

import datetime
from io import BytesIO
import json
import os

from azure.storage.blob import BlockBlobService
import pandas as pd
import numpy as np
import redis

import rediswq

Next we’ll define a function to get our azure storage credentials, we’ll grab the credentials path to the json file we created above (azure_credentials.json) from the environment variable AZURE_CREDENTIALS_PATH.

AZURE_CREDENTIALS_PATH = os.environ.get('AZURE_CREDENTIALS_PATH')
STORAGE_CONTAINER = 'temperaturefiles'


def get_account_credentials():
    with open(AZURE_CREDENTIALS_PATH, 'r') as f:
        azure_credentials = json.load(f)
    storage_account = azure_credentials.get('storage_account')
    account_key = azure_credentials.get('account_key')
    return storage_account, account_key

Then we download our temperature file from our azure storage account, we’ll stream the blob to a BytesIO object and read directly from into a pandas DataFrame this using pandas read_csv function:

def read_from_temperature_csv_file(filename, block_blob_service):
    my_stream_obj = BytesIO()
    block_blob_service.get_blob_to_stream(STORAGE_CONTAINER, filename, my_stream_obj)
    my_stream_obj.seek(0)
    return pd.read_csv(my_stream_obj)

We can then aggregate our DataFrame using the pandas DataFrame resample method. We first convert the Datetime column from strings to datetime objects, then set this column as our index for resampling. For more information on resampling time series data, see my blog post on resampling here.

def aggregate_df(temperature_df):
    dt_format = '%Y-%m-%d %H:%M:%S'
    temperature_df['Datetime'] = pd.to_datetime(temperature_df['Datetime'], format=dt_format)
    return temperature_df.set_index('Datetime').resample('H').agg([np.mean, np.std, np.max, np.min])

We can then save this aggregated temperature DataFrame to a CSV file in the BLOB store.

def save_aggregated_temperature(aggregated_df, filename, block_blob_service):
    output_filename = '.'.join(filename.split('.')[:-1]) + '_aggregated.csv'
    block_blob_service.create_blob_from_bytes(
        STORAGE_CONTAINER,
        output_filename,
        aggregated_df.to_csv(index=False).encode('utf-8'))

Then to put all the above functions together, we create one function that can be called with a filename parameter:

def aggregate_temperature_file(filename):
    storage_account, account_key = get_account_credentials()
    block_blob_service = BlockBlobService(account_name=storage_account, account_key=account_key)
    temperature_df = read_from_temperature_csv_file(filename, block_blob_service)
    aggregated_df = aggregate_df(temperature_df)
    save_aggregated_temperature(aggregated_df, filename, block_blob_service)

We will also create a main function that will read from the redis queue, determine whether there are any filenames on the redis queue and, if so, process the next filename on the queue.

def main():
    redis_host = os.environ.get("REDIS_HOST")
    if not redis_host:
        redis_host = "redis"
    q = rediswq.RedisWQ(name="temperature_job", host=redis_host)
    print("Worker with sessionID: " +  q.sessionID())
    print("Initial queue state: empty=" + str(q.empty()))
    while not q.empty():
        item = q.lease(lease_secs=180, block=True, timeout=2) 
        if item is not None:
            filename = item.decode("utf=8")
            print("Aggregating " + filename)
            aggregate_temperature_file(filename)
            q.complete(item)
        else:
            print("Waiting for work")
            import time
            time.sleep(5)


if __name__ == '__main__':
    main()

Containerising worker script

Our worker script needs to be containerised and we will use docker to containerise our script and push this docker image to the Azure Container Service, ready to be pulled by our Azure Kubernetes Service.

The Dockerfile here is a very simple one, it uses the publically-available python base image and pip installs some external package requirements, before copying over the python files we require and running our worker script:

FROM python
RUN pip install redis numpy pandas azure-storage
COPY ./aggregate_temperature.py /aggregate_temperature.py
COPY ./rediswq.py /rediswq.py

CMD  python aggregate_temperature.py

To build our docker image run from the root of the repository:

docker build -t aggregate-temperature .

You should see an output that resembles the following as the script runs through the steps:

Sending build context to Docker daemon  209.8MB
Step 1/5 : FROM python
 ---> a187104266fb
Step 2/5 : RUN pip install redis numpy pandas azure-storage
 ---> Using cache
 ---> 01a514bbd792
Step 3/5 : COPY ./aggregate_temperature.py /aggregate_temperature.py
 ---> 887ee9c41929
Step 4/5 : COPY ./rediswq.py /rediswq.py
 ---> 3fed313cadc4
Step 5/5 : CMD  python aggregate_temperature.py
 ---> Running in 3e7124bacf36
Removing intermediate container 3e7124bacf36
 ---> ce7a17b998af
Successfully built ce7a17b998af
Successfully tagged aggregate-temperature:latest

Pushing image to Azure Container Registry

Now that we’ve built our docker image, we can push it to our Azure Container Registry. First we need to get our Azure Container Registry server URL:

az acr list --resource-group myAKSResourceGroup --query "[].{acrLoginServer:loginServer}" --output table

This should output something like:

AcrLoginServer
--------------------------------------
redisqueuecontainerregistry.azurecr.io

This URL is then used to push our built container to the registry. If we assume the URL above, we can tag our image:

docker tag aggregate-temperature redisqueuecontainerregistry.azurecr.io/aggregate-temperature

Then push our image:

docker push redisqueuecontainerregistry.azurecr.io/aggregate-temperature

We should see an output that resembles the following:

The push refers to repository [redisqueuecontainerregistry.azurecr.io/aggregate-temperature]
fd7441059c0a: Pushed
366d45665d4a: Pushed
e43801e1fb1f: Pushed
4c9ede4ddbda: Pushed
c134b6c064f6: Pushed
8eb8b96ceebb: Pushed
d62f0ea9a15e: Pushed
9978d084fd77: Pushed
1191b3f5862a: Pushed
08a01612ffca: Pushed
8bb25f9cdc41: Pushed
f715ed19c28b: Pushed
latest: digest: sha256:1f41ac17d3322628b72e23011d869e3f57062256680926b88ad47f4ae6706a38 size: 2846

Creating Kubernetes Secret

Before we create the job to run scale out our processing of these CSV files, we’ll start with creating a Kubernetes secret. This is an encrypted repository that kubernetes uses to securely store keys that we don’t want to include in git/docker repositories.

We can create our secret for our Azure Storage Account credentials:

kubectl create secret generic azure-credentials --from-file=./azure_credentials.json

And we should see as an output:

secret "azure-credentials" created

Creating Kubernetes Job

We’ll now create our kubernetes job to scale out our processing of the CSV files from Azure Blob store.

We’ll use the job.yaml file with the configuration for our kubernetes job. This job file defines things like

  • Which image to use
  • How many pods we’ll be running in parallel
  • Where to mount the secret key directory
  • Our environment variable of the path to our azure credentials

And looks as follows:

apiVersion: batch/v1
kind: Job
metadata:
  name: temperature-job
spec:
  parallelism: 3
  template:
    metadata:
      name: temperature-job
    spec:
      containers:
      - name: c
        image: redisqueuecontainerregistry.azurecr.io/aggregate-temperature
        env:
        - name: AZURE_CREDENTIALS_PATH
          value: /var/secrets/azure/azure_credentials.json
        volumeMounts:
        - name: azure-credentials
          mountPath: "/var/secrets/azure"
          readOnly: true
      volumes:
      - name: azure-credentials
        secret:
          secretName: azure-credentials
      restartPolicy: OnFailure

To run this job:

kubectl create -f ./job.yaml

We should see an output of:

job.batch "temperature-job" created

To see a description of the job, we can run:

kubectl describe jobs/temperature-job

And to see the status of the pods we can run:

kubectl get pods

This output will look like:

NAME                    READY     STATUS              RESTARTS   AGE
redis-master            1/1       Running             0          20m
temperature-job-fqg92   0/1       ContainerCreating   0          38s
temperature-job-pprsp   0/1       ContainerCreating   0          38s
temperature-job-s4s48   0/1       ContainerCreating   0          38s

Until the containers are up and running, then:

NAME                    READY     STATUS    RESTARTS   AGE
redis-master            1/1       Running   0          24m
temperature-job-fqg92   1/1       Running   0          4m
temperature-job-pprsp   1/1       Running   0          4m
temperature-job-s4s48   1/1       Running   0          4m

Remove Resources

Because everything here was created in the same resource group, to remove everything, we can just run:

az group delete --name myAKSResourceGroup --yes --no-wait