This tutorial builds off the Major League Baseball (MLB) flow created in the S3 and MotherDuck tutorial. Here you’ll experiment with adding failure handling and data quality checks to the flow, and deploy it with Prefect Cloud’s Managed Execution work pool.

Feel free to follow along with this YouTube video as you work through the different sections.

Prerequisites

The pipeline in this tutorial uses S3 and MotherDuck for storing data. MotherDuck is a serverless, cloud-optimized analytics platform built around DuckDB. To run the pipeline, you’ll need:

  • A free Prefect Cloud account
  • A free MotherDuck account, including:
    • A MotherDuck token
    • A Secret block in Prefect Cloud that contains the MotherDuck token
  • An AWS account, including:
    • An IAM user with “AmazonS3FullAccess” permissions
    • An S3 bucket
    • An S3 block in Prefect Cloud

If you’re missing any of the AWS and MotherDuck resources, you can create them by following the steps in the S3 and MotherDuck tutorial.

Setup the environment

Complete code examples can be found in the dev-day-zoom-out repository.

To kick things off, start by cloning the dev-day-zoom-out repository by running the following command:

git clone https://github.com/PrefectHQ/dev-day-zoom-out.git

Follow along with the README.md file at the root of the repository to set up your environment. After you’re done, navigate to the directory where all the code for this tutorial is located.

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/

Failure handling

Prefect offers a variety of ways to handle failures in your pipelines. In this section, you’ll have the opportunity to explore a few different retry strategies. To see each of these retry modes in action, you can copy each code snippet and replace the original get_recent_games task in the MLB pipeline.

Run the following command to navigate to the directory containing the pipeline that you’ll modify.

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/1_starting_flow

The following code snippets show how to apply a different retries to the get_recent_games task located in the mlb_flow.py file. The task itself has a few modifications to simulate random API failures.

Implement a simple retry

This example shows how to use the @task decorator to apply a simple retry strategy. Replace the existing get_recent_games task with the following.


# Get recent games, and retry 10 times if the API fails.
@task(retries=10)
def get_recent_games(team_name, start_date, end_date):
     # Simulate random API failure (70% chance).
    if random.random() < 0.7:
        time.sleep(2)  # Add a delay to see the retries in action.
        raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
    
    # Get all games for the provided team and date range.
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]

Implement a delayed retry

A delayed retry in Prefect is a mechanism that automatically retries a task or flow run after a specified delay when it fails. This feature is particularly useful for handling transient errors or temporary issues with external systems, like rate limiting or network issues. To implement this retry strategy, add a retry_delay_seconds utility to the get_recent_games task’s @task decorator.


# The retry_delay_seconds option accepts a list of integers for customized retry behavior.
# This task will retry 10 times with a delay of 1 second each time.
@task(retries=10, retry_delay_seconds=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
def get_recent_games(team_name, start_date, end_date):
    # Simulate random API failure (70% chance).
    if random.random() < 0.7:
        raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
    
    # If no failure, proceed with actual API call.
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]

Implement an exponential retry

Exponential backoff is a retry strategy where the delay between retry attempts increases, well, exponentially! This means that each subsequent retry attempt waits longer than the previous one. In Prefect, this can be implemented by adding the exponential_backoff utility to the get_recent_games task’s @task decorator.

# The exponential_backoff utility will automatically generate a list of retry delays that correspond to an exponential backoff retry strategy.
# This task will retry 10 times with a delay of 2, 4, 8, and 16 seconds.
@task(retries=4, retry_delay_seconds=exponential_backoff(backoff_factor=2))
def get_recent_games(team_name, start_date, end_date):
    # Simulate random API failure (70% chance).
    if random.random() < 0.1:
        raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
    
    # Get all games for the provided team and date range.
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]

Implement a retry handler

