Deploy resilient pipelines to serverless infrastructure
Learn how to handle pipeline failures and deploy to serverless infrastructure.
This tutorial builds on the Major League Baseball (MLB) flow created in the S3 and MotherDuck tutorial. Here you’ll experiment with adding failure handling and data quality checks to the flow, and then deploy it with Prefect Cloud’s Managed Execution work pool.
Feel free to follow along with this YouTube video as you work through the different sections.
Prerequisites
The pipeline in this tutorial uses S3 and MotherDuck for storing data. MotherDuck is a serverless, cloud-optimized analytics platform built around DuckDB. To run the pipeline, you’ll need:
- A free Prefect Cloud account
- A free MotherDuck account, including:
  - A MotherDuck token
  - A Secret block in Prefect Cloud that contains the MotherDuck token
- An AWS account, including:
  - An IAM user with “AmazonS3FullAccess” permissions
  - An S3 bucket
  - An S3 block in Prefect Cloud
If you’re missing any of the AWS and MotherDuck resources, you can create them by following the steps in the S3 and MotherDuck tutorial.
Set up the environment
Complete code examples can be found in the `dev-day-zoom-out` repository.
To kick things off, clone the `dev-day-zoom-out` repository by running the following command:
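A minimal version of that command, assuming the repository lives under the PrefectHQ organization on GitHub:

```bash
git clone https://github.com/PrefectHQ/dev-day-zoom-out.git
```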
Follow along with the `README.md` file at the root of the repository to set up your environment. After you’re done, navigate to the directory where all the code for this tutorial is located.
Failure handling
Prefect offers a variety of ways to handle failures in your pipelines.
In this section, you’ll explore a few different retry strategies.
To see each retry mode in action, copy each code snippet and replace the original `get_recent_games` task in the MLB pipeline.
Run the following command to navigate to the directory containing the pipeline that you’ll modify.
The following code snippets show how to apply different retry strategies to the `get_recent_games` task located in the `mlb_flow.py` file.
The task itself has a few modifications to simulate random API failures.
Implement a simple retry
This example shows how to use the `@task` decorator to apply a simple retry strategy.
Replace the existing `get_recent_games` task with the following.
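Here’s a minimal sketch of what that replacement might look like, assuming the task fetches game IDs with the MLB-StatsAPI (`statsapi`) package as in the earlier tutorial, and uses `random` to simulate flaky API calls (the 40% failure rate is an illustrative choice):

```python
import random

import statsapi
from prefect import task


@task(retries=3)  # retry up to 3 times, with no delay between attempts
def get_recent_games(team_name, start_date, end_date):
    """Get the game IDs for a team within a date range."""
    # Simulate a flaky API call that fails ~40% of the time.
    if random.random() < 0.4:
        raise Exception("Simulated API failure!")
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(
        team=team[0]["id"], start_date=start_date, end_date=end_date
    )
    return [game["game_id"] for game in schedule]
```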
Implement a delayed retry
A delayed retry in Prefect is a mechanism that automatically retries a task or flow run after a specified delay when it fails.
This feature is particularly useful for handling transient errors or temporary issues with external systems, like rate limiting or network issues.
To implement this retry strategy, add the `retry_delay_seconds` parameter to the `get_recent_games` task’s `@task` decorator.
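For example, a fixed 10-second delay between attempts (the exact delay used in the original may differ):

```python
from prefect import task


@task(retries=3, retry_delay_seconds=10)  # wait 10 seconds between attempts
def get_recent_games(team_name, start_date, end_date):
    ...  # same body as the simple retry example above
```

If you want a different delay for each attempt, `retry_delay_seconds` also accepts a list of delays, one per retry.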
Implement an exponential retry
Exponential backoff is a retry strategy where the delay between retry attempts increases, well, exponentially!
This means that each subsequent retry attempt waits longer than the previous one.
In Prefect, this can be implemented by passing the `exponential_backoff` utility to the `get_recent_games` task’s `@task` decorator.
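A sketch of that decorator, assuming a base backoff factor of 10 seconds; `exponential_backoff` generates a list of delays that doubles with each attempt, and the optional `retry_jitter_factor` adds randomness so many retrying tasks don’t all hit the API at once:

```python
from prefect import task
from prefect.tasks import exponential_backoff


@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),  # ~10s, 20s, 40s
    retry_jitter_factor=1,  # add jitter to avoid thundering herds
)
def get_recent_games(team_name, start_date, end_date):
    ...  # same body as the simple retry example above
```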
Implement a retry handler
A retry handler in Prefect is a custom function that determines whether a task should be retried based on specific conditions.
This is useful for more complex retry logic, like retrying only when a specific exception is raised.
To implement a retry handler, you’ll need to define a separate function that Prefect will use to determine whether to retry a task.
This function takes three arguments: `task`, `task_run`, and `state`. It returns a boolean value indicating whether to retry the task.
Add the following code snippet to the `mlb_flow.py` file.
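A sketch of such a handler, here retrying only when the simulated API failure from the earlier snippets is raised; any other exception fails the task immediately:

```python
def retry_handler(task, task_run, state) -> bool:
    """Decide whether a failed task run should be retried."""
    try:
        # state.result() re-raises the exception the task failed with.
        state.result()
    except Exception as exc:
        # Only retry our simulated API failure; anything else is fatal.
        return "Simulated API failure" in str(exc)
    return False
```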
After defining the handler, you can add it to the `@task` decorator using the `retry_condition_fn` parameter.
Replace the existing `get_recent_games` task with the following.
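Something like this, wiring the handler in via `retry_condition_fn`:

```python
from prefect import task


@task(retries=3, retry_condition_fn=retry_handler)
def get_recent_games(team_name, start_date, end_date):
    ...  # same body as the simple retry example above
```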
If you’d like to see the complete examples for all of the retry options, run the following command:
Add a data quality check
Prefect’s transactional interface is a powerful feature that enhances the resilience and idempotency of workflows. It contributes to improved runtime performance and autonomous task execution. Transactions also allow you to leverage on-commit and on-rollback hooks to handle side effects and failures. For this section, you’ll use the transactional interface to implement a data quality check for the raw game data.
Run the following command to navigate to the directory containing the pipeline that you’ll modify.
First, you’ll need to import the `transaction` utility from the `prefect.transactions` module.
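That import looks like this:

```python
from prefect.transactions import transaction
```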
Add the following code snippet to the portion of the script where the rest of the tasks are defined. It includes an `on_rollback` hook that deletes the file if the data quality check fails. If fewer than 5 entries are found in the file, the quality check fails, triggering the rollback.
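A sketch of those task definitions; the task and file-handling details here are illustrative, and assume the file path is shared with the transaction (see the flow below) so the rollback hook can find it:

```python
import json
import os

from prefect import task


@task
def save_raw_data_to_file(game_data, file_name):
    """Write the raw game data to a local JSON file."""
    with open(file_name, "w") as f:
        json.dump(game_data, f)
    return file_name


@save_raw_data_to_file.on_rollback
def delete_file(txn):
    """Rollback hook: remove the raw data file if the transaction fails."""
    os.unlink(txn.get("file_path"))


@task
def quality_test(file_name):
    """Fail if the file contains fewer than 5 game entries."""
    with open(file_name, "r") as f:
        data = json.load(f)
    if len(data) < 5:
        raise ValueError(f"Not enough data! Only {len(data)} entries found.")
```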
In your flow function, you’ll add a transaction block that will run the data quality check against the file containing the raw game data.
Replace the existing `mlb_flow` function with the following.
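A sketch of the updated flow, assuming the task names from the snippet above; the file path is illustrative, and the S3 and MotherDuck steps from the original flow are elided:

```python
import time

from prefect import flow
from prefect.transactions import transaction


@flow
def mlb_flow(team_name, start_date, end_date):
    # Fetch the raw game data, as before.
    game_data = get_recent_games(team_name, start_date, end_date)

    # Save and validate the file inside a transaction, so a failed
    # quality check rolls back the file write.
    with transaction() as txn:
        file_name = "raw_data/mlb_raw_data.json"  # illustrative path
        # Share the path with the transaction so the rollback hook can find it.
        txn.set("file_path", file_name)
        save_raw_data_to_file(game_data, file_name)
        time.sleep(10)  # buffer so you can watch the rollback delete the file
        quality_test(file_name)

    # ...continue with the S3 upload and MotherDuck steps as before...
```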
To see the on-rollback hook in action, run the flow with a date range that yields less game data than the quality check requires. Thanks to the 10-second buffer before the rollback executes, you’ll see the raw data file appear in the `raw_data` folder and then disappear as the hook deletes it.
If you’d like to see the complete code for this section, navigate to the directory by running the following command:
Deploy with Managed Execution
In this section, you’ll deploy the MLB flow to serverless infrastructure. That way you don’t have to keep your laptop on 24/7 if you’d like to run this flow on a schedule. Specifically, you’ll deploy the flow to Prefect’s Managed Execution work pool. Managed Execution enables you to run flows on Prefect Cloud’s infrastructure without needing to configure your own cloud provider infrastructure to run your work remotely.
Create a Managed work pool
If you’re authenticated to Prefect Cloud from your terminal, you can run the following command to create a new work pool.
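For example (the pool name is up to you):

```bash
prefect work-pool create my-managed-pool --type prefect:managed
```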
Alternatively, you can create a new work pool in the Prefect Cloud UI by navigating to the Work Pools page.
Run the deployment script
Use this command to navigate to the directory containing the deployment script.
The following code snippet shows the contents of the `mlb_flow_deploy.py` script, which uses the `from_source()` and `deploy()` methods.
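A sketch of what the script contains; the entrypoint path, deployment name, and flow parameters here are illustrative placeholders rather than the exact values in the repository:

```python
from prefect import flow


def read_requirements(file_path="requirements.txt"):
    """Return the packages listed in a requirements file."""
    with open(file_path, "r") as f:
        return [
            line.strip()
            for line in f
            if line.strip() and not line.startswith("#")
        ]


if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
        entrypoint="path/to/mlb_flow.py:mlb_flow",  # illustrative path
    ).deploy(
        name="mlb-flow-deployment",        # illustrative name
        work_pool_name="my-managed-pool",  # the managed pool created above
        parameters={
            "team_name": "marlins",  # illustrative parameters
            "start_date": "06/01/2024",
            "end_date": "06/30/2024",
        },
        # Managed work pools install these packages before running the flow.
        job_variables={"pip_packages": read_requirements()},
    )
```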
The `from_source()` method specifies where to find the flow code when creating a deployment.
It’s particularly useful when you want to pull flow code from a remote storage location at runtime, rather than baking it into a Docker image.
It takes two arguments: a URL to a git repository (or a storage object), and an entrypoint, which is the path to the file containing the flow and the flow function’s name, separated by a colon.
The `.deploy()` method enables you to define a deployment programmatically in Python code.
It takes a name for the deployment, the work pool to submit work to, and a dictionary of parameters to pass to the flow.
There is also a `job_variables` parameter, which is used here to pass a list of packages to install in the flow’s execution environment.
The package list is generated by the `read_requirements` helper function, which reads the `requirements.txt` file in the directory containing the deployment script and returns the packages to install.
To see the deployment script in action, you can run it using the following command.
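From the directory containing the script:

```bash
python mlb_flow_deploy.py
```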
You now have a new deployment in Prefect Cloud! You can find it by navigating to the Deployments page in the Prefect Cloud UI. To kick off a new flow run, click the Quick run button.
Add scheduling
Nice work! You just deployed the MLB flow to serverless infrastructure. You can now add a schedule to the deployment so that it runs on a regular basis.
Schedule with the Python SDK
Cron schedules are commonly used to run workflows at specific times, days, or months. The following example shows how to add a cron schedule to the deployment so that the flow runs every day at 12:00 AM. Try it out for yourself by making the changes to the `mlb_flow_deploy.py` file and running it again.
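A sketch of the change, reusing the deployment call from above with a `cron` argument added to `.deploy()`:

```python
flow.from_source(
    source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
    entrypoint="path/to/mlb_flow.py:mlb_flow",  # illustrative path
).deploy(
    name="mlb-flow-deployment",
    work_pool_name="my-managed-pool",
    job_variables={"pip_packages": read_requirements()},
    cron="0 0 * * *",  # run every day at 12:00 AM
)
```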
After running the updated script, you’ll see that the deployment has a new schedule in the Prefect Cloud UI.
Prefect also supports `interval` and `rrule` schedule types.
Interval schedules are useful for running flows at fixed intervals; for example, you can run a flow every 10 minutes.
For more complex recurrence patterns, Prefect supports RRule schedules; for example, running workflows every Monday, Wednesday, and Friday at 4:00 PM. Both are shown in the sketch below.
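Hedged sketches of both, using the same illustrative deployment call; `interval` accepts a number of seconds or a `timedelta`, and `rrule` accepts an RRule string:

```python
from datetime import timedelta

# Interval schedule: run every 10 minutes.
flow.from_source(
    source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
    entrypoint="path/to/mlb_flow.py:mlb_flow",  # illustrative path
).deploy(
    name="mlb-flow-deployment",
    work_pool_name="my-managed-pool",
    interval=timedelta(minutes=10),
)

# RRule schedule: run every Monday, Wednesday, and Friday at 4:00 PM.
flow.from_source(
    source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
    entrypoint="path/to/mlb_flow.py:mlb_flow",  # illustrative path
).deploy(
    name="mlb-flow-deployment",
    work_pool_name="my-managed-pool",
    rrule="FREQ=WEEKLY;BYDAY=MO,WE,FR;BYHOUR=16;BYMINUTE=0",
)
```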
Add a schedule with the Prefect Cloud UI
If you’d like to add a schedule to a deployment using the Prefect Cloud UI, navigate to the deployment and click + Schedule.
Next steps
In this tutorial, you learned how to:
- Handle pipeline failures and retries
- Implement data quality checks and rollbacks
- Deploy a flow to serverless infrastructure
- Add a schedule to a deployment
You can continue your exploration of Prefect by checking out the different work pool types to deploy your flows to Docker, Kubernetes, or your favorite cloud provider. If you’re interested in building more workflows, check out Train a machine learning model to learn how to build a machine learning pipeline.