feat: switch GCP Pub/Sub subscription to native SDK StreamingPull#722
Merged
feat: switch GCP Pub/Sub subscription to native SDK StreamingPull#722
Conversation
Replace the GoCloud abstraction with the native GCP Pub/Sub SDK for subscriptions, enabling SDK-managed flow control via MaxOutstandingMessages. - Add SubscribeOption/WithConcurrency to propagate concurrency to subscriptions - Add ConcurrentSubscription interface so the consumer skips its semaphore when the subscription manages concurrency internally - Split consumer into runConcurrent (SDK flow control) and runWithSemaphore - Propagate sub.Receive errors through recvErr field - Nack buffered messages on shutdown for faster redelivery - Fix data race on err variable in runWithSemaphore goroutine Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
alexbouchardd
approved these changes
Mar 3, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Replace the GoCloud abstraction with the native GCP Pub/Sub SDK for subscriptions, enabling SDK-managed flow control via
MaxOutstandingMessages.Changes
SubscribeOption/WithConcurrencyto propagate concurrency to subscriptionsConcurrentSubscriptioninterface so the consumer skips its semaphore when the subscription manages concurrency internallyrunConcurrent(SDK flow control) andrunWithSemaphoresub.Receiveerrors throughrecvErrfielderrvariable inrunWithSemaphoregoroutineConcurrency model
Before: consumer-side semaphore (all backends)
The consumer controls concurrency with a channel-based semaphore. This still applies to SQS, Azure Service Bus, RabbitMQ, and in-memory queues.
After: SDK-managed flow control (GCP Pub/Sub)
The SDK's
MaxOutstandingMessagesis the single throttle — at most N un-acked messages exist at any time. Since each handler goroutine holds one un-acked message, goroutine count is bounded by N without needing a consumer-side semaphore.Goroutines in steady state (GCP path)
sub.Receive→ SDK pulls from Pub/Sub → pushes intomsgChanmsgChan, spawns handler goroutinesShutdown sequence
Design decisions
NumGoroutines = 1: Single StreamingPull stream per subscription. Scaling is done at the subscription level, not via additional goroutines within a subscription.MaxExtension = -1: Disables automatic lease extension. We are intentional about consumer processing logic and do not want the SDK silently extending message leases — if a handler exceeds the ack deadline, the message should be redelivered.