# Orchestrating Batch and Scheduled Workflows with Airflow

By Day 4, PulseCart's reactive layer is fully operational. Events flow from the FastAPI producer into Pub/Sub, Cloud Run consumers react in real time, and Cloud Tasks handles delayed work like cart abandonment reminders. The system responds to what's happening right now.

But not everything in PulseCart is reactive. Some work is inherently scheduled — it doesn't make sense to trigger it per event. Nightly aggregation of user behavior into personalization scores, reconciling messages that landed in dead-letter topics, computing daily business metrics — these are batch jobs. They run on a schedule, depend on a window of data being complete, and often have multi-step dependencies between tasks.

This is Airflow's domain. And it's explicitly not a duplication of what Pub/Sub and Cloud Tasks do — it's the complement.

* * *

## Why Airflow, Not More Cloud Tasks

Cloud Tasks is excellent for individual delayed tasks. It's not designed for multi-step workflows with dependencies, branching logic, retries and re-runs coordinated across steps, or visibility into which step failed and why.

Airflow gives you:

*   **DAGs** (Directed Acyclic Graphs) — workflows defined as code, with explicit task dependencies
    
*   **Scheduling** — cron-based or data-interval-driven execution
    
*   **Observability** — a UI showing every DAG run, task status, logs, and retry history
    
*   **Backfill** — re-run historical DAG runs if something failed or data was corrected
    
*   **Sensors** — tasks that wait for an external condition before proceeding
    

For PulseCart's nightly batch jobs, none of this is achievable cleanly with Cloud Tasks alone.
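
The ordering guarantee at the heart of a DAG can be shown without Airflow. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) on a hypothetical three-task pipeline. Every task is ordered after its upstream tasks, and a definition containing a cycle is rejected, much as Airflow refuses to load a DAG with a cycle. Airflow's scheduler adds retries, state tracking, and parallel execution on top; this models only the dependency constraint. The task names are illustrative.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on (its upstreams).
pipeline = {
    "extract_prior_day_events": set(),
    "compute_personalization_scores": {"extract_prior_day_events"},
    "publish_scores_report": {"compute_personalization_scores"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return an order in which every task comes after all of its upstream tasks."""
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph: dict[str, set[str]]) -> bool:
    """A workflow is only a valid DAG if it contains no dependency cycle."""
    try:
        TopologicalSorter(graph).prepare()  # prepare() raises CycleError on a cycle
        return True
    except CycleError:
        return False


print(execution_order(pipeline))
# ['extract_prior_day_events', 'compute_personalization_scores', 'publish_scores_report']
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False: a and b wait on each other forever
```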

* * *

## Cloud Composer vs Self-Hosted Airflow

Cloud Composer is GCP's managed Airflow service. It handles the scheduler, webserver, worker pool, and underlying GKE infrastructure. You deploy DAGs by uploading Python files to a GCS bucket — no Airflow internals to manage.

The tradeoff is cost. Cloud Composer is significantly more expensive than a self-hosted Airflow instance on a single VM or a small GKE cluster. For PulseCart at production scale, where DAG failures have real business consequences and on-call engineers shouldn't be debugging Airflow infrastructure at 2am, Cloud Composer is worth it. For a smaller team or lower stakes, self-hosting on a VM or a small GKE cluster is a legitimate alternative.

This series uses Cloud Composer, but the DAG code is identical either way.

* * *

## PulseCart's Airflow DAGs

PulseCart runs three scheduled DAGs:

| DAG | Schedule | Purpose |
| --- | --- | --- |
| `pulsecart_nightly_aggregation` | Daily at 02:00 UTC | Aggregate prior day's events into personalization scores |
| `pulsecart_dead_letter_reconciliation` | Every 6 hours | Pull from dead-letter topics, triage, and reprocess eligible messages |
| `pulsecart_daily_metrics` | Daily at 03:00 UTC | Compute business metrics (orders placed, carts abandoned, abandonment rate, revenue) into Cloud SQL |
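
Each schedule in the table also implies a *data interval*. Under Airflow 2's default cron timetable, a run covers the interval that has just closed. For example, the run that fires at 02:00 UTC on 2 January covers 1 January 02:00 to 2 January 02:00, and its logical date (exposed to templates as `{{ ds }}`) is 1 January. The sketch below reproduces that arithmetic with plain `datetime`. It is a simplified model for daily `M H * * *` schedules only, not Airflow's timetable code, and `daily_data_interval` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def daily_data_interval(fired_at: datetime, hour: int, minute: int = 0):
    """Model a daily 'minute hour * * *' cron run under Airflow 2's default timetable.

    Returns (data_interval_start, data_interval_end, ds) for the most recent
    scheduled tick at or before `fired_at`. Simplified sketch: daily schedules only.
    """
    tick = fired_at.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if tick > fired_at:              # today's tick hasn't happened yet, use yesterday's
        tick -= timedelta(days=1)
    start = tick - timedelta(days=1)  # the interval that just closed
    return start, tick, start.strftime("%Y-%m-%d")


# The nightly aggregation run that fires at 02:00 UTC on 2 Jan 2025:
start, end, ds = daily_data_interval(datetime(2025, 1, 2, 2, 0, tzinfo=timezone.utc), hour=2)
print(start.isoformat(), end.isoformat(), ds)
# 2025-01-01T02:00:00+00:00 2025-01-02T02:00:00+00:00 2025-01-01
```

Because `ds` is fixed when the run is created, a retry or a manual backfill of that run targets the same day, whereas `CURRENT_DATE` depends on when the query happens to execute.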

* * *

## DAG 1: Nightly Aggregation

This DAG pulls the prior day's events from Cloud SQL (written by the Cloud Run consumer as it processes Pub/Sub messages), aggregates them into per-user behavior signals, and writes the results back as personalization scores.
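
The two SQL steps boil down to count-then-weight. Here is a plain-Python sketch of the same arithmetic, using the weights from the `compute_personalization_scores` query below (10 for `order.placed`, 3 for `cart.item_added`, 1 for `product.viewed`) summed over a trailing 7-day window. In-memory tuples stand in for the `event_aggregates` table, and the function name and sample rows are illustrative.

```python
from collections import defaultdict
from datetime import date, timedelta

# Weights copied from the compute_personalization_scores query; other event types score 0.
WEIGHTS = {"order.placed": 10, "cart.item_added": 3, "product.viewed": 1}


def personalization_scores(aggregates, as_of: date, window_days: int = 7) -> dict[str, int]:
    """aggregates: iterable of (user_id, day, event_type, event_count) rows, mirroring
    event_aggregates. Sums weighted counts over the window_days days ending on as_of."""
    earliest = as_of - timedelta(days=window_days - 1)
    scores: dict[str, int] = defaultdict(int)
    for user_id, day, event_type, count in aggregates:
        if earliest <= day <= as_of:
            scores[user_id] += WEIGHTS.get(event_type, 0) * count
    return dict(scores)


rows = [
    ("u1", date(2025, 1, 1), "product.viewed", 12),
    ("u1", date(2025, 1, 1), "cart.item_added", 2),
    ("u2", date(2025, 1, 1), "order.placed", 1),
    ("u2", date(2024, 12, 20), "order.placed", 5),  # outside the 7-day window
]
print(personalization_scores(rows, as_of=date(2025, 1, 1)))  # {'u1': 18, 'u2': 10}
```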

```python
# dags/nightly_aggregation.py
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExecuteQueryOperator
from datetime import timedelta
import pendulum

default_args = {
    "owner": "pulsecart",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["engineering@pulsecart.io"],
}

with DAG(
    dag_id="pulsecart_nightly_aggregation",
    default_args=default_args,
    description="Aggregate prior day events into personalization scores",
    schedule="0 2 * * *",            # 02:00 UTC daily (`schedule` replaces the deprecated `schedule_interval`, Airflow 2.4+)
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["pulsecart", "personalization"],
) as dag:

    extract_events = CloudSQLExecuteQueryOperator(
        task_id="extract_prior_day_events",
        gcp_cloudsql_conn_id="pulsecart_cloudsql",
        sql="""
            INSERT INTO event_aggregates (user_id, date, event_type, event_count)
            SELECT
                user_id,
                DATE(timestamp) AS date,
                event_type,
                COUNT(*) AS event_count
            FROM events
            -- ds is the run's logical date; on Airflow 2's default cron timetable it is the
            -- day before the 02:00 run fires, so retries and backfills always hit the same day
            WHERE DATE(timestamp) = '{{ ds }}'::date
            GROUP BY user_id, DATE(timestamp), event_type
            ON CONFLICT (user_id, date, event_type)
            DO UPDATE SET event_count = EXCLUDED.event_count;
        """,
    )

    compute_scores = CloudSQLExecuteQueryOperator(
        task_id="compute_personalization_scores",
        gcp_cloudsql_conn_id="pulsecart_cloudsql",
        sql="""
            INSERT INTO personalization_scores (user_id, score, computed_at)
            SELECT
                user_id,
                -- Weighted frequency: orders count 10x, add-to-cart 3x, product views 1x
                (
                    SUM(CASE WHEN event_type = 'order.placed'   THEN event_count * 10 ELSE 0 END) +
                    SUM(CASE WHEN event_type = 'cart.item_added' THEN event_count * 3  ELSE 0 END) +
                    SUM(CASE WHEN event_type = 'product.viewed'  THEN event_count * 1  ELSE 0 END)
                ) AS score,
                NOW() AS computed_at
            FROM event_aggregates
            -- Trailing 7 days of aggregates, ending on the run's logical date
            WHERE date BETWEEN '{{ ds }}'::date - 6 AND '{{ ds }}'::date
            GROUP BY user_id
            ON CONFLICT (user_id)
            DO UPDATE SET score = EXCLUDED.score, computed_at = EXCLUDED.computed_at;
        """,
    )

    extract_events >> compute_scores
```

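The task dependency (`extract_events >> compute_scores`) ensures scores are only computed after the prior day's aggregates have been written. If `extract_events` fails, `compute_scores` never runs. Retries apply per task, not per DAG run: the failing task is retried up to 2 times, 5 minutes apart (from `default_args`), and only then is the run marked failed and the failure email sent.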

* * *

## DAG 2: Dead-Letter Reconciliation

Every Pub/Sub subscription has a dead-letter topic (configured in Day 2). Messages end up there when they exceed the maximum delivery attempt count — typically due to a consumer bug, a malformed payload, or a dependency outage during the retry window.

Without reconciliation, dead-lettered messages just sit there. With this DAG, they're triaged every 6 hours: reprocessable messages are re-published to the original topic, and genuinely malformed ones are logged for manual review.
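
The triage rule is easiest to test as a pure function, separate from the Pub/Sub calls. This is a sketch under stated assumptions: `triage` and its `"reprocess"`/`"skip"` labels are hypothetical names. The first check, which skips anything already carrying `reconciled="true"`, is an optional policy that the DAG below does not implement. It would stop a message that keeps failing from cycling between its topic and dead-letter topic every 6 hours. It also assumes the flag survives a second dead-lettering: Pub/Sub adds its own dead-letter attributes to the forwarded message, and this sketch assumes the original attributes are preserved alongside them.

```python
import json
from collections.abc import Mapping


def triage(data: bytes, attributes: Mapping[str, str]) -> str:
    """Classify one dead-lettered message as 'reprocess' or 'skip' (log for manual review)."""
    if attributes.get("reconciled") == "true":
        return "skip"  # assumed policy: already re-published once, so it needs a human
    try:
        event = json.loads(data.decode("utf-8"))
    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
        return "skip"   # malformed payload
    if not isinstance(event, dict) or not event.get("event_type"):
        return "skip"   # valid JSON, but not an event PulseCart can route
    return "reprocess"


print(triage(b'{"event_type": "order.placed", "user_id": "u1"}', {}))  # reprocess
print(triage(b"not json", {}))                                         # skip
```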

```python
# dags/dead_letter_reconciliation.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta
from google.cloud import pubsub_v1
import json
import logging
import pendulum

logger = logging.getLogger(__name__)

DEAD_LETTER_TOPICS = [
    ("pulsecart.commerce-events.dead-letter", "pulsecart.commerce-events"),
    ("pulsecart.user-actions.dead-letter",    "pulsecart.user-actions"),
    ("pulsecart.system-events.dead-letter",   "pulsecart.system-events"),
]

PROJECT_ID = "your-gcp-project-id"
MAX_MESSAGES_PER_RUN = 500


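def reconcile_dead_letters(dead_letter_topic: str, original_topic: str, **context):
    subscriber = pubsub_v1.SubscriberClient()
    # A non-empty ordering_key is only accepted when message ordering is enabled on the
    # publisher; otherwise publish() raises ValueError.
    publisher = pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
    )

    subscription_path = subscriber.subscription_path(
        PROJECT_ID, f"sub-{dead_letter_topic}-reconciler"
    )
    original_topic_path = publisher.topic_path(PROJECT_ID, original_topic)

    reprocessed = 0
    skipped = 0

    with subscriber:  # closes the subscriber's channel on exit
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": MAX_MESSAGES_PER_RUN}
        )

        for msg in response.received_messages:
            message_id = msg.message.message_id
            try:
                event = json.loads(msg.message.data.decode("utf-8"))
            except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
                event = None
            event_type = event.get("event_type") if isinstance(event, dict) else None

            try:
                if not event_type:
                    # Malformed payload or no event_type: log the raw bytes for manual review
                    logger.warning(
                        "Skipping unreprocessable message %s: %r", message_id, msg.message.data[:500]
                    )
                    skipped += 1
                else:
                    future = publisher.publish(
                        original_topic_path,
                        data=msg.message.data,
                        ordering_key=str(event.get("user_id") or ""),
                        # Pub/Sub attribute values must be strings
                        event_type=str(event_type),
                        event_id=str(event.get("event_id") or ""),
                        reconciled="true",  # attribute flag so consumers can log it
                    )
                    future.result(timeout=60)  # ack only after the re-publish is confirmed
                    reprocessed += 1

                subscriber.acknowledge(
                    request={"subscription": subscription_path, "ack_ids": [msg.ack_id]}
                )
            except Exception:
                # Left unacked: redelivered after its ack deadline and picked up by a later run
                logger.exception("Failed to reconcile message %s", message_id)

    logger.info("%s: reprocessed=%d, skipped=%d", dead_letter_topic, reprocessed, skipped)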


default_args = {
    "owner": "pulsecart",
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    dag_id="pulsecart_dead_letter_reconciliation",
    default_args=default_args,
    description="Triage and reprocess dead-lettered Pub/Sub messages",
    schedule="0 */6 * * *",   # every 6 hours
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["pulsecart", "reliability"],
) as dag:

    for dead_letter_topic, original_topic in DEAD_LETTER_TOPICS:
        PythonOperator(
            task_id=f"reconcile_{dead_letter_topic.replace('.', '_')}",
            python_callable=reconcile_dead_letters,
            op_kwargs={
                "dead_letter_topic": dead_letter_topic,
                "original_topic": original_topic,
            },
        )
```

The three reconciliation tasks have no dependencies on one another, so Airflow can run them in parallel; each topic's dead-letter queue is independent. Note the `reconciled="true"` message attribute: downstream consumers can detect and log reconciled messages separately, which is useful for tracking how often dead-lettered messages are genuinely recoverable vs. indicative of a deeper bug.
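
On the consumer side, separating reconciled deliveries only requires reading the attribute. Here is a minimal sketch, assuming the consumer has the message attributes as a string-to-string mapping. Both the Python client's `Message.attributes` and the `attributes` object in a push subscription's JSON body fit that shape. `record_delivery` and the in-process `Counter` are hypothetical stand-ins for the real consumer's logging and metrics.

```python
import logging
from collections import Counter
from collections.abc import Mapping

logger = logging.getLogger("pulsecart.consumer")

# Hypothetical in-process tally; a real consumer would emit these to its metrics backend.
delivery_counts: Counter[str] = Counter()


def record_delivery(attributes: Mapping[str, str], event_type: str) -> bool:
    """Count a delivery as 'reconciled' or 'fresh'; return True if it was reconciled."""
    reconciled = attributes.get("reconciled") == "true"
    delivery_counts["reconciled" if reconciled else "fresh"] += 1
    if reconciled:
        logger.info("Processing reconciled %s event (previously dead-lettered)", event_type)
    return reconciled
```

Comparing the `reconciled` count with the number of reconciled events that then process successfully gives a rough recoverability rate.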

* * *

## DAG 3: Daily Metrics

A simpler DAG that computes the previous day's business metrics into a `daily_metrics` table in Cloud SQL. Downstream, a dashboard reads from this table.

```python
# dags/daily_metrics.py
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExecuteQueryOperator
from datetime import timedelta
import pendulum

default_args = {
    "owner": "pulsecart",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="pulsecart_daily_metrics",
    default_args=default_args,
    description="Compute prior day business metrics",
    schedule="0 3 * * *",   # 03:00 UTC daily
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["pulsecart", "metrics"],
) as dag:

    compute_metrics = CloudSQLExecuteQueryOperator(
        task_id="compute_daily_metrics",
        gcp_cloudsql_conn_id="pulsecart_cloudsql",
        sql="""
            INSERT INTO daily_metrics (date, orders_placed, carts_abandoned, abandonment_rate, revenue)
            SELECT
                '{{ ds }}'::date                                                   AS date,
                COUNT(*) FILTER (WHERE event_type = 'order.placed')                AS orders_placed,
                COUNT(*) FILTER (WHERE event_type = 'cart.abandoned')              AS carts_abandoned,
                -- Abandoned carts per 100 add-to-cart events (events, not distinct carts)
                ROUND(
                    COUNT(*) FILTER (WHERE event_type = 'cart.abandoned')::numeric /
                    NULLIF(COUNT(*) FILTER (WHERE event_type = 'cart.item_added'), 0) * 100,
                    2
                )                                                                   AS abandonment_rate,
                SUM((payload->>'total')::numeric) FILTER (WHERE event_type = 'order.placed') AS revenue
            FROM events
            -- ds is the run's logical date: the day before this 03:00 UTC run fires
            WHERE DATE(timestamp) = '{{ ds }}'::date
            ON CONFLICT (date)
            DO UPDATE SET
                orders_placed    = EXCLUDED.orders_placed,
                carts_abandoned  = EXCLUDED.carts_abandoned,
                abandonment_rate = EXCLUDED.abandonment_rate,
                revenue          = EXCLUDED.revenue;
        """,
    )
```
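
The `abandonment_rate` expression is easy to misread, so here it is as a worked example in Python. It divides abandoned carts by add-to-cart events, multiplies by 100, and rounds to two places. `NULLIF` turns a zero denominator into `NULL` (`None` here) instead of a division error. The denominator counts `cart.item_added` *events*, not distinct carts, so a cart that received several items contributes several times. `Decimal` with half-up rounding is used to mirror PostgreSQL's `numeric` rounding for positive values. The function name is hypothetical.

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Optional


def abandonment_rate(carts_abandoned: int, items_added: int) -> Optional[Decimal]:
    """Mirror of ROUND(abandoned::numeric / NULLIF(added, 0) * 100, 2)."""
    if items_added == 0:
        return None  # NULLIF(..., 0) makes the whole expression NULL
    rate = Decimal(carts_abandoned) / Decimal(items_added) * 100
    return rate.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(abandonment_rate(37, 120))  # 30.83
print(abandonment_rate(5, 0))     # None
```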

* * *

## How Airflow Complements the Reactive Layer

The distinction is worth being explicit about:

| Concern | Pub/Sub + Cloud Run/Tasks | Airflow |
| --- | --- | --- |
| Trigger | An event happened | Time elapsed / schedule |
| Granularity | Per message | Per data window (day, hour) |
| Dependencies | None — consumers are independent | Explicit task dependencies in DAG |
| Observability | Cloud Logging per message | DAG UI, per-task logs, SLA tracking |
| Failure scope | Per message retry | Per-task retries, DAG run re-runs, backfill support |
| Use case | React now | Aggregate, reconcile, report |

These two layers don't overlap in PulseCart — they handle genuinely different categories of work. Trying to do nightly aggregation with Cloud Tasks, or trying to do per-event reactions with Airflow, would be fighting the tool.

* * *

## Deploying DAGs to Cloud Composer

Cloud Composer exposes a GCS bucket for DAG storage. Deploying is straightforward:

```bash
# Upload a DAG file to the Composer environment's DAG bucket
gsutil cp dags/nightly_aggregation.py \
  gs://your-composer-bucket/dags/nightly_aggregation.py
```

In practice, this step is part of the GitHub Actions CI/CD pipeline (Day 6) — DAG files are synced to the GCS bucket on every merge to main, so the Airflow scheduler picks up changes automatically within a few minutes.
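
Because the scheduler parses whatever lands in the bucket, a broken file shows up as an import error in the Airflow UI rather than as a failed deploy. Below is a minimal pre-sync check for the CI job, using only Python's built-in `compile()`. It catches syntax errors only, not missing imports or DAG-definition mistakes. Parsing the folder with Airflow's `DagBag` in CI would catch those, at the cost of installing Airflow. `find_syntax_error`, `check_dag_syntax`, and the `dags/` path are assumptions, not part of the series' pipeline.

```python
from pathlib import Path
from typing import Optional


def find_syntax_error(source: str, filename: str) -> Optional[str]:
    """Return 'file:line: message' if source fails to compile, else None."""
    try:
        compile(source, filename, "exec")  # parses and compiles only; executes and writes nothing
    except SyntaxError as exc:
        return f"{filename}:{exc.lineno}: {exc.msg}"
    return None


def check_dag_syntax(dag_dir: str = "dags") -> list[str]:
    """Check every .py file under dag_dir; an empty list means every file compiles."""
    errors = []
    for path in sorted(Path(dag_dir).rglob("*.py")):
        error = find_syntax_error(path.read_text(encoding="utf-8"), str(path))
        if error:
            errors.append(error)
    return errors
```

In the CI job, fail the step (exit non-zero) when `check_dag_syntax()` returns a non-empty list, before the files are copied to the bucket.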

* * *

## What's Next

Day 6 covers the full production infrastructure: Terraform modules for every service we've built across this series — Pub/Sub topics, Cloud Run, Cloud Tasks queues, Cloud SQL, Redis Memorystore, and Cloud Composer — plus GitHub Actions CI/CD for zero-downtime deploys.
