Whenever you need to slice and dice a massive dataset and pandas or SQL just can't do the trick, pam pam pam, have no fear: Spark is here.
This post comes after a long and lonesome search for a tutorial on how to spin up an EMR cluster, extract the relevant data from an RDS table, execute a set of commands using PySpark, and load the results to an S3 bucket.
The goal of this post is to provide an easy, comprehensive, step-by-step guide, and to save time and frustration for anyone who wishes to construct such a system.
In this post we will start by formulating the problem, go over the necessary tools, and finally describe the proposed solution.
Small disclaimer: an active AWS account is necessary for this tutorial.
Let's start with what we wish to solve.
The problem that we are facing can be broken down into three (as always) sub-problems:
- Spinning up a cluster with all the relevant dependencies.
- Connecting to a data source for fetching and loading the data.
- Processing the raw data using the Spark framework.
In our solution we will use the following tools:
- Apache Spark - an open-source distributed general-purpose cluster-computing framework.
- PySpark - the Python API for Spark.
- EMR service - Elastic MapReduce, the AWS-managed Hadoop framework.
- S3 service - the AWS object storage service.
- Boto3 - the AWS SDK for Python, which enables Python developers to create, configure, and manage AWS services.
And now for the good stuff!
Our solution is broken down into three main scripts and a configuration file.
- EMR Handler - the Python script in charge of spinning up, configuring, and running the EMR cluster.
- Bootstrap - the script used to configure each node in the cluster.
- Processing - the Python script that executes the data processing commands.
We will start with the EMR Handler. The handler object is initialized with the values from the configuration file, and its load cluster method calls the EMR service using the Boto3 client.
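Since the original gists are not reproduced here, the following is a minimal sketch of what such a handler might look like. The class name `EmrHandler`, the method name `load_cluster`, and all configuration keys are illustrative assumptions, not the author's original code; only the `run_job_flow` call and its argument names come from the real Boto3 EMR API.

```python
class EmrHandler:
    """Sketch of an EMR handler initialized from a configuration dict."""

    def __init__(self, config):
        # config is a dict parsed from the YAML configuration file
        self.config = config

    def load_cluster(self):
        # boto3 is imported lazily so the sketch can be read without the SDK installed
        import boto3

        emr = boto3.client("emr", region_name=self.config["region"])
        response = emr.run_job_flow(
            Name=self.config["cluster_name"],
            LogUri=self.config["log_uri"],
            ReleaseLabel="emr-5.20.0",
            Instances={
                "MasterInstanceType": self.config["instance_type"],
                "SlaveInstanceType": self.config["instance_type"],
                "InstanceCount": self.config["instance_count"],
                "KeepJobFlowAliveWhenNoSteps": False,
            },
            Applications=[{"Name": "Spark"}],
            BootstrapActions=[{
                "Name": "Install dependencies",
                "ScriptBootstrapAction": {"Path": self.config["bootstrap_uri"]},
            }],
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
            Steps=[],
            VisibleToAllUsers=True,
        )
        return response["JobFlowId"]
```

`EMR_EC2_DefaultRole` and `EMR_DefaultRole` are the default roles AWS creates for EMR; substitute your own if your account uses custom IAM roles.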
Let's drill down into each of the arguments of the load cluster method, for they are the essential part of configuring the cluster to suit our task.
- Name - the name of the cluster.
- LogUri - where the generated logs are stored.
- ReleaseLabel - the software version (5.20 at the time this post was written).
- Instances - the configuration of the instances the cluster is composed of.
- Applications - the applications necessary for running the tasks at hand.
- BootstrapActions - the configuration of the actions each node (instance) takes prior to the processing phase; in our case, installing all the dependencies.
- JobFlowRole, ServiceRole - the IAM roles used for security.
- Configurations - the Spark environment configurations.
- Tags - for reporting or filtering.
- Steps - the steps that are submitted to the cluster.
For the sake of readability, a method is defined for each major argument.
Here, I believe, the parameters are very self-explanatory, apart from InstanceCount, which receives an integer N that defines N-1 slaves and a single master:
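The original gist is missing, so here is a sketch of such a builder. The function name is an assumption; the dict keys are the real `Instances` keys accepted by `run_job_flow`.

```python
def build_instances(instance_type, instance_count, keep_alive=False):
    """Build the Instances argument for run_job_flow.

    InstanceCount=N yields one master node and N-1 slave (core) nodes;
    EMR performs that split itself, so we pass N directly.
    """
    return {
        "MasterInstanceType": instance_type,
        "SlaveInstanceType": instance_type,
        "InstanceCount": instance_count,
        # Terminate the cluster automatically once all steps are done
        "KeepJobFlowAliveWhenNoSteps": keep_alive,
        "TerminationProtected": False,
    }
```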
Bootstrap Actions Configuration
Defines where the Bootstrap script is located on S3:
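A sketch of this configuration (the function name, action name, and example S3 path are illustrative assumptions; the dict structure is the one the EMR API expects):

```python
def build_bootstrap_actions(script_s3_path):
    """BootstrapActions argument: run the bootstrap script stored on S3
    on every node before the processing phase starts."""
    return [{
        "Name": "Install dependencies",
        "ScriptBootstrapAction": {"Path": script_s3_path},
    }]
```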
Defines the environment variables necessary to use PySpark in the submitted tasks (we are going to use Conda's Python interpreter as our PySpark interpreter):
An important part here is the extra-paths section that is needed for the use of the MySQL connector.
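A sketch of what these `Configurations` might look like. The `spark-env`/`export` classification structure is the real EMR mechanism for setting `PYSPARK_PYTHON`; the Conda path and the connector jar path in the extra class-path properties are illustrative assumptions.

```python
def build_configurations(python_path="/home/hadoop/conda/bin/python"):
    """Configurations argument: point PySpark at the Conda interpreter
    installed by the bootstrap script, and expose the MySQL connector jar
    on the driver and executor class paths."""
    return [
        {
            "Classification": "spark-env",
            "Configurations": [{
                "Classification": "export",
                "Properties": {"PYSPARK_PYTHON": python_path},
            }],
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                # Extra paths needed for the MySQL connector (jar location assumed)
                "spark.driver.extraClassPath": "/home/hadoop/mysql-connector-java.jar",
                "spark.executor.extraClassPath": "/home/hadoop/mysql-connector-java.jar",
            },
        },
    ]
```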
Defines the steps that will run; for each step, a spark-submit will be initiated:
An important part in the 'Run Computations Script' step is the '--packages' parameter, which is needed for the MySQL connector.
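A sketch of the steps builder. `command-runner.jar` is the real mechanism EMR uses to run spark-submit as a step; the function name, script path, and connector version are assumptions.

```python
def build_steps(script_s3_path):
    """Steps argument: each step invokes spark-submit via command-runner.jar.
    The --packages flag pulls in the MySQL JDBC connector so Spark can talk
    to RDS (Maven coordinates/version are an assumption)."""
    return [{
        "Name": "Run Computations Script",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--packages", "mysql:mysql-connector-java:5.1.47",
                script_s3_path,
            ],
        },
    }]
```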
For uploading the files to S3:
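A minimal sketch of the upload helper, built on Boto3's real `upload_file` call; the function names are assumptions.

```python
def s3_uri(bucket, key):
    """Pure helper: build the s3:// URI for an uploaded file."""
    return "s3://{}/{}".format(bucket, key)

def upload_to_s3(local_path, bucket, key):
    """Upload a local file to S3 and return its URI."""
    # boto3 is imported lazily so the pure helper stays usable without the SDK
    import boto3
    boto3.client("s3").upload_file(local_path, bucket, key)
    return s3_uri(bucket, key)
```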
Constructing the EMR handler, uploading the necessary files to S3, and loading the cluster:
EMR Service Configuration File
A YAML file containing all the necessary parameters:
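The original file is not reproduced here; the following is a minimal example of what it might contain. Every key and value (bucket names, hosts, paths) is an illustrative assumption.

```yaml
region: us-east-1
cluster_name: data-processing-cluster
log_uri: s3://my-bucket/emr-logs/
release_label: emr-5.20.0
instance_type: m4.large
instance_count: 3            # 1 master + 2 slaves
bootstrap_uri: s3://my-bucket/scripts/bootstrap.sh
process_script_uri: s3://my-bucket/scripts/process_data.py
db_host: my-rds-instance.abc123.us-east-1.rds.amazonaws.com
db_name: mydb
db_table: my_table
output_uri: s3://my-bucket/results/
```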
Bootstrap Actions Script
Installing Conda and all the necessary dependencies.
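The original script is not reproduced here; a bootstrap script along these lines would do the job. The Miniconda installer URL is the official one, but the install paths and the package list are illustrative assumptions.

```bash
#!/bin/bash
# Bootstrap script: runs on every node before the processing phase.
set -e

# Install Miniconda into the hadoop user's home directory
wget -q https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
    -O /home/hadoop/miniconda.sh
bash /home/hadoop/miniconda.sh -b -p /home/hadoop/conda

# Install the Python dependencies used by the processing script
/home/hadoop/conda/bin/pip install numpy pandas
```

The path `/home/hadoop/conda/bin/python` is the same one exported as `PYSPARK_PYTHON` in the Spark environment configuration, so the cluster's executors pick up this interpreter.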
Process Data Script
The file that is uploaded to S3 and executed in one of the spark-submit steps.
It contains a method for extracting data from RDS using Spark, processing this data, and loading the results to S3.
The processing itself is only a select-column operation, and it can be substituted by any Spark functionality that you wish.
This file also shows the proper way to use a logger within the EMR cluster.
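With the original gist missing, here is a sketch of such a script. The Spark JDBC reader options are the real API; the `db_conf` keys, column name, output format, and use of the stdlib `logging` module (whose output lands in the EMR step logs) are illustrative assumptions.

```python
import logging

# Configure a logger whose output ends up in the EMR step logs on S3
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("process_data")

def jdbc_url(host, database, port=3306):
    """Pure helper: build the JDBC URL for the RDS MySQL instance."""
    return "jdbc:mysql://{}:{}/{}".format(host, port, database)

def process(db_conf, output_uri):
    """Extract a table from RDS, apply a trivial transformation, load to S3."""
    # pyspark is imported inside the function so this sketch can be read
    # (and its pure helpers tested) without a Spark installation
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("process-data").getOrCreate()

    logger.info("Extracting data from RDS")
    df = (spark.read.format("jdbc")
          .option("url", jdbc_url(db_conf["host"], db_conf["database"]))
          .option("dbtable", db_conf["table"])
          .option("user", db_conf["user"])
          .option("password", db_conf["password"])
          .load())

    # The "processing" is only a column selection; substitute any
    # Spark transformation you wish ('selected_column' is a placeholder)
    result = df.select("selected_column")

    logger.info("Loading results to S3")
    result.write.mode("overwrite").parquet(output_uri)
    spark.stop()
```

On the cluster this file is the one submitted by the spark-submit step, with `process` invoked from a `__main__` guard using the values read from the configuration file.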
In this tutorial we have gone through the steps needed to spin up an EMR cluster, read the relevant data from a table in RDS, execute a set of commands using PySpark, and finally load the results to S3.
I hope this guide helped shed some light on how to use EMR to achieve the relevant results and would like to thank Nimrod Milo for helping me on this glorious quest.