
Cloud Run Consumers and Cloud Tasks: Reactive and Delayed Workflows

Day 4 of Building PulseCart: Event-Driven Architecture on GCP


In Day 3 we built the FastAPI producer that publishes events to Pub/Sub. Now we build what consumes them. This is where the event-driven model pays off — multiple independent services reacting to the same events, each doing exactly one job, without knowing anything about each other.

Day 4 covers two distinct patterns that serve different needs in PulseCart:

  • Cloud Run + Pub/Sub push for immediate, reactive work

  • Cloud Tasks for delayed, scheduled, and explicitly retryable work

Understanding when to use each — and why they're complementary rather than interchangeable — is one of the more practical decisions in this stack.


Pattern 1: Cloud Run Consumer via Pub/Sub Push

When an order.placed event hits pulsecart.commerce-events, we want to react immediately — trigger a confirmation message, update the user's order history, notify the inventory service. This is Cloud Run's job.

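A push subscription delivers each Pub/Sub message as an HTTP POST to your Cloud Run service URL. The service processes it and returns a success status to acknowledge it. Pub/Sub counts 102, 200, 201, 202, and 204 as acknowledgements. Any other response, or no response within the subscription's ack deadline, triggers redelivery. No polling, no ack_id management: Pub/Sub handles delivery, Cloud Run handles processing.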
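To make the envelope concrete, here is a stdlib-only sketch of roughly what a push subscription POSTs and how it unwraps back into the original event. The event fields, message ID, publish time, and subscription path are all made up for illustration. Only the envelope shape comes from Pub/Sub's push format: a base64-encoded `message.data`, plus `messageId`, `publishTime`, `attributes`, and `subscription`.

```python
import base64
import json

# Hypothetical event, shaped like the ones the Day 3 producer publishes
event = {
    "event_id": "evt-123",
    "event_type": "order.placed",
    "user_id": "user-42",
    "payload": {"order_id": "ord-9", "total": 59.9},
}

# Approximately the JSON body a push subscription POSTs (IDs and paths are made up)
push_body = {
    "message": {
        "data": base64.b64encode(json.dumps(event).encode("utf-8")).decode("ascii"),
        "messageId": "1234567890",
        "publishTime": "2024-01-01T12:00:00.000Z",
        "attributes": {},
    },
    "subscription": "projects/my-project/subscriptions/commerce-events-push",
}


def decode_push_body(body: dict) -> dict:
    """Reverse the wrapping: base64-decode message.data, then parse the JSON event."""
    raw = base64.b64decode(body["message"]["data"])
    return json.loads(raw.decode("utf-8"))


decoded = decode_push_body(push_body)
print(decoded["event_type"], decoded["payload"]["total"])  # order.placed 59.9
```

The decoding is a round trip of the producer's encoding: JSON, then UTF-8, then base64 on the way out, and the reverse on the way in.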

Decoding the Push Payload

Pub/Sub wraps the message in an envelope before posting it:

# models/pubsub.py
from pydantic import BaseModel
from typing import Dict, Optional
import base64
import json

class PubSubMessage(BaseModel):
    data: str                          # base64-encoded event JSON
    messageId: str
    publishTime: str
    attributes: Optional[Dict[str, str]] = {}

class PubSubPushEnvelope(BaseModel):
    message: PubSubMessage
    subscription: str

    def decode_event(self) -> dict:
        decoded = base64.b64decode(self.message.data).decode("utf-8")
        return json.loads(decoded)

The Consumer Service

# routers/consumer.py
import logging

from fastapi import APIRouter, HTTPException, status

from models.pubsub import PubSubPushEnvelope
from services.idempotency import is_duplicate, mark_processed
from services.messaging import trigger_personalized_message
from services.tasks import schedule_cart_reminder

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/consumer", tags=["consumer"])


@router.post("/push", status_code=status.HTTP_204_NO_CONTENT)
async def handle_push(envelope: PubSubPushEnvelope):
    try:
        event = envelope.decode_event()
    except ValueError:
        # Bad base64, bad UTF-8, or bad JSON (all ValueError subclasses).
        # Retrying can never fix these, so treat them as malformed below.
        event = None

    if not isinstance(event, dict) or not event.get("event_id") or not event.get("event_type"):
        # Malformed message: ack it anyway (2xx) to avoid infinite retry
        logger.error("Malformed event in message %s: %r", envelope.message.messageId, event)
        return

    event_id = event["event_id"]
    event_type = event["event_type"]

    # Idempotency check: deduplicate before any processing
    if await is_duplicate(event_id):
        logger.info("Duplicate event skipped: %s", event_id)
        return

    try:
        if event_type == "order.placed":
            await trigger_personalized_message(event, template="order_confirmation")

        elif event_type == "cart.abandoned":
            # Don't send immediately: schedule a delayed reminder via Cloud Tasks
            await schedule_cart_reminder(event, delay_seconds=7200)  # 2 hours

        elif event_type == "payment.failed":
            await trigger_personalized_message(event, template="payment_retry")

        # Mark as processed only after successful handling
        await mark_processed(event_id)

    except Exception as e:
        logger.exception("Processing failed for %s", event_id)
        # Return a 5xx so Pub/Sub retries delivery (a 2xx here would ack and drop it)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Processing failed: {e}",
        )

Two things to notice here: the cart.abandoned case doesn't process immediately — it hands off to Cloud Tasks for a delayed reminder. And any unhandled exception returns a 5xx, which tells Pub/Sub to retry. Returning a 2xx on failure would silently ack and drop the message.
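A toy model makes the acknowledgement rule easy to see. The sketch below is not Pub/Sub. `deliver` is a hypothetical stand-in that keeps redelivering until the handler returns one of the status codes Pub/Sub push counts as an ack (102, 200, 201, 202, 204). It caps the number of attempts so the example terminates. It compares two handlers that both hit the same simulated transient failure on their first call. One correctly returns 500 and the other swallows the error and returns 204.

```python
from typing import Callable, List, Optional, Tuple

Handler = Callable[[dict], int]

# Status codes Pub/Sub push treats as an acknowledgement
ACK_CODES = {102, 200, 201, 202, 204}


def deliver(handler: Handler, message: dict, max_attempts: int = 5) -> Optional[int]:
    """Hypothetical delivery loop: redeliver until acked.

    Returns the attempt number that was acked, or None if it never was.
    """
    for attempt in range(1, max_attempts + 1):
        if handler(message) in ACK_CODES:
            return attempt
    return None


def make_handler(swallow_errors: bool) -> Tuple[Handler, List[str]]:
    """Build a handler whose first call hits a simulated transient failure."""
    processed: List[str] = []
    calls = {"n": 0}

    def handler(message: dict) -> int:
        calls["n"] += 1
        try:
            if calls["n"] == 1:
                raise ConnectionError("simulated downstream timeout")
            processed.append(message["event_id"])
            return 204
        except ConnectionError:
            # Correct: 500 so delivery is retried. Buggy: 204 acks and drops the event.
            return 204 if swallow_errors else 500

    return handler, processed


good, good_processed = make_handler(swallow_errors=False)
buggy, buggy_processed = make_handler(swallow_errors=True)

print(deliver(good, {"event_id": "evt-1"}), good_processed)    # 2 ['evt-1']
print(deliver(buggy, {"event_id": "evt-1"}), buggy_processed)  # 1 []
```

The buggy handler "succeeds" on the first attempt from Pub/Sub's point of view, and the event is never processed.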


Idempotency with Redis

Pub/Sub guarantees at-least-once delivery. In practice, duplicate delivery is rare but real — network hiccups, consumer restarts, or Pub/Sub's own retry logic can cause the same message to arrive twice. Without idempotency, a user could receive two order confirmation emails for one purchase.

PulseCart uses Redis (Cloud Memorystore) as the deduplication store. The pattern is simple: before processing any event, check if its event_id exists in Redis. If it does, skip. If it doesn't, process and then write the key with a TTL.

# services/idempotency.py
import redis.asyncio as aioredis  # aioredis has been merged into redis-py as redis.asyncio
from config import settings

redis = aioredis.from_url(settings.redis_url, decode_responses=True)

IDEMPOTENCY_TTL = 86400  # 24 hours: covers typical redelivery, not the full 7-day retention window


async def is_duplicate(event_id: str) -> bool:
    result = await redis.get(f"processed:{event_id}")
    return result is not None


async def mark_processed(event_id: str) -> None:
    await redis.set(
        f"processed:{event_id}",
        "1",
        ex=IDEMPOTENCY_TTL
    )

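The TTL matters. A message can be redelivered for as long as the subscription retains it unacknowledged, and retention defaults to 7 days. Note that the retry policy itself defaults to immediate redelivery; exponential backoff is opt-in. A 24-hour TTL covers the common case, where a duplicate arrives shortly after the original delivery, but it is not a hard guarantee. If you need one, set the TTL to at least the subscription's message retention duration, or bound redelivery with a dead-letter policy and a maximum number of delivery attempts.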

One subtlety: mark_processed is called after successful processing, not before. If processing fails halfway through, no key is written to Redis and Pub/Sub redelivers the message. This means your downstream operations (sending a message, writing to Cloud SQL) also need to be idempotent, or wrapped in a transaction that either fully succeeds or fully rolls back. For PulseCart's use case, the messaging service handles this by checking for an existing message_sent record before sending.
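The separate is_duplicate check and mark_processed write also leave a window: two concurrent deliveries of the same event can both find no key and both process it. One common way to close it is to claim the event atomically before processing, using Redis `SET key value NX EX seconds` (in redis-py, `set(..., nx=True, ex=...)`). On success, overwrite the claim with a long-lived "done" marker; on failure, delete the claim. This is an alternative sketch, not PulseCart's code. `FakeRedis` is a hypothetical in-memory stand-in for the few `redis.asyncio` calls used, so the example runs without a server. `CLAIM_TTL` is an assumed value.

```python
import asyncio
import time
from typing import Dict, List, Optional, Tuple

CLAIM_TTL = 60     # assumption: generous upper bound on processing time for one event
DONE_TTL = 86400   # same 24-hour window as IDEMPOTENCY_TTL


class FakeRedis:
    """Hypothetical in-memory stand-in for the redis.asyncio calls used below."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}

    def _live(self, key: str) -> Optional[str]:
        # Return the value if the key exists and hasn't expired; purge it otherwise
        item = self._data.get(key)
        if item is None or item[1] <= time.monotonic():
            self._data.pop(key, None)
            return None
        return item[0]

    async def get(self, key: str) -> Optional[str]:
        return self._live(key)

    async def set(self, key: str, value: str, ex: int, nx: bool = False) -> Optional[bool]:
        # Mirrors SET key value EX ex [NX]: with nx=True, a live key is left alone
        if nx and self._live(key) is not None:
            return None
        self._data[key] = (value, time.monotonic() + ex)
        return True

    async def delete(self, key: str) -> int:
        return 1 if self._data.pop(key, None) is not None else 0


async def claim(redis, event_id: str) -> str:
    """Try to claim an event: returns "claimed", "in_progress", or "done"."""
    key = f"processed:{event_id}"
    if await redis.set(key, "processing", ex=CLAIM_TTL, nx=True):
        return "claimed"
    # Someone else holds the key: skip only if they finished, otherwise retry later
    return "done" if await redis.get(key) == "done" else "in_progress"


async def mark_done(redis, event_id: str) -> None:
    # Overwrite the short-lived claim with the long-lived "done" marker
    await redis.set(f"processed:{event_id}", "done", ex=DONE_TTL)


async def release(redis, event_id: str) -> None:
    # Processing failed: drop the claim so the redelivered message can try again
    await redis.delete(f"processed:{event_id}")


async def demo() -> List[str]:
    redis = FakeRedis()
    # Two concurrent deliveries of the same event: only one wins the claim
    first, second = await asyncio.gather(claim(redis, "evt-1"), claim(redis, "evt-1"))
    await mark_done(redis, "evt-1")
    after_done = await claim(redis, "evt-1")      # a later redelivery is skipped
    await claim(redis, "evt-2")
    await release(redis, "evt-2")                 # simulate a failed attempt
    after_release = await claim(redis, "evt-2")   # the retry can claim it again
    return [first, second, after_done, after_release]


print(asyncio.run(demo()))  # ['claimed', 'in_progress', 'done', 'claimed']
```

In the consumer, the three results map to three actions:

- "claimed" proceeds with processing.
- "done" returns 204 as before.
- "in_progress" should return a non-ack status (a 503, say) so Pub/Sub tries again later. Acking it would lose the event if the in-flight attempt then fails.

Two caveats remain. If processing can outlast `CLAIM_TTL`, the claim expires and a parallel attempt can start. A crash between the side effect and `mark_done` still leaves a duplicate possible, which is why downstream idempotency still matters.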


Pattern 2: Cloud Tasks for Delayed and Retryable Work

Not everything should happen immediately. PulseCart's cart abandonment reminder shouldn't fire the moment someone leaves the site — that would be jarring and almost certainly wrong (they might just be switching tabs). The right behavior is to wait two hours, check if the cart is still abandoned, and only then send a reminder.

This is exactly what Cloud Tasks is built for: scheduling work to happen at a specific time in the future, with explicit retry control.

Creating a Cloud Tasks Task

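# services/tasks.py
import asyncio
import datetime
import json

from google.api_core.exceptions import AlreadyExists
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

from config import settings

tasks_client = tasks_v2.CloudTasksClient()

QUEUE_PATH = tasks_client.queue_path(
    settings.gcp_project_id,
    settings.gcp_region,
    "pulsecart-cart-reminders"
)

# Assumes cloud_run_consumer_url is a bare hostname (no "https://" prefix)
HANDLER_URL = f"https://{settings.cloud_run_consumer_url}/tasks/cart-reminder"


async def schedule_cart_reminder(event: dict, delay_seconds: int = 7200) -> str:
    # Timezone-aware UTC (datetime.utcnow() is deprecated as of Python 3.12)
    scheduled_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=delay_seconds)

    timestamp = timestamp_pb2.Timestamp()
    timestamp.FromDatetime(scheduled_time)

    task_name = f"{QUEUE_PATH}/tasks/cart-reminder-{event['event_id']}"
    task = {
        "name": task_name,  # deterministic name enables deduplication
        "schedule_time": timestamp,
        "http_request": {
            "http_method": tasks_v2.HttpMethod.POST,
            "url": HANDLER_URL,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({
                "event_id": event["event_id"],
                "user_id": event["user_id"],
                "cart_id": event["payload"].get("cart_id"),
                "total": event["payload"].get("total"),
            }).encode(),
            # Cloud Tasks attaches an OIDC token for this service account to the request
            "oidc_token": {
                "service_account_email": settings.service_account_email
            },
        },
    }

    try:
        # create_task is a blocking call; run it in a worker thread to keep the event loop free
        response = await asyncio.to_thread(
            tasks_client.create_task,
            request={"parent": QUEUE_PATH, "task": task},
        )
        return response.name
    except AlreadyExists:
        # Already scheduled for this event (e.g. Pub/Sub redelivery): treat as success
        return task_name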

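The task name (cart-reminder-{event_id}) is deterministic. Cloud Tasks rejects an attempt to create a task whose name is already in use, or was used recently, with an ALREADY_EXISTS error. The reuse window is roughly an hour after the original task ran or was deleted. That gives you idempotent scheduling almost for free. If schedule_cart_reminder is called twice for the same cart.abandoned event because Pub/Sub redelivered it, only one task is created. The caller has to treat that error as success, though. Otherwise the redelivered message fails with a 500 and keeps being retried.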
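Cloud Tasks task IDs may contain only letters, digits, hyphens, and underscores, up to 500 characters. The Cloud Tasks documentation also recommends hashed IDs (or hashed prefixes), and warns that sequential prefixes such as timestamps increase latency. If PulseCart's `event_id` values are random UUIDs, `cart-reminder-{event_id}` is likely fine. If they might contain other characters or be time-ordered, one option is to derive the ID from a hash of the event ID. The hash is still deterministic, so deduplication keeps working. The sketch below is a hypothetical helper under those assumptions; the queue path in the example is made up.

```python
import hashlib
import re

# Cloud Tasks task IDs allow only letters, digits, hyphens, and underscores (max 500 chars)
TASK_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,500}")


def cart_reminder_task_id(event_id: str) -> str:
    """Deterministic task ID for a cart.abandoned event (hypothetical helper).

    The SHA-256 hex digest leads, so IDs are evenly distributed with no shared or
    time-ordered prefix, and any characters Cloud Tasks rejects disappear. The same
    event_id always yields the same ID, so a redelivered event still collides with
    the task created on first delivery.
    """
    digest = hashlib.sha256(event_id.encode("utf-8")).hexdigest()
    return f"{digest}-cart-reminder"


def cart_reminder_task_name(queue_path: str, event_id: str) -> str:
    task_id = cart_reminder_task_id(event_id)
    if not TASK_ID_RE.fullmatch(task_id):
        raise ValueError(f"invalid Cloud Tasks task ID: {task_id}")
    return f"{queue_path}/tasks/{task_id}"


queue = "projects/my-project/locations/us-central1/queues/pulsecart-cart-reminders"
name = cart_reminder_task_name(queue, "2024-01-01T12:00:00Z/evt:42")
print(name == cart_reminder_task_name(queue, "2024-01-01T12:00:00Z/evt:42"))  # True
```

Swapping it in means setting the task's `name` from `cart_reminder_task_name(QUEUE_PATH, event["event_id"])`. The trade-off is readability: a hashed ID no longer shows which event it came from, so log the mapping if you need to trace a task back to its event.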

The Cart Reminder Handler

# routers/tasks.py
from fastapi import APIRouter, HTTPException, status
from services.cart import get_cart_status
from services.messaging import trigger_personalized_message
import logging

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tasks", tags=["tasks"])


@router.post("/cart-reminder", status_code=status.HTTP_204_NO_CONTENT)
async def handle_cart_reminder(payload: dict):
    cart_id = payload.get("cart_id")
    user_id = payload.get("user_id")
    event_id = payload.get("event_id")

    # Re-check cart status at execution time — it may have been purchased
    cart = await get_cart_status(cart_id)

    if cart is None or cart.status == "purchased":
        logger.info(f"Cart {cart_id} no longer abandoned, skipping reminder")
        return

    await trigger_personalized_message(
        {"user_id": user_id, "event_id": event_id, "payload": payload},
        template="cart_abandonment_reminder"
    )

This is the critical check. Two hours after the event was published, the handler re-validates that the cart is still abandoned before sending anything. If the user purchased in the meantime, the task exits cleanly. The reason to put the wait in Cloud Tasks rather than an asyncio.sleep inside the consumer is durability. An in-memory timer is lost if the Cloud Run instance restarts or is shut down during the wait. A single Cloud Run request also can't run longer than 60 minutes, so a two-hour sleep inside a request isn't an option anyway.


Push vs Cloud Tasks: When to Use Each

| Concern | Pub/Sub Push (Cloud Run) | Cloud Tasks |
|---|---|---|
| Timing | Immediate | Delayed / scheduled |
| Retry control | Subscription retry policy | Queue-level retry config (max attempts, backoff) |
| Deduplication | Manual (Redis) | Built-in (named tasks) |
| State at execution | Event payload only | Can re-query state |
| Cost model | By data volume (throughput) | Per billable operation (API calls, delivery attempts) |

The decision is straightforward in practice: if the work needs to happen now, use a push subscription into Cloud Run. If the work needs to happen later, or if you need to re-validate state before acting, use Cloud Tasks.


Wiring It Together in main.py

# main.py
from fastapi import FastAPI
from routers.consumer import router as consumer_router
from routers.tasks import router as tasks_router
import logging

logging.basicConfig(level=logging.INFO)

app = FastAPI(
    title="PulseCart Consumer",
    version="1.0.0"
)

app.include_router(consumer_router)
app.include_router(tasks_router)

@app.get("/health")
async def health():
    return {"status": "ok"}

Cloud Run Configuration Notes

A few Cloud Run settings that matter for consumer reliability:

Concurrency: Cloud Run defaults to a maximum of 80 concurrent requests per instance. For a consumer doing lightweight async work (validate, route, trigger), this is fine. If your handlers are CPU-heavy or make blocking calls that tie up the event loop, lower it so one slow request can't stall the others sharing the instance.

Min instances: Set to at least 1 for the commerce-events consumer. Cold starts add latency to the first deliveries after the service scales to zero. If a cold start outlasts the subscription's ack deadline, Pub/Sub treats the push as failed and redelivers it. On high-priority events, that delay is easy to avoid.

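Timeout: The Cloud Run request timeout defaults to 300 seconds. For push subscription handlers that should be fast, set it lower (30–60 seconds) so slow requests fail loudly rather than silently consuming your concurrency budget. Keep the subscription's ack deadline in mind too. Pub/Sub treats a push that takes longer than the ack deadline (10 seconds by default) as a failed delivery and retries it, even if the handler eventually succeeds.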


What's Next

Day 5 brings in Airflow — specifically Cloud Composer — to handle PulseCart's batch and scheduled workflows: the nightly aggregation jobs, dead-letter reconciliation, and how the orchestrated layer complements rather than duplicates the reactive one we've built here.

Building PulseCart: Event-Driven Architecture on GCP

Part 5 of 5

A hands-on series building PulseCart, a fictional e-commerce platform, using GCP's managed event-driven stack — Pub/Sub, Cloud Tasks, Cloud Run, and Airflow — instead of self-managed Kafka. Each post tackles a real production concern: event taxonomy design, delivery semantics, retries and dead-letter handling, batch orchestration, infrastructure-as-code, and observability. Written from real experience building systems that process millions of events and terabytes of data daily, this series is for engineers who want to see event-driven architecture built the way it actually gets built in production, not as a hello-world demo.
