In this post, we explain how to use pipelines as part of an agile data science workflow. The primary benefits pipelines provide are the coordination, monitoring, and data-level testing of the many steps of an end-to-end modeling workflow.
There are many pipelining tools available: Spring Cloud Data Flow for production environments, Airflow and Luigi for general batch processes, and Azkaban and Oozie for Hadoop workflows. We will walk through an example of a Luigi pipeline we used to analyze network traffic logs stored in Greenplum Database (GPDB). We chose Luigi because it is simple to use and written in Python, a staple language for data science.
The example code base and a summary of the use case can be found here. At a high level, our workflow consists of detecting anomalous network traffic using principal component analysis. In total, we developed 24 models - one for each hour of the day. We define the pipeline dependency graph in a single script
pipeline.py. Below we show the header of
pipeline.py, which configures our database connection and pipeline parameters.
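In sketch form, assuming illustrative names for the environment variables and for the two configuration classes other than PathConfig, the header looks something like this:

# pipeline.py (sketch): database connection and pipeline configuration
import os
import luigi
import utils

# Database connection factory built from credentials stored as environment
# variables (the PSQLConn argument names are assumed).
db = utils.PSQLConn(
    host=os.environ['GPDB_HOST'],
    database=os.environ['GPDB_DATABASE'],
    user=os.environ['GPDB_USER'],
    password=os.environ['GPDB_PASSWORD'],
)

# Each class below corresponds to a section of the same name in luigi.cfg,
# with one class attribute per entry in that section.
class TableConfig(luigi.Config):   # table and column names (class name assumed)
    traffic_table = luigi.Parameter()
    user_column = luigi.Parameter()

class ModelConfig(luigi.Config):   # model hyper-parameters (class name assumed)
    num_components = luigi.IntParameter()

class PathConfig(luigi.Config):    # output path parameters
    target_path = luigi.Parameter()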
First, we initialize a
PSQLConn() object defined in
utils.py to create a database connection factory using the psycopg2 library. The credentials for connecting to our GPDB database are stored as environment variables. Next, we define three configuration classes, the last of which is PathConfig, each inheriting from the luigi.Config class. These classes configure the table and column names in the database, the model hyper-parameters, and the output path parameters, respectively. The values for these variables are set in the pipeline configuration file:
luigi.cfg. By default Luigi searches for this file in the current working directory, but we can also set its path with the environment variable:
LUIGI_CONFIG_PATH. For each section in
luigi.cfg we define a corresponding class with an identical name in
pipeline.py and create a class attribute for each entry in the section.
The building blocks of a pipeline are tasks. The dependency graph for all tasks that make up the workflow in our example is shown below.
The workflow consists of 24 overlapping branches, one for each model. Each branch is broken down into five types of tasks.
Below is the code for a task which takes in a table representing a sparse matrix and computes the principal components in an output table using the
pca_sparse_train function in MADlib.
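In sketch form, reusing the RunPCATask(), CreatePCAInputTable(), and hour_id names used below and assuming a date parameter, illustrative table names, and a utils.py helper for the MADlib call, the task looks something like this:

# pipeline.py (sketch, continued): one branch's PCA training task
class RunPCATask(luigi.Task):
    """Train PCA on the sparse input matrix for one hour of the day."""
    date = luigi.DateParameter()      # parameter name assumed
    hour_id = luigi.IntParameter()

    def requires(self):
        # The input matrix table for the same hour, built from all data up to
        # the current day (CreatePCAInputTable is defined elsewhere in pipeline.py).
        return [CreatePCAInputTable(date=self.date, hour_id=self.hour_id)]

    def output(self):
        # Local marker file recording that the output table was created.
        return luigi.LocalTarget('{}/pca_train_{}_{}.txt'.format(
            PathConfig().target_path, self.date, self.hour_id))

    def run(self):
        conn = db.connect()           # connection factory from the header (API assumed)
        cursor = conn.cursor()
        input_table = 'madlib_input_{}'.format(self.hour_id)      # table names assumed
        output_table = 'pca_components_{}'.format(self.hour_id)
        # The query runs remotely in GPDB; the pipeline only coordinates it.
        utils.run_pca_sparse_train(cursor, input_table, output_table,
                                   ModelConfig().num_components)
        conn.commit()
        # Copy the first row of the new table to the output target as a
        # lightweight check that the table exists and is populated.
        cursor.execute('SELECT * FROM {} LIMIT 1'.format(output_table))
        with self.output().open('w') as f:
            f.write(str(cursor.fetchone()))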
A general pipeline task is specified by writing a class which inherits from
luigi.Task and by defining three methods: requires(), output(), and run(). The requires() method returns a list of the task's dependencies.
A Luigi task is uniquely specified by its class name and the value of its parameters. That is, two tasks with the same class name and parameters of the same values are in fact the same instance. Luigi parameters serve as a shortcut for defining task constructors.
RunPCATask() and CreatePCAInputTable() each have two parameters: a date parameter and hour_id, which represent the current day and the hour of the day we are modeling, respectively. In the
requires() method, we specify that
RunPCATask() depends on the input matrix table for the same
hour_id and with all data up to the current day. Each day we run our pipeline, this task is executed 24 times, once for each value of hour_id.
The output() method returns a Luigi target object, such as a local file, a file on HDFS, or an entry in a database table, to indicate successful completion of a task. A target class contains one method, exists(), which returns True if the target exists. Luigi considers a task successfully completed if and only if the target defined in the task's output() method exists. Before executing
RunPCATask(), Luigi checks for the existence of the output target from the
CreatePCAInputTable() task with the same values for both parameters. If this target does not exist, Luigi will first run the required task.
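Although our tasks use local file targets, any class implementing exists() works. Purely for illustration, a target that marks completion by checking for a table in the database could be as simple as the following sketch:

class TableTarget(luigi.Target):
    """Hypothetical target that 'exists' once the named table is present in GPDB."""
    def __init__(self, conn_factory, table_name):
        self.conn_factory = conn_factory
        self.table_name = table_name

    def exists(self):
        conn = self.conn_factory.connect()   # PSQLConn API assumed
        cursor = conn.cursor()
        cursor.execute(
            "SELECT 1 FROM information_schema.tables WHERE table_name = %s",
            (self.table_name,))
        return cursor.fetchone() is not None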
The run() method contains the logic for executing the task. Each of our tasks consists of the following steps: establishing a connection to the GPDB instance, executing a SQL script in the database, and writing to the output file. The key point is that the query is executed and new tables are created remotely in the database; the pipeline simply acts as a tool to parameterize, coordinate, and run SQL queries in GPDB. The only data written locally to disk is what we write to the output() file, so we should take care not to write large volumes of data to it.
After connecting to the database and setting our query parameters, we call a helper function defined in utils.py, described below.
This function is a simple Python wrapper around the PL/pgSQL function we created during the exploratory phase of our analysis to drop/create the necessary tables and execute PCA using MADlib. To safeguard against SQL injection, we bind the Python arguments of our function to the query string using the mogrify() method and wrap our parameters in a custom identifier class, QuotedIdentifier, which ensures that the arguments are valid SQL. The identifier class checks that the function arguments consist solely of alphanumeric characters, underscores, and, for parameters containing a schema-qualified table name, a single period.
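In sketch form, assuming the PL/pgSQL function is named run_pca_sparse_train and takes the input table, output table, and number of components as arguments:

# utils.py (sketch)
import re
from psycopg2.extensions import ISQLQuote, QuotedString

class QuotedIdentifier(object):
    """Validates a table or schema.table name before it is bound into a query.

    Only alphanumeric characters and underscores are allowed, with at most one
    period for schema-qualified table names; anything else raises an error."""

    def __init__(self, value):
        if not re.match(r'^\w+(\.\w+)?$', value):
            raise ValueError('Invalid SQL identifier: {!r}'.format(value))
        self.value = value

    def __conform__(self, proto):
        # Lets psycopg2's mogrify()/execute() adapt instances of this class.
        if proto is ISQLQuote:
            return self

    def getquoted(self):
        # Render the validated value as an ordinary quoted string literal.
        return QuotedString(self.value).getquoted()


def run_pca_sparse_train(cursor, input_table, output_table, num_components):
    """Call the PL/pgSQL function (name assumed) that drops/creates the
    necessary tables and runs MADlib's pca_sparse_train in the database."""
    query = cursor.mogrify(
        'SELECT run_pca_sparse_train(%s, %s, %s);',
        (QuotedIdentifier(input_table), QuotedIdentifier(output_table),
         num_components))
    cursor.execute(query)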
The final step after running our query is writing to the output target. For this example, we simply select the first row of the output table and copy it to a file. This verifies that we successfully created and populated a new table with the correct column structure in our database. Alternatively, it may be useful to write sanity checks or log interesting information about the new table, such as its number of rows or the variances of the principal components. For our final task, which generates a table of anomalous users, we write the list of users to a file for further investigation.
If our task consists of simply executing a query in the database and we do not wish to record any info to a file, we can implement our task as a subclass of the Luigi
PostgresQuery task. The output for this task is a
PostgresTarget, which is an entry in a marker table containing a list of the completed tasks and their dates of execution. For example, to implement the
InitializeUserDefinedFunctions() task as a PostgresQuery task, we write a short subclass.
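In sketch form, assuming the connection settings come from environment variables and that read_sql_file() stands in for the utils.py helper described below:

import os
from luigi.contrib import postgres
import utils

class InitializeUserDefinedFunctions(postgres.PostgresQuery):
    """Creates the PL/pgSQL helper functions used by the rest of the pipeline."""
    # Connection settings (environment variable names assumed).
    host = os.environ.get('GPDB_HOST', 'localhost')
    database = os.environ.get('GPDB_DATABASE', 'gpadmin')
    user = os.environ.get('GPDB_USER', 'gpadmin')
    password = os.environ.get('GPDB_PASSWORD', '')

    # Entry recorded in the marker table when the task completes.
    table = 'initialize_user_defined_functions'

    @property
    def query(self):
        # read_sql_file() stands in for the utils.py helper that returns the
        # contents of a SQL script as a string (name and path assumed).
        return utils.read_sql_file('sql/create_udfs.sql')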
The table attribute defines the task's entry in the marker table, and query defines the SQL executed by the task. Here we simply call a helper function in utils.py that opens the relevant SQL script as a file object and returns its content as a string, since no formatting with parameters is required. The difference between a PostgresQuery task and our other tasks is essentially stylistic.
To complete the pipeline, we define our pipeline dependency graph in a final wrapper task which inherits from
luigi.WrapperTask. In the
requires() method of the wrapper class, we return a list of all the tasks in the pipeline. To run the full pipeline end-to-end, we simply call the wrapper task from the command line. Wrapper tasks are particularly useful for pipelines such as ours, where we want to call a task multiple times with different parameters.
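In sketch form, using the PipelineTask name from the command below and assuming a date parameter and a single required task per branch:

import datetime

class PipelineTask(luigi.WrapperTask):
    """Top-level task whose only job is to require every branch of the pipeline."""
    date = luigi.DateParameter(default=datetime.date.today())  # parameter name assumed

    def requires(self):
        # One branch per model, i.e. per hour of the day. In the full pipeline
        # this would list the final task of each branch.
        return [RunPCATask(date=self.date, hour_id=hour) for hour in range(24)]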
A major advantage of wrapping our workflow into a pipeline is the ability to incorporate data-level tests. In our example pipeline we run a simple test checking each column of newly created tables for the presence of null values (a sketch of such a check appears after this paragraph). Another commonly useful data-level test is checking the cardinality of the join keys before joining two tables, as a duplicate record could cause a many-to-many join, which may not be the intended behavior. With tests that stop the workflow early on failure and alert the user, it is easier to pinpoint issues in the data and avoid wasted computation time. To notify others of pipeline failures, Luigi supports email alerts, which can be configured in the
luigi.cfg config file.
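As a concrete illustration of such a null check (the function and query here are illustrative rather than taken from the example code base):

def assert_no_nulls(cursor, table, columns):
    """Raise an error if any of the given columns contains NULL values.

    The table and column names are generated by the pipeline itself (or
    validated with QuotedIdentifier) before being interpolated here."""
    for column in columns:
        cursor.execute(
            'SELECT count(*) FROM {} WHERE {} IS NULL'.format(table, column))
        null_count = cursor.fetchone()[0]
        if null_count:
            raise ValueError('{}.{} contains {} NULL values'.format(
                table, column, null_count))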
Before running a pipeline in our analytics environment on the full data set, we recommend testing it locally on a small sample of data. On smaller data sets we can discover bugs more quickly, and testing locally eliminates the risk of accidentally deleting a table in our analytics environment. To run GPDB locally, we install the Pivotal Greenplum Sandbox Virtual Machine, which is available for free download on the Pivotal Network. This sandbox is an easy-to-use virtual machine which runs in either VirtualBox or VMware Fusion and contains a pre-built instance of the open source Greenplum database, PL/R, PL/Python, and the Apache MADlib (incubating) library.
After loading the necessary tables in the GPDB sandbox, configuring the environment variables for the database connection to point to the sandbox database and setting
luigi.cfg to match the local tables, we are ready to run the pipeline. We start the Luigi scheduler daemon with
$ luigid --background
To run the pipeline end-to-end, we run the wrapper task (PipelineTask):
$ python -m luigi --module pipeline PipelineTask
We can view our pipeline status and the dependency graph by opening a browser to http://localhost:8082.
In summary, while building a pipeline adds some overhead to your workflow, the benefits of coordinating the running, monitoring, and data testing of multiple iterations of end-to-end models can be well worth the cost. If you have a workflow with long-running batch processes, such as regularly training and scoring variations of a model, then wrapping your workflow into a pipeline is a worthwhile tool to have in your arsenal.