The technologies that power the current burst of AI development have evolved rapidly over the last few years. To make a data product, these technologies need to be strung together and joined with a general-purpose (micro)service, a database, etcetera. In the words of Vicky Boykis, "We're still in the steam-powered days of machine learning."
At Royal FloraHolland, we're building a variety of data products. In doing so, like other data science teams, we're confronted with the task of joining two worlds: that of data and that of services.
The component that's specific to data products, in addition to those used for microservices in general, is Airflow. Like monitoring and logging components, there is only one instance of the Airflow service. It contains the logic for all jobs to be run, including their dependency graphs. In addition, it needs access to all data sources, data sinks and computing infrastructure in order to execute those jobs.
An agile team that develops a service is empowered to deploy it to production and monitor its performance and logging. The "you build it, you run it" philosophy allows quick iterations, balancing new functionality with improved resilience. Central to this way of working is the capacity to be self-sufficient: to be able to control all components (database, authorization, message bus, caching, etcetera).
At Royal FloraHolland, the finest marketplace for flowers and plants for over one hundred years, we are developing more and more data products: empowering growers with real-time analytics in Floriday Insights, helping them set sensible prices in the Clock Presales, assessing the quality and consistency of batch photos, etcetera.
As the Data Science team, we try to blend in as much as we can with the wider development landscape, consisting of Floriday, Floramondo and FloraXchange: we deploy our services on AWS, consolidate logs in logz.io, gather metrics in CloudWatch Metrics and visualize them in Grafana. We develop using GitLab for CI/CD, employing CloudFormation to manage resources, either with templates or the Cloud Development Kit. We're a bit peculiar in our preference for Python, but since the rest of Floriday is an eclectic mix of Java, Kotlin, .NET Core and TypeScript, this hardly gets in the way of interoperability.
Each product is typically a separate GitLab project, with a CloudFormation stack for each of the different AWS accounts we use. CloudFormation manages the ECS containers for services, the Lambdas for stream processing and inter-account data transfer, the Sagemaker training jobs for model training, Aurora Serverless for databases, Kinesis for real-time data transfer and S3 for storage of data and models. For security, we only allow access to resources when it is needed, using IAM roles and policies. While developing the deployment infrastructure involves always-too-slow iterations, CloudFormation generally works well for deploying the different components that make up a data product.
The one architectural component that breaks the project-product duality is Airflow. DAGs that logically and functionally belong to a project become part of a separate airflow-dags project. In addition to hoarding code from each of our data products, airflow-dags forces them to agree on dependencies: upgrading the DAG of one project suddenly forces one to upgrade (and test!) all other DAGs that use the same (transitive) dependency. Apart from being a code magnet, the Airflow service needs access to all resources that are accessed by any of the DAGs. This makes it very easy for data dependencies to hide: one project may unknowingly use the policies attached to the Airflow service by another project to access data.
In object-oriented programming we would refer to Airflow as the God Object.
Airflow is very powerful, allowing dynamic generation of DAGs and able to talk to a deluge of external services. But truth be told, we do not need all of this power, and I gladly sacrifice most of it in favor of a solution that keeps projects independent.
Imagine that we could treat DAGs just like any other AWS resource type. Each project would contain not just the code that is needed for an ETL or a training job, but would also be able to define the DAG, its tasks, scheduling, alerting etcetera. Deployment would be specified in the CloudFormation template for the AWS environment that executes the code. And of course we want scheduling, monitoring and alerting to be available from a single place, allowing inspection of the logs and triggering of jobs.
In order to do this, we need to do two things:

- execute each Airflow task as an ECS task;
- encode the DAG definition itself in something that CloudFormation can deploy.
The code that we use to do this is available in the airflow-ecs-dags project.
In order to execute tasks, we need to launch an ECS task from an Airflow task. Luckily, Airflow contrib already contains an ECS Operator. The only thing missing for our purposes is fetching the logs, so that we can read them in the Airflow user interface.
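To give an idea of how little is missing, here is a minimal sketch of such a log-fetching extension, assuming Airflow 1.10's contrib ECSOperator and a container that logs through the awslogs log driver. The operator name, the log group parameter and the stream naming are our own assumptions and must match the task definition's LogConfiguration; log-event pagination is omitted.

import boto3
from airflow.contrib.operators.ecs_operator import ECSOperator

class ECSLoggingOperator(ECSOperator):
    """ECSOperator that copies the container's CloudWatch logs into the task log."""

    def __init__(self, awslogs_group, awslogs_stream_prefix, **kwargs):
        super().__init__(**kwargs)
        self.awslogs_group = awslogs_group
        self.awslogs_stream_prefix = awslogs_stream_prefix

    def execute(self, context):
        super().execute(context)  # run the ECS task and wait for it to finish
        # The contrib operator stores the task ARN in self.arn; its last path
        # component is the ECS task id that ends up in the log stream name.
        ecs_task_id = self.arn.split('/')[-1]
        stream_name = f'{self.awslogs_stream_prefix}/{ecs_task_id}'
        logs = boto3.client('logs', region_name=self.region_name)
        events = logs.get_log_events(logGroupName=self.awslogs_group,
                                     logStreamName=stream_name)['events']
        for event in events:
            self.log.info(event['message'])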
This is the tricky part. We need to encode the DAG (Directed Acyclic Graph) in some way, preferably one that's not too cumbersome to write or to digest. Ideally, the DAG definition should be part of the "metadata" of an ECS Task. Since there is no natural way to provide such additional, non-functional data, we had to come up with our own. Luckily, a ContainerDefinition can contain additional information in the form of DockerLabels.
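To sketch the mechanics before showing the full example: those labels can be read back with boto3, which is what a DAG-generating script on the Airflow side can build on. The following is a minimal illustration, not the actual airflow-ecs-dags code; the airflow. prefix matches the convention used in the example below, and pagination is omitted.

import boto3

ecs = boto3.client('ecs')

def airflow_labels(task_definition):
    """Return the airflow.* DockerLabels of an ECS task definition."""
    described = ecs.describe_task_definition(taskDefinition=task_definition)
    return {key: value
            for container in described['taskDefinition']['containerDefinitions']
            for key, value in container.get('dockerLabels', {}).items()
            if key.startswith('airflow.')}

# Scan all active task definitions for DAG metadata (pagination omitted).
for arn in ecs.list_task_definitions(status='ACTIVE')['taskDefinitionArns']:
    labels = airflow_labels(arn)
    if labels:
        print(arn, labels)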
An ECS-Task-DAG in our projects now looks like this (in CloudFormation YAML syntax):
AirflowDag:
  Type: AWS::ECS::TaskDefinition
  Properties:
    ContainerDefinitions:
      - Cpu: 64
        Essential: true
        Image: "task-image:123-abcdefg"
        Memory: 1024
        MemoryReservation: 256
        Name: airflow-dag
        DockerLabels:
          airflow.dag.name: 'convert-it'
          airflow.dag.owner: 'Frank'
          airflow.dag.depends_on_past: 'false'
          airflow.dag.start_date: '2019-12-04T03:00:00'
          airflow.dag.email_on_failure: "true"
          airflow.dag.email_on_retry: "false"
          airflow.dag.concurrency: "12"
          airflow.dag.retries: "1"
          airflow.dag.email.0: 'Frank@rfh.example.com'
          airflow.dag.email.1: 'DataScience@rfh.example.com'
          airflow.tasks.latest_only.class: 'airflow.operators.latest_only_operator.LatestOnlyOperator'
          airflow.tasks.convert.args.0: '{{ ds }}'
          airflow.tasks.convert.depends.0: 'latest_only'
    Family: "airflow-train-dag"
    TaskRoleArn: !Ref AirflowDagTaskRole
This DAG consists of two tasks:

- latest_only, a standard Airflow operator;
- convert, a command on the container that takes Airflow's execution date as an argument. Note that this is specified using an Airflow macro ({{ ds }}).

The corresponding container has a simple Dockerfile:
FROM python:3.7
COPY . .
ENTRYPOINT ["python", "run.py"]
And a Python file, run.py, contains the actual logic:
import sys
from datetime import datetime


def convert(run_date):
    dt = datetime.strptime(run_date, "%Y-%m-%d")
    print(f'Converted data of date {dt}')


if __name__ == '__main__':
    command = sys.argv[1]
    if command == 'convert':
        convert(run_date=sys.argv[2])
    else:
        raise Exception(f'Unknown command {command}')
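To close the loop, here is a hand-written sketch (again, not the actual airflow-ecs-dags code) of how the Airflow side can turn the labels above into a DAG. The task names are hard-coded for readability and the cluster name is hypothetical; the real code would parse the airflow.tasks.* labels generically.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ecs_operator import ECSOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

def build_dag(labels, task_definition):
    """Build an Airflow DAG from the airflow.* DockerLabels of a task definition."""
    dag = DAG(
        dag_id=labels['airflow.dag.name'],
        start_date=datetime.strptime(labels['airflow.dag.start_date'],
                                     '%Y-%m-%dT%H:%M:%S'),
        default_args={'owner': labels['airflow.dag.owner'],
                      'retries': int(labels['airflow.dag.retries'])},
    )
    # latest_only: the operator class is taken directly from a label.
    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
    # convert: an ECS task that runs the container's entrypoint with the
    # (templated) arguments from the airflow.tasks.convert.args.* labels.
    convert = ECSOperator(
        task_id='convert',
        dag=dag,
        task_definition=task_definition,
        cluster='data-science',  # hypothetical cluster name
        overrides={'containerOverrides': [{
            'name': 'airflow-dag',
            'command': ['convert', '{{ ds }}'],  # macro rendered by Airflow
        }]},
    )
    latest_only >> convert  # from the airflow.tasks.convert.depends.0 label
    return dag

A single DAG file on the Airflow instance can combine this with the label scan shown earlier and assign each result to a module-level variable (globals()[dag.dag_id] = dag), the usual way to make Airflow's DagBag pick up dynamically generated DAGs.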
Decoupling the task definition from Airflow introduces some limitations to what we can do: operators and sensors that run inside Airflow itself are no longer part of a project's DAG definition, and xcom cannot be used to pass data between tasks.

While these pieces of functionality can certainly be put to good use, they have not presented insurmountable challenges to us. We were able to:

- retain the airflow-dags project for those tasks (e.g. sensors) that were common to many DAGs;
- eliminate xcom use. It was not used a lot, so we had no problem eliminating it.

By moving our DAG definitions to the projects that need them, we were able to eliminate a lot of interdependencies between projects. Deploying DAGs as part of our (already existing) CloudFormation stacks also makes it easy to synchronize changes to the DAG with those in the model definition (e.g. Sagemaker), the service (ECS), storage (S3) and database (RDS) or any other resource that is present in the stack.
Previously, we had periods of great instability when we were developing operators or upgrading dependencies shared by multiple projects; this no longer occurs.
Be sure to check out the airflow-ecs-dags project and feel free to comment!