Cloud Pub/Sub Architecture Overview
Pub/Sub Architecture: Topics, Subscriptions, and Delivery Guarantees
Pub/Sub implements a publish-subscribe pattern: publishers send messages to topics, and subscribers receive those messages through subscriptions. This decoupling lets producers and consumers scale independently while Pub/Sub provides durable message storage.
- Message retention: 7-31 days (configurable)
- At-least-once delivery: the default; may deliver duplicates (see the dedup sketch after this list)
- Exactly-once delivery: eliminates duplicates (+10% cost)
- Message ordering: per ordering key, useful for event sourcing
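Because at-least-once delivery can redeliver the same message, handlers should be idempotent. Here is a minimal sketch of deduplicating on the message ID in a subscriber callback; the in-memory set and the trivial process_event are illustration-only assumptions (a real deployment would use a shared store such as Redis or a database unique constraint):
import json
from google.cloud import pubsub_v1

# Illustration only: an in-memory dedup store is lost on restart.
seen_message_ids = set()

def process_event(event: dict):
    print(f"Processing {event}")

def idempotent_callback(message: pubsub_v1.subscriber.message.Message):
    # Redeliveries of the same message carry the same message_id.
    if message.message_id in seen_message_ids:
        message.ack()  # Duplicate: acknowledge without reprocessing
        return
    process_event(json.loads(message.data))
    seen_message_ids.add(message.message_id)
    message.ack()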
Creating Topics and Subscriptions
from google.cloud import pubsub_v1

# Create topic
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'user-events')

topic = publisher.create_topic(
    request={
        "name": topic_path,
        "message_retention_duration": "86400s",  # 1 day
        "schema_settings": {
            "schema": "projects/my-project/schemas/user-event-schema",
            "encoding": "JSON"
        }
    }
)

# Create pull subscription
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'user-events-sub')

subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "ack_deadline_seconds": 60,
        "message_retention_duration": "604800s",  # 7 days
        "enable_exactly_once_delivery": True,  # +10% cost
        "retry_policy": {
            "minimum_backoff": "10s",
            "maximum_backoff": "600s"
        },
        "dead_letter_policy": {
            "dead_letter_topic": "projects/my-project/topics/user-events-dlq",
            "max_delivery_attempts": 10
        }
    }
)
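These create calls fail if the resources already exist, so provisioning scripts that may run more than once are commonly wrapped in an AlreadyExists guard. A minimal sketch reusing the client and path from above:
from google.api_core.exceptions import AlreadyExists

try:
    publisher.create_topic(request={"name": topic_path})
except AlreadyExists:
    # Safe to ignore when re-running against existing infrastructure
    print(f"Topic {topic_path} already exists")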
Message Flow with Exactly-Once Delivery
Publishing Messages
import json
from google.cloud import pubsub_v1
from concurrent import futures

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'user-events')

# Simple publish
def publish_message(data: dict):
    message_json = json.dumps(data)
    message_bytes = message_json.encode('utf-8')
    future = publisher.publish(
        topic_path,
        message_bytes,
        # Attributes for filtering
        event_type='user_signup',
        region='us-east1'
    )
    message_id = future.result()
    print(f"Published: {message_id}")

# Batch publishing (more efficient)
publish_futures = []

def get_callback(future, data):
    def callback(future):
        try:
            message_id = future.result()
            print(f"Published {data}: {message_id}")
        except Exception as e:
            print(f"Error publishing {data}: {e}")
    return callback

for i in range(100):
    data = {"user_id": i, "action": "signup"}
    message_bytes = json.dumps(data).encode('utf-8')
    future = publisher.publish(topic_path, message_bytes)
    future.add_done_callback(get_callback(future, data))
    publish_futures.append(future)

# Wait for all
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
# Ordered messages
# NOTE: publishing with an ordering_key requires a client created with
# message ordering enabled (see the sketch below); otherwise the publish
# call raises an error.
def publish_ordered(user_id: int, event: str):
    message_bytes = json.dumps({
        "user_id": user_id,
        "event": event
    }).encode('utf-8')
    # Messages with the same ordering_key are delivered in order
    future = publisher.publish(
        topic_path,
        message_bytes,
        ordering_key=f"user-{user_id}"
    )
    return future.result()
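A minimal sketch of the ordering-enabled client, assuming a 2.x google-cloud-pubsub release: ordering is switched on via PublisherOptions, and after a failed publish the ordering key stays paused until resume_publish is called.
from google.cloud import pubsub_v1

# Enable ordering on the client; passing ordering_key alone is not enough.
ordered_publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    )
)
topic_path = ordered_publisher.topic_path('my-project', 'user-events')

future = ordered_publisher.publish(
    topic_path, b'{"event": "signup"}', ordering_key="user-42"
)
try:
    print(future.result())
except Exception:
    # After a failure, publishing for this ordering key is paused
    # until it is explicitly resumed.
    ordered_publisher.resume_publish(topic_path, "user-42")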
Subscription Types
Pull Subscription (Batch Processing)
import json
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'user-events-sub')

# Synchronous pull
def pull_messages(max_messages: int = 10):
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": max_messages
        },
        timeout=5.0
    )

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}")
        ack_ids.append(received_message.ack_id)

    # Acknowledge in batch
    if ack_ids:
        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": ack_ids
            }
        )

# Streaming pull (async)
def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    print(f"Attributes: {message.attributes}")
    try:
        # Process message (process_event is application code)
        process_event(json.loads(message.data))
        message.ack()  # Success
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # Redelivery

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback
)
print(f"Listening for messages on {subscription_path}...")

try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()
    streaming_pull_future.result()  # Block until shutdown completes
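Because the subscription above was created with enable_exactly_once_delivery, the acknowledgement itself can be confirmed. A minimal sketch, assuming a recent 2.x client that exposes ack_with_response:
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

def exactly_once_callback(message):
    try:
        # On exactly-once subscriptions, ack_with_response returns a future
        # whose result confirms Pub/Sub will not redeliver the message.
        ack_future = message.ack_with_response()
        ack_future.result(timeout=30)
        print(f"Ack confirmed for {message.message_id}")
    except sub_exceptions.AcknowledgeError as e:
        # e.g. the ack deadline already expired; the message will be redelivered
        print(f"Ack failed: {e.error_code}")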
Push Subscription (Cloud Run/Functions)
# Create push subscription
push_config = {
    "push_endpoint": "https://my-service-xyz.run.app/pubsub-handler",
    "oidc_token": {
        "service_account_email": "pubsub-invoker@my-project.iam.gserviceaccount.com"
    },
    "attributes": {
        "x-goog-version": "v1"
    }
}

subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "push_config": push_config,
        "ack_deadline_seconds": 60
    }
)
# Cloud Run handler (Flask)
from flask import Flask, request
import base64
import json

app = Flask(__name__)

@app.route('/pubsub-handler', methods=['POST'])
def pubsub_handler():
    envelope = request.get_json()
    if not envelope:
        return ('No Pub/Sub message received', 400)

    # Decode message
    pubsub_message = envelope.get('message')
    if pubsub_message:
        data = base64.b64decode(pubsub_message['data']).decode('utf-8')
        attributes = pubsub_message.get('attributes', {})
        try:
            event = json.loads(data)
            process_event(event)
            return ('', 204)  # Success
        except Exception as e:
            print(f"Error: {e}")
            return ('Processing failed', 500)  # Will retry

    return ('', 400)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
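The push_config above attaches an OIDC token to every delivery, and the handler can verify it before trusting the payload. A minimal sketch using google-auth; the audience value is an assumption and must match whatever the subscription was configured with:
from flask import request
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token

def verify_push_token(expected_audience: str = "https://my-service-xyz.run.app/pubsub-handler"):
    """Return the decoded claims if the Pub/Sub OIDC token is valid, else raise."""
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        raise ValueError('Missing bearer token')
    token = auth_header.split(' ', 1)[1]
    # Validates the signature, expiry, and audience against Google's public keys
    return id_token.verify_oauth2_token(
        token, google_requests.Request(), audience=expected_audience
    )
Calling verify_push_token() at the top of pubsub_handler rejects requests that did not originate from the subscription's service account.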
Advanced Features for Enterprise Workloads
Schema Validation
import json

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Schema, Encoding

schema_client = pubsub_v1.SchemaServiceClient()
project_path = "projects/my-project"

# Create Avro schema
avro_schema = '''{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "int"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "metadata", "type": ["null", "string"], "default": null}
    ]
}'''

schema = schema_client.create_schema(
    request={
        "parent": project_path,
        "schema": Schema(
            name=f"{project_path}/schemas/user-event-schema",
            type_=Schema.Type.AVRO,
            definition=avro_schema
        ),
        "schema_id": "user-event-schema"
    }
)

# Attach schema to topic
topic = publisher.create_topic(
    request={
        "name": topic_path,
        "schema_settings": {
            "schema": schema.name,
            "encoding": Encoding.JSON
        }
    }
)

# Publish with validation
from google.cloud.pubsub_v1.types import PubsubMessage

message = PubsubMessage(
    data=json.dumps({
        "user_id": 123,
        "event_type": "signup",
        "timestamp": 1704384000,
        "metadata": "source=web"
    }).encode('utf-8')
)
publisher.publish(topic_path, message.data)  # Validated server-side against the topic schema
BigQuery Subscription
# Direct to BigQuery (no Dataflow needed!)
bigquery_config = {
    "table": "my-project.analytics.user_events",
    "write_metadata": True,    # Include publish_time, attributes
    "use_topic_schema": True   # Auto-map schema
}

subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "bigquery_config": bigquery_config
    }
)

# Messages are automatically written to BigQuery. Query with:
# SELECT user_id, event_type, publish_time
# FROM `my-project.analytics.user_events`
# WHERE DATE(publish_time) = CURRENT_DATE()
Production Terraform Configuration
# Topic with schema
resource "google_pubsub_schema" "user_events" {
  name       = "user-event-schema"
  type       = "AVRO"
  definition = file("${path.module}/schemas/user-event.avsc")
}

resource "google_pubsub_topic" "user_events" {
  name                       = "user-events"
  message_retention_duration = "86400s" # 1 day

  schema_settings {
    schema   = google_pubsub_schema.user_events.id
    encoding = "JSON"
  }
}

# Dead letter topic
resource "google_pubsub_topic" "user_events_dlq" {
  name = "user-events-dlq"
}

# Pull subscription with exactly-once
resource "google_pubsub_subscription" "user_events_analytics" {
  name                         = "user-events-analytics"
  topic                        = google_pubsub_topic.user_events.id
  ack_deadline_seconds         = 60
  message_retention_duration   = "604800s" # 7 days
  enable_exactly_once_delivery = true

  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }

  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.user_events_dlq.id
    max_delivery_attempts = 10
  }

  expiration_policy {
    ttl = "" # Never expire
  }
}

# Push subscription to Cloud Run
resource "google_pubsub_subscription" "user_events_processor" {
  name  = "user-events-processor"
  topic = google_pubsub_topic.user_events.id

  push_config {
    push_endpoint = google_cloud_run_service.processor.status[0].url

    oidc_token {
      service_account_email = google_service_account.pubsub_invoker.email
    }
  }

  filter = "attributes.event_type = \"signup\"" # Filter
}
# BigQuery subscription
resource "google_pubsub_subscription" "user_events_warehouse" {
  name  = "user-events-warehouse"
  topic = google_pubsub_topic.user_events.id

  bigquery_config {
    table            = "${var.project_id}.analytics.user_events" # project.dataset.table
    write_metadata   = true
    use_topic_schema = true
  }
}
Cost Optimization and Monitoring
| Strategy | Impact | Implementation |
|---|---|---|
| Message batching | -70% API calls | Batch 100+ msgs per publish (see sketch below) |
| Compression | -60% bandwidth | gzip before publish |
| Regional topics | -50% latency | Use regional over global |
| Subscription filtering | -40% processing | Filter at source |
| Retention tuning | -30% storage | Reduce from 31d to 7d |
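The batching row above maps directly onto the publisher client's BatchSettings. A minimal sketch; the specific thresholds are illustrative assumptions, not recommendations:
from google.cloud import pubsub_v1

# Messages are buffered until one of these thresholds is hit,
# then sent in a single publish request.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,       # send after 100 messages are buffered
    max_bytes=1024 * 1024,  # ...or 1 MB of data
    max_latency=0.05        # ...or 50 ms have elapsed
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)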
Monitoring Metrics
from google.cloud import monitoring_v3
import time

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"

# Query Pub/Sub metrics
def get_pubsub_metrics():
    interval = monitoring_v3.TimeInterval({
        "end_time": {"seconds": int(time.time())},
        "start_time": {"seconds": int(time.time()) - 3600}  # Last hour
    })

    # Unacked messages
    results = client.list_time_series(
        request={
            "name": project_name,
            "filter": 'metric.type="pubsub.googleapis.com/subscription/num_unacked_messages_by_region"',
            "interval": interval
        }
    )
    for result in results:
        print(f"Subscription: {result.resource.labels['subscription_id']}")
        for point in result.points:
            print(f"  Unacked: {point.value.int64_value}")

    # Publish throughput
    results = client.list_time_series(
        request={
            "name": project_name,
            "filter": 'metric.type="pubsub.googleapis.com/topic/send_request_count"',
            "interval": interval
        }
    )
    for result in results:
        print(f"Topic: {result.resource.labels['topic_id']}")
        total = sum(point.value.int64_value for point in result.points)
        print(f"  Requests: {total}")
Key Takeaways and Best Practices
- Enable exactly-once carefully: +10% cost, use only when needed
- Always configure dead letter topics: Prevent poison messages
- Use message ordering sparingly: Can reduce throughput
- Implement idempotent handlers: Even with exactly-once
- Monitor unacked messages: Alert on subscription lag
- Batch publishing: 100+ messages per API call
- Use schema validation: Prevent malformed messages
- Regional topics for low latency: Global only if needed
- Filter subscriptions: Reduce processing overhead (see the sketch after this list)
- Set appropriate retention: Balance cost vs recovery needs
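The "filter subscriptions" takeaway can be applied directly in Python. A minimal sketch reusing the user-events topic from earlier; note that a filter can only be set when the subscription is created, not afterwards:
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'signup-events-sub')
topic_path = 'projects/my-project/topics/user-events'

# Pub/Sub evaluates the filter server-side; non-matching messages are
# automatically acknowledged and never delivered to the subscriber.
subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "filter": 'attributes.event_type = "signup"'
    }
)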