A retry handler in Prefect is a custom function that determines whether a task should be retried based on specific conditions. This is useful for more complex retry logic, like retrying only when a specific exception is raised. To implement a retry handler, you’ll need to define a separate function that Prefect will use to determine whether to retry a task. This function will take three arguments: task, task_run, and state. It will return a boolean value indicating whether to retry the task. Add the following code snippet to the mlb_flow.py file.


# Custom retry handler.
def retry_handler(task, task_run, state) -> bool:
    """Custom retry handler that specifies when to retry a task"""
    try:
        # Attempt to get the result of the task.
        state.result()
    except Exception as e:
        # If Exception is a TimeoutError, retry.
        if isinstance(e, TimeoutError):
            return True
        # For any other exception, do not retry.
        return False

After defining the handler, you can add it to the @task decorator using the retry_condition_fn parameter. Replace the existing get_recent_games task with the following.

# The retry_condition_fn parameter enables us to specify a custom retry condition function
@task(retries=10, retry_condition_fn=retry_handler)
def get_recent_games(team_name, start_date, end_date):
    # Generate random number.
    failure_chance = random.random()
    
    # Simulate different types of failures.
    if failure_chance < 0.3:
        time.sleep(2)  # Allow us to see the retries in action.
        raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
    elif failure_chance >= 0.4:
        time.sleep(2)  # Allow us to see the retries in action.
        raise TimeoutError("Simulated timeout: Request took too long") # Simulate empty response.
    
    # If no failure, proceed with actual API call.
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]

If you’d like to see the complete examples for all of the retry options, run the following command:

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/2_retries

Add a Data Quality Check

Prefect’s transactional interface is a powerful feature that enhances the resilience and idempotency of workflows. It contributes to improved runtime performance and autonomous task execution. Transactions also allow you to leverage on-commit and on-rollback hooks to handle side effects and failures. For this section, you’ll use the transactional interface to implement a data quality check for the raw game data.

Run the following command to navigate to the directory containing the pipeline that you’ll modify.

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/1_starting_flow

First, you’ll need to import the transaction utility from the prefect.transactions module.

The following code snippet can be added to the portion of script where the rest of the tasks are defined. There is an on_rollback hook that is designed to delete the file if the data quality check fails. If less than 5 entries are found in the file, the data quality check will fail, triggering the rollback.

In your flow function, you’ll add a transaction block that will run the data quality check against the file containing the raw game data. Replace the existing mlb_flow function with the following.

To show the on-rollback hook in action, you can run the flow using a date range that will yield less game data than is required by the quality check. With the 10 second buffer that happens before the rollback is executed, you’ll see the raw data file spawn in and out of the raw_data folder, as it gets deleted by the hook.

If you’d like to see the complete code for this section, navigate to the directory by running the following command:

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/3_rollbacks

Deploy With Managed Execution

In this section, you’ll deploy the MLB flow to serverless infrastructure. That way you don’t have to keep your laptop on 24/7 if you’d like to run this flow on a schedule. Specifically, you’ll deploy the flow to Prefect’s Managed Execution work pool. Managed Execution enables you to run flows on Prefect Cloud’s infrastructure without needing to configure your own cloud provider infrastructure to run your work remotely.

Create a Managed work pool

If you’re authenticated to Prefect Cloud from your terminal, you can run the following command to create a new work pool.

prefect work-pool create managed-pool --type prefect:managed

You can create a new work pool in the Prefect Cloud UI by navigating to the Work Pools page.

Run the deployment script

Use this command to navigate to the directory containing the deployment script.

cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/4_deploy_and_schedule

The following code snippet shows the contents of the mlb_flow_deploy.py script, which uses the from_source() and deploy() methods.

from prefect import flow
from pathlib import Path

# This helper function reads the requirements.txt file.
def read_requirements(file_path="requirements.txt"):
    """Read and parse requirements.txt file"""
    requirements = Path(file_path).read_text().splitlines()
    # Filter out empty lines and comments.
    return [req.strip() for req in requirements if req.strip() and not req.startswith('#')]

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
        entrypoint="track_1_build_workflows/session_2_resilent_workflows/4_deploy_and_schedule/mlb_flow_managed.py:mlb_flow",
    ).deploy(
        name="mlb-managed-flow",
        work_pool_name="managed-pool",
        parameters={"team_name": "phillies", "start_date": "06/01/2024", "end_date": "06/30/2024"},
        job_variables={"pip_packages": read_requirements()}
    )


