A production-ready Change Data Capture (CDC) system that synchronizes PostgreSQL data to OpenSearch in real time using Debezium and Kafka, with full reconciliation capabilities.
```
┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│              │       │              │       │              │
│  PostgreSQL  │──────▶│   Debezium   │──────▶│    Kafka     │
│              │  CDC  │  Connector   │       │              │
└──────────────┘       └──────────────┘       └──────────────┘
                                                     │
                                                     │ Events
                                                     ▼
                                  ┌─────────────────────────────────────┐
                                  │             Sync Worker             │
                                  │  - Consumes CDC events              │
                                  │  - Batches updates (50/2s)          │
                                  │  - Deduplicates project IDs         │
                                  └─────────────────────────────────────┘
                                                     │
                                                     ▼
                                  ┌─────────────────────────────────────┐
                                  │             OpenSearch              │
                                  │  - Real-time search index           │
                                  │  - Nested user/hashtag queries      │
                                  │  - Full-text search with fuzzy      │
                                  └─────────────────────────────────────┘
                                                     ▲
                                                     │
                                      ┌──────────────┴───────────────┐
                                      │                              │
                           ┌──────────▼──────────┐     ┌─────────────▼────────────┐
                           │     API Server      │     │  Reconciliation Worker   │
                           │  - Search projects  │     │  - Daily full reindex    │
                           │  - Filter by user   │     │  - Zero-downtime         │
                           │  - Filter by tag    │     │  - Index retention      │
                           └─────────────────────┘     └──────────────────────────┘
```
- CDC-based updates: Captures all database changes via Debezium
- Smart batching: Configurable batch size (default 50) and interval (default 2s)
- Deduplication: Automatically deduplicates multiple updates to the same project
- Immediate deletes: Delete operations bypass batching for instant removal
- Full-text search with fuzzy matching
- Multi-field queries (slug, name, description)
- Nested queries for users and hashtags
- Pagination with configurable limits
- Sorting by date, score, name, or slug
- Date range filtering
- Custom field selection
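As a rough illustration of how these search features might translate into an OpenSearch query body (a sketch only; `searchQuery` is a hypothetical helper, and the real query presumably also carries pagination, sorting, and date-range clauses):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// searchQuery builds a multi_match query over the default fields
// (slug, name, description), optionally enabling fuzzy matching.
func searchQuery(q string, fuzzy bool) map[string]any {
	mm := map[string]any{
		"query":  q,
		"fields": []string{"slug", "name", "description"},
	}
	if fuzzy {
		mm["fuzziness"] = "AUTO" // edit-distance based on term length
	}
	return map[string]any{"multi_match": mm}
}

func main() {
	body, _ := json.Marshal(searchQuery("awesome", true))
	fmt.Println(string(body))
}
```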
- Daily full reindex (default: 3 AM)
- Zero-downtime switching via atomic alias updates
- Index versioning with dated indices (e.g., `projects-2026-01-24`)
- Automatic cleanup of old indices (7-day retention)
- Retry logic with exponential backoff
```
.
├── cmd/
│   ├── server/                     # API server
│   │   └── main.go
│   ├── workers/
│   │   ├── sync-worker/            # CDC sync worker
│   │   │   └── main.go
│   │   └── reconciliation-worker/  # Daily reconciliation
│   │       └── main.go
│   └── tools/
│       └── mapping-json/           # Utility to print mapping
│           └── main.go
├── internal/
│   ├── config/                     # Configuration management
│   ├── database/                   # PostgreSQL client
│   ├── dto/                        # Request/Response DTOs
│   ├── errors/                     # Custom error types
│   ├── handlers/                   # HTTP handlers
│   ├── kafka/                      # Kafka consumer
│   ├── models/                     # Domain models
│   ├── opensearch/                 # OpenSearch client
│   ├── processor/                  # CDC message processor
│   ├── repository/                 # Repository interfaces & implementations
│   └── service/                    # Business logic layer
├── pkg/
│   └── constants/                  # Application constants
├── postgres/
│   ├── migrations/                 # Database schema
│   └── seeds/                      # Seed data
├── opensearch/
│   └── init-index.sh               # Index initialization
├── scripts/
│   ├── setup.sh                    # Main setup script
│   ├── setup-debezium.sh           # Debezium connector setup
│   └── bulk-load.sh                # Bulk load utility
├── docker-compose.yml              # All services
└── README.md
```
- Docker & Docker Compose
- Go 1.24+ (for local development)
- Clone the repository

```bash
git clone <repo-url>
cd postgres-cdc-os
```

- Make scripts executable

```bash
chmod +x ./scripts/*.sh
chmod +x ./opensearch/init-index.sh
```

- Start all services

```bash
./scripts/setup.sh
```

This will:
- Start PostgreSQL, Kafka, Debezium, and OpenSearch
- Run database migrations
- Seed initial data
- Create OpenSearch index
- Configure Debezium connector
- Start sync worker, reconciliation worker, and API server
- Verify the setup

```bash
# Health check
curl http://localhost:8080/health

# Search projects
curl "http://localhost:8080/api/v1/projects/search?q=project"
```

```
GET /api/v1/projects/search?q=<query>&fuzzy=true&limit=20&offset=0&sort_by=created_at&sort_order=desc
```

Query Parameters:
- `q` (required): Search query
- `fuzzy` (optional): Enable fuzzy matching (default: true)
- `operator` (optional): `and` or `or` (default: `or`)
- `fields` (optional): Fields to search (default: slug, name, description)
- `limit` (optional): Results per page (default: 20, max: 100)
- `offset` (optional): Pagination offset (max: 10000)
- `sort_by` (optional): Sort field (`created_at`, `updated_at`, `name`, `slug`, `_score`)
- `sort_order` (optional): `asc` or `desc` (default: `desc`)
- `date_from` (optional): Filter by created date from (ISO 8601)
- `date_to` (optional): Filter by created date to (ISO 8601)
Example:

```bash
curl "http://localhost:8080/api/v1/projects/search?q=awesome&fuzzy=true&sort_by=created_at&sort_order=desc"
```

```
GET /api/v1/projects/by-user/:userId?limit=20&offset=0
```

Example:

```bash
curl http://localhost:8080/api/v1/projects/by-user/1
```

```
# Single or multiple hashtags (query parameter - comma-separated)
GET /api/v1/projects/by-hashtag?hashtags=hashtag1,hashtag2,hashtag3&match=any&limit=20&offset=0

# Multiple hashtags (query parameter - multiple values)
GET /api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&hashtags=nodejs&match=all&limit=20&offset=0
```

Query Parameters:
- `hashtags` (required): Comma-separated hashtag names or multiple query parameters
- `match` (optional): Match mode, `"any"` (OR logic, default) or `"all"` (AND logic)
- `limit` (optional): Results per page (default: 100, max: 100)
- `offset` (optional): Offset for pagination (default: 0)
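Accepting both the comma-separated and repeated-parameter forms comes down to normalizing the raw query values. A sketch (`parseHashtags` is a hypothetical helper, not the project's actual function):

```go
package main

import (
	"fmt"
	"strings"
)

// parseHashtags accepts both ?hashtags=a,b and ?hashtags=a&hashtags=b
// by splitting every raw value on commas and dropping empty entries.
func parseHashtags(values []string) []string {
	var out []string
	for _, v := range values {
		for _, part := range strings.Split(v, ",") {
			if p := strings.TrimSpace(part); p != "" {
				out = append(out, p)
			}
		}
	}
	return out
}

func main() {
	// values as returned by e.g. url.Values["hashtags"]
	fmt.Println(parseHashtags([]string{"golang,rust", "nodejs"})) // prints [golang rust nodejs]
}
```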
Examples:

```bash
# Single hashtag
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang"

# Multiple hashtags - ANY match (OR logic, default)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs"
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=any"

# Multiple hashtags - ALL match (AND logic)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=all"

# Multiple hashtags (multiple query params) with ALL match
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&match=all"

# With pagination
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust&match=any&limit=10&offset=0"
```

Match Modes:

- `match=any` (default): Returns projects that have any of the specified hashtags (OR logic)
- `match=all`: Returns projects that have all of the specified hashtags (AND logic)
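The two match modes map naturally onto an OpenSearch `bool` query over the nested hashtag field. A sketch, assuming a nested `hashtags` path with a `hashtags.name` keyword sub-field (the field names are assumptions about the mapping, not confirmed by this README):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hashtagQuery builds one nested term clause per tag, then combines
// them: match=all puts every clause in "filter" (AND), while match=any
// uses "should" with minimum_should_match=1 (OR).
func hashtagQuery(tags []string, matchAll bool) map[string]any {
	nested := func(tag string) map[string]any {
		return map[string]any{
			"nested": map[string]any{
				"path":  "hashtags",
				"query": map[string]any{"term": map[string]any{"hashtags.name": tag}},
			},
		}
	}
	clauses := make([]map[string]any, 0, len(tags))
	for _, t := range tags {
		clauses = append(clauses, nested(t))
	}
	if matchAll {
		return map[string]any{"bool": map[string]any{"filter": clauses}}
	}
	return map[string]any{"bool": map[string]any{
		"should":               clauses,
		"minimum_should_match": 1,
	}}
}

func main() {
	body, _ := json.Marshal(hashtagQuery([]string{"golang", "rust"}, true))
	fmt.Println(string(body))
}
```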
```
GET /health
```

Configuration is managed via environment variables:
```bash
# Database
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=projectsdb

# Kafka
KAFKA_BROKER=localhost:9092

# OpenSearch
OPENSEARCH_URL=http://localhost:9200
OPENSEARCH_INDEX=projects

# API Server
PORT=8080

# Worker Settings
BATCH_SIZE=50
BATCH_INTERVAL=2s
```

This project includes API integration testing and end-to-end testing of the CDC flow.
Run E2E tests:

```bash
cd e2e && ./run-tests.sh
```

Run API tests:

```bash
go test -v ./tests/
```

- API Tests: Various search and filter endpoints tested via a dedicated test index
- E2E Tests: Complete CDC flow with 15 comprehensive scenarios including:
  - Create/Update/Delete operations
  - Junction table operations
  - Related table updates triggering reindexing
  - API search functionality
  - Cascade deletes
  - Bulk operations
```sql
INSERT INTO projects (name, slug, description)
VALUES ('Test Project', 'test-project', 'A test project');
```

```bash
curl "http://localhost:8080/api/v1/projects/search?q=test-project"
```

```sql
UPDATE projects SET description = 'Updated description'
WHERE slug = 'test-project';
```

```sql
INSERT INTO user_projects (user_id, project_id)
SELECT 1, id FROM projects WHERE slug = 'test-project';
```

```bash
curl http://localhost:8080/api/v1/projects/by-user/1
```

```sql
DELETE FROM projects WHERE slug = 'test-project';
```

- Why: Reduces OpenSearch load and improves throughput
- How: Accumulates project IDs for 2 seconds or until batch size (50) is reached
- Benefit: Can handle bursts of updates efficiently
- Why: Multiple related tables (users, hashtags) can trigger updates for the same project
- How: Uses a map to track pending project IDs
- Benefit: Prevents redundant reindexing of the same project
- Why: Ensures deleted projects don't appear in search results
- How: Bypasses batching for delete operations
- Benefit: Consistent user experience
- Why: Handles edge cases where CDC might miss events
- How: Daily full reindex with zero-downtime alias switching
- Benefit: Guarantees eventual consistency
- Why: Separates data access logic from business logic
- How: Interfaces define contracts, implementations are swappable
- Benefit: Testable, maintainable, follows SOLID principles
- Sync latency: < 2 seconds (configurable)
- Batch size: 50 projects (configurable)
- Reconciliation time: ~2-5 minutes for 10K projects
- Search latency: < 50ms for most queries
- Throughput: ~1000 updates/second with default batching
```bash
# API Server
go run cmd/server/main.go

# Sync Worker
go run cmd/workers/sync-worker/main.go

# Reconciliation Worker
go run cmd/workers/reconciliation-worker/main.go
```

```bash
docker build -f Dockerfile.server -t cdc-api-server .
docker build -f Dockerfile.sync-worker -t cdc-sync-worker .
docker build -f Dockerfile.reconciliation-worker -t cdc-reconciliation-worker .
```

```bash
# Print the index mapping as JSON
go run cmd/tools/mapping-json/main.go | jq .
```

```bash
cd terraform
terraform init
terraform plan -out=tfplan
terraform apply tfplan
```