Event-Driven Architecture on GCP: Mastering Cloud Pub/Sub for Real-Time Systems

Google Cloud Pub/Sub provides the foundation for event-driven architectures at virtually any scale, offering globally distributed messaging with at-least-once delivery by default, optional exactly-once delivery, and typically sub-second latency. This guide explores Pub/Sub's enterprise capabilities, from topics and subscriptions to schema validation, BigQuery subscriptions, and cost optimization.

Cloud Pub/Sub Architecture Overview

[Figure] Pub/Sub architecture: topics, subscriptions, and delivery guarantees

Pub/Sub implements a publish-subscribe pattern where publishers send messages to topics and subscribers receive messages through subscriptions. This decoupling enables independent scaling of producers and consumers while providing durable message storage.
  • Message retention: Configurable, up to 31 days on topics and 7 days on subscriptions
  • At-least-once delivery: The default; duplicates are possible (see the dedup sketch below)
  • Exactly-once delivery: Eliminates duplicates within a subscription (+10% cost)
  • Message ordering: In-order delivery per ordering key, useful for event sourcing
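Because the default mode can redeliver, consumers should tolerate duplicates even if they never enable exactly-once delivery. Below is a minimal sketch of deduplication keyed on the Pub/Sub message_id; the processed_ids set is only illustrative and stands in for a durable store such as Redis or Firestore.

import json

processed_ids = set()  # illustrative only; use a durable store in production

def handle_message(message):
    # message_id is assigned by Pub/Sub and stays the same across redeliveries
    if message.message_id in processed_ids:
        message.ack()  # duplicate: acknowledge and skip
        return
    event = json.loads(message.data)
    # ... apply business logic to `event` here ...
    processed_ids.add(message.message_id)
    message.ack()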

Creating Topics and Subscriptions

from google.cloud import pubsub_v1
from google.protobuf import duration_pb2

# Create topic
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'user-events')

topic = publisher.create_topic(
    request={
        "name": topic_path,
        "message_retention_duration": duration_pb2.Duration(seconds=86400),  # 1 day
        "schema_settings": {
            # Assumes this schema already exists (see Schema Validation below)
            "schema": "projects/my-project/schemas/user-event-schema",
            "encoding": "JSON"
        }
    }
)

# Create pull subscription
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'user-events-sub')

subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "ack_deadline_seconds": 60,
        "message_retention_duration": duration_pb2.Duration(seconds=604800),  # 7 days
        "enable_exactly_once_delivery": True,  # +10% cost
        "retry_policy": {
            "minimum_backoff": duration_pb2.Duration(seconds=10),
            "maximum_backoff": duration_pb2.Duration(seconds=600)
        },
        "dead_letter_policy": {
            # The DLQ topic must already exist
            "dead_letter_topic": "projects/my-project/topics/user-events-dlq",
            "max_delivery_attempts": 10
        }
    }
)
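Messages that exhaust max_delivery_attempts are forwarded to the user-events-dlq topic. The sketch below drains that topic through a hypothetical pull subscription named user-events-dlq-sub (which you would create separately), so failed payloads can be inspected or replayed.

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# Hypothetical pull subscription attached to the dead-letter topic
dlq_sub_path = subscriber.subscription_path('my-project', 'user-events-dlq-sub')

def drain_dlq(max_messages: int = 50):
    response = subscriber.pull(
        request={"subscription": dlq_sub_path, "max_messages": max_messages},
        timeout=10.0
    )
    ack_ids = []
    for received in response.received_messages:
        msg = received.message
        # Inspect (or re-publish) the failed payload before acknowledging
        print(f"Dead-lettered message {msg.message_id}: {msg.data[:100]}")
        print(f"  attributes: {dict(msg.attributes)}")
        ack_ids.append(received.ack_id)
    if ack_ids:
        subscriber.acknowledge(
            request={"subscription": dlq_sub_path, "ack_ids": ack_ids}
        )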

Message Flow with Exactly-Once Delivery

[Figure] Pub/Sub message flow with exactly-once delivery
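With exactly-once delivery enabled, the Python client library (v2.13+) can report whether an acknowledgement was actually persisted by Pub/Sub. A minimal sketch using ack_with_response(), assuming the user-events-sub subscription created above:

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'user-events-sub')

def callback(message):
    # On exactly-once subscriptions, ack_with_response() returns a future
    # that resolves once Pub/Sub has durably recorded the acknowledgement.
    ack_future = message.ack_with_response()
    try:
        ack_future.result(timeout=30)
        print(f"Ack confirmed for {message.message_id}")
    except sub_exceptions.AcknowledgeError as e:
        # e.g. the ack deadline expired; the message will be redelivered
        print(f"Ack failed for {message.message_id}: {e.error_code}")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)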

Publishing Messages

import json
from google.cloud import pubsub_v1
from concurrent import futures

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'user-events')

# Simple publish
def publish_message(data: dict):
    message_json = json.dumps(data)
    message_bytes = message_json.encode('utf-8')

    future = publisher.publish(
        topic_path,
        message_bytes,
        # Attributes for filtering
        event_type='user_signup',
        region='us-east1'
    )

    message_id = future.result()
    print(f"Published: {message_id}")

# Batch publishing (more efficient)
publish_futures = []

def get_callback(data):
    def callback(future):
        try:
            message_id = future.result()
            print(f"Published {data}: {message_id}")
        except Exception as e:
            print(f"Error publishing {data}: {e}")
    return callback

for i in range(100):
    data = {"user_id": i, "action": "signup"}
    message_bytes = json.dumps(data).encode('utf-8')

    future = publisher.publish(topic_path, message_bytes)
    future.add_done_callback(get_callback(data))
    publish_futures.append(future)

# Wait for all publishes to complete
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

# Ordered messages
def publish_ordered(user_id: int, event: str):
    message_bytes = json.dumps({
        "user_id": user_id,
        "event": event
    }).encode('utf-8')

    # Messages with the same ordering_key are delivered in order.
    # Requires a publisher created with message ordering enabled
    # (see the tuned publisher sketch below).
    future = publisher.publish(
        topic_path,
        message_bytes,
        ordering_key=f"user-{user_id}"
    )
    return future.result()
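The batch loop and the ordered publisher above both benefit from explicit client configuration. A sketch of a publisher tuned for batching and message ordering (the limits shown are illustrative):

from google.cloud import pubsub_v1

# The client buffers messages until one of these limits is reached, then
# sends them in a single publish request.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,        # up to 100 messages per request
    max_bytes=1024 * 1024,   # or 1 MB of payload
    max_latency=0.05         # or 50 ms, whichever comes first
)

# ordering_key publishes are rejected unless ordering is enabled on the client.
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)

publisher = pubsub_v1.PublisherClient(
    batch_settings=batch_settings,
    publisher_options=publisher_options,
    # For ordering, a regional endpoint is recommended, e.g.:
    # client_options={"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
)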

Subscription Types

Pull Subscription (Batch Processing)

import json

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'user-events-sub')

# Synchronous pull
def pull_messages(max_messages: int = 10):
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": max_messages
        },
        timeout=5.0
    )

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}")
        ack_ids.append(received_message.ack_id)

    # Acknowledge in batch
    if ack_ids:
        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": ack_ids
            }
        )

# Streaming pull (async)
def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    print(f"Attributes: {message.attributes}")

    try:
        # Process message (process_event is your business logic)
        process_event(json.loads(message.data))
        message.ack()  # Success
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # Request redelivery

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback
)

print(f"Listening for messages on {subscription_path}...")
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()
    streaming_pull_future.result()  # Block until the shutdown completes
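Streaming pull will happily hand a slow consumer more messages than it can process. Flow control settings cap how much is outstanding at once; a sketch with illustrative limits:

from google.cloud import pubsub_v1

# Limit how many messages (and bytes) the streaming pull keeps leased
# but not yet acknowledged.
flow_control = pubsub_v1.types.FlowControl(
    max_messages=500,
    max_bytes=100 * 1024 * 1024  # 100 MB
)

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=flow_control
)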

Push Subscription (Cloud Run/Functions)

# Create push subscription
push_config = {
    "push_endpoint": "https://my-service-xyz.run.app/pubsub-handler",
    "oidc_token": {
        "service_account_email": "pubsub-invoker@my-project.iam.gserviceaccount.com"
    },
    "attributes": {
        "x-goog-version": "v1"
    }
}

# Use a separate name so this does not clash with the pull subscription above
push_subscription_path = subscriber.subscription_path('my-project', 'user-events-push-sub')

subscriber.create_subscription(
    request={
        "name": push_subscription_path,
        "topic": topic_path,
        "push_config": push_config,
        "ack_deadline_seconds": 60
    }
)

# Cloud Run handler (Flask)
from flask import Flask, request
import base64
import json

app = Flask(__name__)

@app.route('/pubsub-handler', methods=['POST'])
def pubsub_handler():
    envelope = request.get_json()

    if not envelope:
        return ('No Pub/Sub message received', 400)

    # Decode message
    pubsub_message = envelope.get('message')
    if pubsub_message:
        data = base64.b64decode(pubsub_message['data']).decode('utf-8')
        attributes = pubsub_message.get('attributes', {})

        try:
            event = json.loads(data)
            process_event(event)
            return ('', 204)  # Success: message is acked
        except Exception as e:
            print(f"Error: {e}")
            return ('Processing failed', 500)  # Non-2xx response: Pub/Sub will retry

    return ('Invalid Pub/Sub envelope', 400)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
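Because the push subscription attaches an OIDC token, the handler can verify that requests really come from Pub/Sub rather than trusting any POST to the endpoint; this matters especially if the Cloud Run service allows unauthenticated invocations. A sketch using google-auth, where the audience value is assumed to be whatever you configured (here the push endpoint URL):

from google.auth.transport import requests as google_requests
from google.oauth2 import id_token

def verify_pubsub_token(flask_request, expected_audience: str) -> bool:
    # The OIDC token arrives as "Authorization: Bearer <token>"
    auth_header = flask_request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return False
    token = auth_header.split(' ', 1)[1]
    try:
        claims = id_token.verify_oauth2_token(
            token, google_requests.Request(), audience=expected_audience
        )
        # Optionally also compare claims['email'] to the invoker service account
        return claims.get('email_verified', False)
    except ValueError:
        return False

Call this at the top of pubsub_handler() and return a 401 when it fails, before decoding the message body.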

Advanced Features for Enterprise Workloads

Schema Validation

import json

from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema, Encoding

schema_client = SchemaServiceClient()
project_path = "projects/my-project"

# Create Avro schema
avro_schema = '''{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "int"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}'''

schema = schema_client.create_schema(
    request={
        "parent": project_path,
        "schema": Schema(
            name=f"{project_path}/schemas/user-event-schema",
            type_=Schema.Type.AVRO,
            definition=avro_schema
        ),
        "schema_id": "user-event-schema"
    }
)

# Attach the schema to the topic at creation time
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'user-events')

topic = publisher.create_topic(
    request={
        "name": topic_path,
        "schema_settings": {
            "schema": schema.name,
            "encoding": Encoding.JSON
        }
    }
)

# Publish with validation: the broker rejects payloads that do not match the schema
message_bytes = json.dumps({
    "user_id": 123,
    "event_type": "signup",
    "timestamp": 1704384000,
    "metadata": "source=web"
}).encode('utf-8')

future = publisher.publish(topic_path, message_bytes)
print(future.result())
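Validation happens on the server, so a non-conforming payload surfaces as an error when the publish future resolves. A small sketch of catching it; InvalidArgument is the error the API returns for schema mismatches:

from google.api_core import exceptions

def publish_validated(publisher, topic_path, payload: bytes):
    future = publisher.publish(topic_path, payload)
    try:
        return future.result(timeout=30)
    except exceptions.InvalidArgument as e:
        # The broker rejected the message because it does not match
        # the topic's Avro schema.
        print(f"Schema validation failed: {e}")
        return None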

BigQuery Subscription

# Write messages directly to BigQuery (no Dataflow needed)
bigquery_config = {
    "table": "my-project.analytics.user_events",  # table must already exist
    "write_metadata": True,   # include publish_time, message_id, attributes
    "use_topic_schema": True  # map fields using the topic schema
}

bq_subscription_path = subscriber.subscription_path('my-project', 'user-events-bq-sub')

subscriber.create_subscription(
    request={
        "name": bq_subscription_path,
        "topic": topic_path,
        "bigquery_config": bigquery_config
    }
)

# Messages are then written to BigQuery automatically. Query with:
# SELECT user_id, event_type, publish_time
# FROM `my-project.analytics.user_events`
# WHERE DATE(publish_time) = CURRENT_DATE()

Production Terraform Configuration

# Topic with schema
resource "google_pubsub_schema" "user_events" {
  name       = "user-event-schema"
  type       = "AVRO"
  definition = file("${path.module}/schemas/user-event.avsc")
}

resource "google_pubsub_topic" "user_events" {
  name = "user-events"

  message_retention_duration = "86400s"  # 1 day

  schema_settings {
    schema   = google_pubsub_schema.user_events.id
    encoding = "JSON"
  }
}

# Dead letter topic
resource "google_pubsub_topic" "user_events_dlq" {
  name = "user-events-dlq"
}

# Pull subscription with exactly-once
resource "google_pubsub_subscription" "user_events_analytics" {
  name  = "user-events-analytics"
  topic = google_pubsub_topic.user_events.id

  ack_deadline_seconds       = 60
  message_retention_duration = "604800s"  # 7 days

  enable_exactly_once_delivery = true

  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }

  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.user_events_dlq.id
    max_delivery_attempts = 10
  }

  expiration_policy {
    ttl = ""  # Never expire
  }
}

# Push subscription to Cloud Run
resource "google_pubsub_subscription" "user_events_processor" {
  name  = "user-events-processor"
  topic = google_pubsub_topic.user_events.id

  push_config {
    push_endpoint = google_cloud_run_service.processor.status[0].url

    oidc_token {
      service_account_email = google_service_account.pubsub_invoker.email
    }
  }

  filter = "attributes.event_type = \"signup\""  # Filter
}

# BigQuery subscription
resource "google_pubsub_subscription" "user_events_warehouse" {
  name  = "user-events-warehouse"
  topic = google_pubsub_topic.user_events.id

  bigquery_config {
    table            = "${var.project_id}.analytics.user_events"
    write_metadata   = true
    use_topic_schema = true
  }
}

Cost Optimization and Monitoring

Strategy | Impact | Implementation
Message batching | -70% API calls | Batch 100+ msgs per publish
Compression | -60% bandwidth | gzip before publish (see the sketch below)
Regional topics | -50% latency | Use regional over global
Subscription filtering | -40% processing | Filter at source
Retention tuning | -30% storage | Reduce from 31d to 7d
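A minimal sketch of the compression strategy from the table: gzip the JSON body before publishing and flag it with an attribute so subscribers know to decompress (content_encoding is just a name chosen here, not a Pub/Sub convention).

import gzip
import json

def publish_compressed(publisher, topic_path, payload: dict):
    body = gzip.compress(json.dumps(payload).encode('utf-8'))
    # Custom attribute so consumers know the payload is gzipped
    return publisher.publish(topic_path, body, content_encoding='gzip')

def decode_message(message) -> dict:
    if message.attributes.get('content_encoding') == 'gzip':
        return json.loads(gzip.decompress(message.data))
    return json.loads(message.data)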

Monitoring Metrics

from google.cloud import monitoring_v3
import time

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"

# Query Pub/Sub metrics
def get_pubsub_metrics():
    interval = monitoring_v3.TimeInterval({
        "end_time": {"seconds": int(time.time())},
        "start_time": {"seconds": int(time.time()) - 3600}  # Last hour
    })

    # Unacked messages
    results = client.list_time_series(
        request={
            "name": project_name,
            "filter": 'metric.type="pubsub.googleapis.com/subscription/num_unacked_messages_by_region"',
            "interval": interval
        }
    )

    for result in results:
        print(f"Subscription: {result.resource.labels['subscription_id']}")
        for point in result.points:
            print(f"  Unacked: {point.value.int64_value}")

    # Publish throughput
    results = client.list_time_series(
        request={
            "name": project_name,
            "filter": 'metric.type="pubsub.googleapis.com/topic/send_request_count"',
            "interval": interval
        }
    )

    for result in results:
        print(f"Topic: {result.resource.labels['topic_id']}")
        total = sum(point.value.int64_value for point in result.points)
        print(f"  Requests: {total}")

Key Takeaways and Best Practices

  • Enable exactly-once carefully: +10% cost, use only when needed
  • Always configure dead letter topics: Prevent poison messages
  • Use message ordering sparingly: Can reduce throughput
  • Implement idempotent handlers: Even with exactly-once
  • Monitor unacked messages: Alert on subscription lag
  • Batch publishing: 100+ messages per API call
  • Use schema validation: Prevent malformed messages
  • Regional topics for low latency: Global only if needed
  • Filter subscriptions: Reduce processing overhead
  • Set appropriate retention: Balance cost vs recovery needs