The from_source() method specifies the location of flow code when creating deployments. It’s particularly useful when you want to pull flow code from a remote storage location at runtime, rather than having it baked into a Docker image. You can see that it takes two arguments: A URL to a git repository or a storage object, and the path to the file containing the flow and the function name, separated by a colon.

The .deploy() method enables you to define a deployment programmatically in Python code. It takes a name for the deployment, the work pool to submit work to, and a dictionary of parameters to pass to the flow. There is also a job_variables parameter, which accepts a list of packages to install in the flow’s execution environment.

The package list is generated by the read_requirements helper function. The function reads the requirements.txt file in the directory containing the deployment script, and returns a list of packages to install in the flow’s execution environment.

To see the deployment script in action, you can run it using the following command.

python mlb_flow_deploy.py

You now have a new deployment in Prefect Cloud! You can find it by navigating to the Deployments page in the Prefect Cloud UI. To kick off a new flow run, click the Quick run button.

Add scheduling

Nice work! You just deployed the MLB flow to serverless infrastructure. You can now add a schedule to the deployment so that it runs on a regular basis.

Schedule with the Python SDK

Cron schedules are commonly used to run workflows at specific times, days, or months. The following example shows how to add a cron schedule to the deployment so that the flow will run every day at 12:00 AM. Try it out for yourself by making the changes to the mlb_flow_deploy.py file and running it again.

from prefect import flow
from pathlib import Path

# This helper function reads the requirements.txt file.
def read_requirements(file_path="requirements.txt"):
    """Read and parse requirements.txt file"""
    requirements = Path(file_path).read_text().splitlines()
    # Filter out empty lines and comments.
    return [req.strip() for req in requirements if req.strip() and not req.startswith('#')]

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/PrefectHQ/dev-day-zoom-out.git",
        entrypoint="track_1_build_workflows/session_2_resilent_workflows/4_deploy_and_schedule/mlb_flow_managed.py:mlb_flow",
    ).deploy(
        name="mlb-managed-flow",
        work_pool_name="managed-pool",
        parameters={"team_name": "phillies", "start_date": "06/01/2024", "end_date": "06/30/2024"},
        job_variables={"pip_packages": read_requirements()},
        # This schedule will run the flow every day at 12:00 AM.
        cron="0 0 * * *"
    )

After running the updated script, you’ll see that the deployment has a new schedule in the Prefect Cloud UI. Prefect also supports interval and rrule schedule types.

Interval schedules are useful for running flows at specific intervals. For example, you can run a flow every 10 minutes.

# Run every 10 minutes.
interval=timedelta(minutes=10)

For more complex recurrence patterns, Prefect supports RRule schedules. For example, running workflows every Monday, Wednesday, and Friday of the month at 4:00 PM.

# Run Monday, Tuesday, Wednesday, Thursday, Friday at 4:00 PM UTC.
rrule="FREQ=WEEKLY;BYDAY=MO,TU,WE,TH,FR;BYHOUR=16;BYMINUTE=0"

Add a schedule with the Prefect Cloud UI

If you’d like to add a schedule to a deployment using the Prefect Cloud UI, navigate to the deployment and click + Schedule.

Next steps

In this tutorial, you learned how to:

  • Handle pipeline failures and retries
  • Implement data quality checks and rollbacks
  • Deploy a flow to serverless infrastructure
  • Add a schedule to a deployment

You can continue your exploration of Prefect by checking out the different work pool types to deploy your flows to Docker, Kubernetes, or your favorite cloud provider. If you’re interested in building more workflows, check out Train a machine learning model to learn how to build a machine learning pipeline.