Skip to content

feat: switch GCP Pub/Sub subscription to native SDK StreamingPull#722

Merged
alexluong merged 1 commit intomainfrom
native-sdk-streaming-pull
Mar 4, 2026
Merged

feat: switch GCP Pub/Sub subscription to native SDK StreamingPull#722
alexluong merged 1 commit intomainfrom
native-sdk-streaming-pull

Conversation

@alexluong
Copy link
Collaborator

@alexluong alexluong commented Mar 3, 2026

Replace the GoCloud abstraction with the native GCP Pub/Sub SDK for subscriptions, enabling SDK-managed flow control via MaxOutstandingMessages.

Changes

  • Add SubscribeOption/WithConcurrency to propagate concurrency to subscriptions
  • Add ConcurrentSubscription interface so the consumer skips its semaphore when the subscription manages concurrency internally
  • Split consumer into runConcurrent (SDK flow control) and runWithSemaphore
  • Propagate sub.Receive errors through recvErr field
  • Nack buffered messages on shutdown for faster redelivery
  • Fix data race on err variable in runWithSemaphore goroutine

Concurrency model

Before: consumer-side semaphore (all backends)

GoCloud Subscription
  │
  ▼
Consumer receive loop
  │
  ├─ sem <- struct{}{}  ← blocks when N goroutines are in-flight
  │
  └─ go handler()
       ├─ process message
       ├─ ack/nack
       └─ <-sem  ← release slot

The consumer controls concurrency with a channel-based semaphore. This still applies to SQS, Azure Service Bus, RabbitMQ, and in-memory queues.

After: SDK-managed flow control (GCP Pub/Sub)

GCP native SDK (StreamingPull)
  │
  │  MaxOutstandingMessages = N
  │  Won't deliver message N+1 until a prior message is acked
  │
  ▼
msgChan (buffered, size N)
  │
  ▼
Consumer receive loop (runConcurrent)
  │
  └─ go handler()  ← no semaphore, bounded by SDK flow control
       ├─ process message
       └─ ack  ← unblocks SDK to deliver next message

The SDK's MaxOutstandingMessages is the single throttle — at most N un-acked messages exist at any time. Since each handler goroutine holds one un-acked message, goroutine count is bounded by N without needing a consumer-side semaphore.

Goroutines in steady state (GCP path)

Goroutine Role
Background receiver sub.Receive → SDK pulls from Pub/Sub → pushes into msgChan
Consumer loop Reads from msgChan, spawns handler goroutines
Handler goroutines (up to N) Process message, ack/nack

Shutdown sequence

ctx cancelled
  │
  ├─ Consumer loop: Receive(ctx) returns ctx.Err() → breaks
  ├─ wg.Wait() → wait for in-flight handler goroutines
  │
  └─ deferred Shutdown()
       ├─ cancel() → cancels subCtx
       ├─ Background receiver: sub.Receive stops, nacks incoming msgs
       │   └─ closes msgChan, closes done
       ├─ <-done → unblocks
       ├─ Drain msgChan → nack buffered messages for faster redelivery
       └─ client.Close()

Design decisions

  • NumGoroutines = 1: Single StreamingPull stream per subscription. Scaling is done at the subscription level, not via additional goroutines within a subscription.
  • MaxExtension = -1: Disables automatic lease extension. We are intentional about consumer processing logic and do not want the SDK silently extending message leases — if a handler exceeds the ack deadline, the message should be redelivered.

Replace the GoCloud abstraction with the native GCP Pub/Sub SDK for
subscriptions, enabling SDK-managed flow control via MaxOutstandingMessages.

- Add SubscribeOption/WithConcurrency to propagate concurrency to subscriptions
- Add ConcurrentSubscription interface so the consumer skips its semaphore
  when the subscription manages concurrency internally
- Split consumer into runConcurrent (SDK flow control) and runWithSemaphore
- Propagate sub.Receive errors through recvErr field
- Nack buffered messages on shutdown for faster redelivery
- Fix data race on err variable in runWithSemaphore goroutine

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@vercel
Copy link

vercel bot commented Mar 3, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
outpost-docs Building Building Preview, Comment Mar 3, 2026 0:43am
outpost-website Ready Ready Preview, Comment Mar 3, 2026 0:43am

Request Review

@alexluong alexluong merged commit d969f83 into main Mar 4, 2026
5 checks passed
@alexluong alexluong deleted the native-sdk-streaming-pull branch March 4, 2026 09:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants