The Prefect Way to Automate & Orchestrate Data Pipelines
I am migrating all my ETL work from Airflow to this super-cool framework.
By Murallie Thuwarakesh, Data Scientist at Stax, Inc.
Illustration from Undraw
I was a big fan of Apache Airflow. Even today, I don’t have many complaints about it. But the new technology Prefect amazed me in many ways, and I can’t help but migrate everything to it.
Prefect, like Airflow, is a workflow automation tool. You can orchestrate individual tasks to do more complex work. You can manage task dependencies, retry tasks when they fail, schedule them, etc.
I believe workflow management is the backbone of every data science project. Even small projects can benefit remarkably from a tool like Prefect. It eliminates a significant part of repetitive tasks. Not to mention, it also removes the mental clutter in a complex project.
This article covers some of the frequent questions about Prefect. It includes:
- a short intro to Prefect’s core concepts;
- why I decided to migrate from Airflow;
- Prefect’s incredible features and integration with other technologies; and
- how to decide between its cloud vs. on-premise deployment options.
Quickstart Prefect.
Prefect is both a minimal and complete workflow management tool. It’s unbelievably simple to set up. Yet it can do everything tools such as Airflow can and more.
You can use pip, Conda, or Pipenv to install it, and it’s ready to rock. More on this in the comparison with Airflow section.
pip install prefect
# conda install -c conda-forge prefect
# pipenv install --pre prefect
Before we dive into using Prefect, let’s first see an unmanaged workflow. It makes the role of Prefect in workflow management easier to understand.
The below script queries an API (Extract — E), picks the relevant fields from it (Transform — T), and appends them to a file (Load — L). It contains three functions that perform each of the tasks mentioned. It’s a straightforward yet everyday use case of workflow management tools — ETL.
Code by the Author.
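A minimal sketch of such a script follows, assuming the OpenWeatherMap current-weather endpoint and the wind speed field in its JSON response. API_KEY is a placeholder, and the function names (extract, transform, load, run_etl) are illustrative. It uses only the standard library, so nothing beyond Python itself needs installing.

```python
import json
import urllib.parse
import urllib.request

API_KEY = "YOUR_API_KEY"  # placeholder: replace with a real OpenWeatherMap key
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def extract():
    """E: query the API for Boston and return the parsed JSON response."""
    query = urllib.parse.urlencode({"q": "Boston", "appid": API_KEY})
    with urllib.request.urlopen(f"{BASE_URL}?{query}", timeout=10) as response:
        return json.load(response)


def transform(payload):
    """T: keep only the wind speed from the API response."""
    return payload["wind"]["speed"]


def load(speed, path="windspeed.txt"):
    """L: append the value to a text file, one reading per line."""
    with open(path, "a") as output:
        output.write(f"{speed}\n")


def run_etl():
    """Chain the three steps; each one feeds the next."""
    load(transform(extract()))
```

To use it as a script, call run_etl() at the bottom of the file.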
This script downloads weather data from the OpenWeatherMap API and stores the windspeed value in a file. ETL applications in real life could be complex. But this example application covers the fundamental aspects very well.
Note: Please replace the API key with a real one. You can get one from https://openweathermap.org/api.
You can run this script with the command python app.py, where app.py is the name of your script file. This will create a new file called windspeed.txt in the current directory with one value: the windspeed at Boston, MA, at the time you query the API. If you rerun the script, it’ll append another value to the same file.
Your first Prefect ETL workflow.
The above script works well. Yet, it lacks some critical features of a complete ETL, such as retrying and scheduling. Also, as mentioned earlier, a real-life ETL may have hundreds of tasks in a single workflow. Some of them can be run in parallel, whereas some depend on one or more other tasks.
Imagine there is a temporary network issue that prevents you from calling the API. The script would fail immediately with no further attempt. In live applications, such downtimes aren’t rare. They happen for several reasons: server downtime, network downtime, or an exceeded server query limit.
Also, you have to manually execute the above script every time to update your windspeed.txt file. Yet, scheduling the workflow to run at a specific time or at a predefined interval is common in ETL workflows.
This is where tools such as Prefect and Airflow come to the rescue. Here’s how you could tweak the above code to make it a Prefect workflow.
Code by the Author.
The @task decorator converts a regular Python function into a Prefect task. The optional arguments allow you to specify its retry behavior. In the above example, we’ve configured the function to attempt three times before it fails, with a three-minute delay before each retry.
With this new setup, our ETL is resilient to network issues we discussed earlier.
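To make the retry behavior concrete, here is a plain-Python sketch of what such settings ask for. It is not Prefect’s implementation, which tracks retries through task states. The with_retries helper is hypothetical, and I assume here that max_retries counts re-runs after the first failed attempt.

```python
import functools
import time
from datetime import timedelta


def with_retries(max_retries, retry_delay):
    """Re-run the wrapped function after a failure, up to max_retries times."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_retries:
                        raise  # out of retries: surface the last error
                    print(f"Attempt {attempt + 1} failed ({exc}); "
                          f"retrying in {retry_delay}")
                    time.sleep(retry_delay.total_seconds())
        return wrapper
    return decorator


# Example: a call that fails twice (say, the network is down) and then succeeds.
attempts = []


@with_retries(max_retries=3, retry_delay=timedelta(seconds=0))
def flaky_extract():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("network unreachable")
    return {"wind": {"speed": 4.6}}


result = flaky_extract()
```

A zero delay keeps the example fast; a real workflow would wait minutes rather than seconds, so that a short outage has time to clear.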
To test this, disconnect your computer from the network and run the script with python app.py. You’ll see a message that the first attempt failed and that the next one will begin in three minutes. Within those three minutes, reconnect your computer to the internet. The already running script will now finish without any errors.
Scheduling workflows with Prefect.
Retrying is only part of the ETL story. Another challenge for many workflow applications is to run them at scheduled intervals. Prefect’s scheduling API is straightforward for any Python programmer. Here’s how it works.
Code by the Author.
We’ve created an IntervalSchedule object that starts five seconds from the execution of the script. We’ve also configured it to run at one-minute intervals.
If you run the script with python app.py and monitor the windspeed.txt file, you will see new values in it every minute.
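An interval schedule boils down to a start time plus a fixed step. The sketch below, in plain Python rather than Prefect’s own schedule classes, lists the run times such a schedule would produce. interval_run_times is a hypothetical helper, and the start date is an arbitrary example.

```python
from datetime import datetime, timedelta
from itertools import islice


def interval_run_times(start_date, interval):
    """Yield start_date, start_date + interval, start_date + 2 * interval, ..."""
    run_at = start_date
    while True:
        yield run_at
        run_at += interval


# Arbitrary example start; the article's schedule starts five seconds
# after the script is launched instead of at a fixed date.
start = datetime(2021, 1, 1, 9, 0, 5)
first_runs = list(islice(interval_run_times(start, timedelta(minutes=1)), 3))

for run_at in first_runs:
    print(run_at.isoformat())
# 2021-01-01T09:00:05
# 2021-01-01T09:01:05
# 2021-01-01T09:02:05
```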
In addition to this simple scheduling, Prefect’s schedule API offers finer control. You can schedule workflows in a cron-like manner, use clock times with timezones, or do more fun stuff like executing a workflow only on weekends. I haven’t covered them all here, but Prefect’s official docs about this are perfect.
The Prefect UI.
Like Airflow (and many others), Prefect ships with a server that has a beautiful UI. It allows you to control and visualize your workflow executions.
Illustration by the Author.
To run this, you need to have Docker and Docker Compose installed on your computer. But starting it takes, surprisingly, a single command.
$ prefect server start
Illustration by the Author.
This command will start the Prefect server, and you can access it through your web browser: http://localhost:8080/.
However, the Prefect server alone cannot execute your workflows. Its only role is to provide a control panel for all your Prefect activities. Because this dashboard is decoupled from the rest of the application, you can use the Prefect cloud to do the same. We’ll discuss this in detail later.
To execute tasks, we need a few more things. The good news is, they, too, aren’t complicated.
Because the server is only a control panel, we need an agent to execute the workflow. The command below will start a local agent. Instead of a local agent, you can choose a Docker agent or a Kubernetes one if your project needs them.
$ prefect agent local start
Illustration by the Author.
Once the server and the agent are running, you’ll have to create a project and register your workflow with that project. To do this, change the line that executes the flow to the following.
Code by the Author.
Now in the terminal, you can create a project with the prefect create project <project name> command. Then rerunning the script will register it to the project instead of running it immediately.
$ prefect create project 'Tutorial'
$ python app.py
Illustration by the Author.
In the web UI, you can see the new Project ‘Tutorial’ is in the dropdown, and our windspeed tracker is in the list of flows. The flow is already scheduled and running. If you prefer, you can run them manually as well.
Illustration by the Author.
Running workflows with parameters.
The workflow we created in the previous exercise is rigid. It queries only for Boston, MA, and we cannot change it. This is where we can use parameters. Here’s how we tweak our code to accept a parameter at run time.
Code by the Author.
We’ve changed the function to accept the city argument and set it dynamically in the API query. Inside the Flow, we create a parameter object with the default value ‘Boston’ and pass it to the Extract task.
If you run the windspeed tracker workflow manually in the UI, you’ll see a section called input. Here you can set the value of the city for every execution.
Illustration by the Author.
This is a convenient way to run workflows. In many cases, ETLs and any other workflow come with run-time parameters.
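As a sketch of the idea, the query-building part of the extract step could take the city as an argument, with Boston as the default. build_query_url is a hypothetical helper, and the endpoint and query keys are my assumption about the OpenWeatherMap current-weather API. In the Prefect flow, the value would come from the parameter object rather than from a Python default.

```python
import urllib.parse

BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_query_url(city="Boston", api_key="YOUR_API_KEY"):
    """Build the request URL for a given city; api_key is a placeholder."""
    params = urllib.parse.urlencode({"q": city, "appid": api_key})
    return f"{BASE_URL}?{params}"


default_url = build_query_url()            # falls back to Boston
custom_url = build_query_url("New York")   # spaces are URL-encoded
print(default_url)
print(custom_url)
```

Keeping the city out of the function body means the same task can serve any location without editing the code.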
Why did I decide to migrate from Airflow to Prefect?
Airflow is a fantastic platform for workflow management. It saved me a ton of time on many projects. Yet, we need to appreciate new technologies taking over the old ones. That’s the case with Airflow and Prefect.
Airflow got many things right, but its core assumptions never anticipated the rich variety of data applications that have emerged.
— Prefect Documentation.
The issues I describe here aren’t dead ends if you prefer Airflow; there are workarounds for most of them. Yet they are more convenient to handle in Prefect because the tool supports them natively.
Prefect’s installation is exceptionally straightforward compared to Airflow’s. For trained eyes, setting up Airflow may not be a problem. Yet, for whoever wants to start on workflow orchestration and automation, it’s a hassle.
Airflow needs a server running in the backend to perform any task. Yet, in Prefect, a server is optional. This is a massive benefit of using Prefect. I have many pet projects running on my computer as services. Earlier, I had to have an Airflow server start at boot time. Because Prefect can run standalone, I don’t have to turn on this additional server anymore.
Airflow doesn’t have the flexibility to run workflows (or DAGs) with parameters. The workaround I used was to let the application read them from a database. This isn’t an excellent programming technique for such a simple task. Prefect’s parameter concept is exceptional on this front.
Prefect allows having different versions of the same workflow. Every time you register a workflow to the project, it creates a new version. If you need to run a previous version, you can easily select it in a dropdown. This isn’t possible with Airflow.
Prefect also allows us to create teams and role-based access controls. Each team could manage its configuration. Authorization is a critical part of every modern application, and Prefect handles it in the best way possible.
Lastly, I find Prefect’s UI more intuitive and appealing. Airflow’s UI, especially its task execution visualization, was difficult at first to understand.
Prefect’s ecosystem and integration with other technologies.
Prefect has inbuilt integration with many other technologies. It eliminates a ton of overhead and makes working with them super easy.
Live projects often have to deal with several technologies. For example, when your ETL fails, you may want to send an email or a Slack notification to the maintainer.
In Prefect, sending such notifications is effortless. You can use the EmailTask from Prefect’s task library, set the credentials, and start sending emails.
You can learn more about Prefect’s rich ecosystem in their official documentation. In this article, we’ll see how to send email notifications.
To send emails, we need to make the credentials accessible to the Prefect agent. You can do that by creating the below file in $HOME/.prefect/config.toml.
Code by the Author.
Your app is now ready to send emails. Here’s how we send a notification when we have successfully captured a windspeed measurement.
Code by the Author.
In the above code, we’ve created an instance of the EmailTask class. We’ve set all the static elements of our email configuration when instantiating it. Then, inside the Flow, we call it with the variable content.
The configuration above will send an email with the captured windspeed measurement. But its subject will always remain ‘A new windspeed captured.’
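The split between static and variable parts can be pictured with Python’s standard email library. This is not what EmailTask does internally; build_notification and the addresses are hypothetical, and only the fixed subject mirrors the example above.

```python
from email.message import EmailMessage

# Static part: fixed once, the same for every notification.
SUBJECT = "A new windspeed captured"


def build_notification(windspeed, sender, recipient):
    """Combine the fixed subject with a body that changes on every run."""
    message = EmailMessage()
    message["Subject"] = SUBJECT
    message["From"] = sender
    message["To"] = recipient
    message.set_content(f"Windspeed reading: {windspeed}")
    return message


# Hypothetical addresses, for illustration only.
note = build_notification(4.6, "etl@example.com", "maintainer@example.com")
print(note["Subject"])
print(note.get_content())
```

Actually delivering such a message would still need an SMTP connection and credentials, which is the part the config file takes care of in the Prefect setup.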
Prefect Cloud vs. On-Premise Server Deployments.
We’ve already looked into how we can start an on-premise server. Because this server is only a control panel, you could easily use the cloud version instead. To do this, we have a few additional steps to follow.
- Create a Prefect cloud account.
- Generate a key from the API Key Page.
- In your terminal, set the backend to cloud: prefect backend cloud
- Also log in with the generated key: prefect auth login --key YOUR_API_KEY
- Now, start the agent as usual: prefect agent local start
In the cloud dashboard, you can manage everything you did on the local server before.
A big question when choosing between cloud and server versions is security. According to Prefect’s docs, the server only stores workflow execution-related data and voluntary information provided by the user. Since the agent in your local computer executes the logic, you can control where you store your data.
The cloud option is suitable for performance reasons too. With one cloud server, you can manage more than one agent. Thus, you can scale your app effortlessly.
Final Thoughts
Airflow was my ultimate choice for building ETLs and other workflow management applications. Yet, Prefect changed my mind, and now I’m migrating everything from Airflow to Prefect.
Prefect is a straightforward tool that is flexible enough to extend beyond what Airflow can do. You can run it even inside a Jupyter notebook. Also, you can host it as a complete task management solution.
In addition to the central problem of workflow management, Prefect solves several other issues you may frequently encounter in a live system. Managing teams with authorization controls and sending notifications are some of them.
In this article, we’ve discussed how to create an ETL that
- retries some tasks as configured;
- runs on a schedule;
- accepts run-time parameters; and
- sends an email notification when it’s done.
We’ve only scratched the surface of Prefect’s capabilities. I recommend reading the official documentation for more information.
Bio: Murallie Thuwarakesh (@Thuwarakesh) is a Data Scientist at Stax, Inc., and a top writer on Medium for Analytics. Murallie shares what he explores in data science every day.
Original. Reposted with permission.