diivi/postgres-cdc-os

Postgres CDC to OpenSearch Sync

A production-ready Change Data Capture (CDC) system that synchronizes PostgreSQL data to OpenSearch in real time using Debezium and Kafka, with full reconciliation capabilities.

🏗️ Architecture

┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│              │      │              │      │              │
│  PostgreSQL  │─────▶│   Debezium   │─────▶│    Kafka     │
│              │ CDC  │  Connector   │      │              │
└──────────────┘      └──────────────┘      └──────────────┘
                                                     │
                                                     │ Events
                                                     ▼
                      ┌─────────────────────────────────────┐
                      │        Sync Worker                  │
                      │  - Consumes CDC events              │
                      │  - Batches updates (50/2s)          │
                      │  - Deduplicates project IDs         │
                      └─────────────────────────────────────┘
                                     │
                                     ▼
                      ┌─────────────────────────────────────┐
                      │        OpenSearch                   │
                      │  - Real-time search index           │
                      │  - Nested user/hashtag queries      │
                      │  - Full-text search with fuzzy      │
                      └─────────────────────────────────────┘
                                     ▲
                                     │
                      ┌──────────────┴───────────────┐
                      │                              │
           ┌──────────▼──────────┐    ┌─────────────▼────────────┐
           │    API Server       │    │  Reconciliation Worker   │
           │  - Search projects  │    │  - Daily full reindex    │
           │  - Filter by user   │    │  - Zero-downtime         │
           │  - Filter by tag    │    │  - Index retention       │
           └─────────────────────┘    └──────────────────────────┘

✨ Features

Real-time Synchronization

  • CDC-based updates: Captures all database changes via Debezium
  • Smart batching: Configurable batch size (default 50) and interval (default 2s)
  • Deduplication: Automatically deduplicates multiple updates to the same project
  • Immediate deletes: Delete operations bypass batching for instant removal
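
The exact message shape depends on the Debezium connector configuration, but a minimal sketch of routing a standard Debezium JSON envelope (op codes `c`=create, `u`=update, `d`=delete, `r`=snapshot read) could look like the following. The `row` fields and the `routeEvent` helper are illustrative, not the project's actual types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// row mirrors the columns we care about; field names are illustrative.
type row struct {
	ID   int64  `json:"id"`
	Slug string `json:"slug"`
}

// envelope is the Debezium change-event payload. For deletes the new state
// is null, so the affected ID must be read from "before" instead of "after".
type envelope struct {
	Payload struct {
		Op     string `json:"op"`
		Before *row   `json:"before"`
		After  *row   `json:"after"`
	} `json:"payload"`
}

// routeEvent returns the affected project ID and whether it must be removed
// immediately (deletes bypass batching, per the design above).
func routeEvent(msg []byte) (id int64, isDelete bool, err error) {
	var e envelope
	if err = json.Unmarshal(msg, &e); err != nil {
		return 0, false, err
	}
	switch {
	case e.Payload.Op == "d" && e.Payload.Before != nil:
		return e.Payload.Before.ID, true, nil
	case e.Payload.After != nil:
		return e.Payload.After.ID, false, nil
	}
	return 0, false, fmt.Errorf("tombstone or empty event")
}

func main() {
	id, del, _ := routeEvent([]byte(`{"payload":{"op":"d","before":{"id":7,"slug":"old"}}}`))
	fmt.Println(id, del) // 7 true
}
```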

Search Capabilities

  • Full-text search with fuzzy matching
  • Multi-field queries (slug, name, description)
  • Nested queries for users and hashtags
  • Pagination with configurable limits
  • Sorting by date, score, name, or slug
  • Date range filtering
  • Custom field selection

Reconciliation

  • Daily full reindex (default: 3 AM)
  • Zero-downtime switching via atomic alias updates
  • Index versioning with dated indices (e.g., projects-2026-01-24)
  • Automatic cleanup of old indices (7-day retention)
  • Retry logic with exponential backoff

📦 Project Structure

.
├── cmd/
│   ├── server/                    # API server
│   │   └── main.go
│   ├── workers/
│   │   ├── sync-worker/          # CDC sync worker
│   │   │   └── main.go
│   │   └── reconciliation-worker/ # Daily reconciliation
│   │       └── main.go
│   └── tools/
│       └── mapping-json/         # Utility to print mapping
│           └── main.go
├── internal/
│   ├── config/                   # Configuration management
│   ├── database/                 # PostgreSQL client
│   ├── dto/                      # Request/Response DTOs
│   ├── errors/                   # Custom error types
│   ├── handlers/                 # HTTP handlers
│   ├── kafka/                    # Kafka consumer
│   ├── models/                   # Domain models
│   ├── opensearch/               # OpenSearch client
│   ├── processor/                # CDC message processor
│   ├── repository/               # Repository interfaces & implementations
│   └── service/                  # Business logic layer
├── pkg/
│   └── constants/                # Application constants
├── postgres/
│   ├── migrations/               # Database schema
│   └── seeds/                    # Seed data
├── opensearch/
│   └── init-index.sh            # Index initialization
├── scripts/
│   ├── setup.sh                 # Main setup script
│   ├── setup-debezium.sh        # Debezium connector setup
│   └── bulk-load.sh             # Bulk load utility
├── docker-compose.yml           # All services
└── README.md

🚀 Quick Start

Prerequisites

  • Docker & Docker Compose
  • Go 1.24+ (for local development)

Setup

  1. Clone the repository
git clone <repo-url>
cd postgres-cdc-os
  2. Make scripts executable
chmod +x ./scripts/*.sh
chmod +x ./opensearch/init-index.sh
  3. Start all services
./scripts/setup.sh

This will:

  • Start PostgreSQL, Kafka, Debezium, and OpenSearch
  • Run database migrations
  • Seed initial data
  • Create OpenSearch index
  • Configure Debezium connector
  • Start sync worker, reconciliation worker, and API server
  4. Verify the setup
# Health check
curl http://localhost:8080/health

# Search projects
curl "http://localhost:8080/api/v1/projects/search?q=project"

🔌 API Endpoints

Search Projects

GET /api/v1/projects/search?q=<query>&fuzzy=true&limit=20&offset=0&sort_by=created_at&sort_order=desc

Query Parameters:

  • q (required): Search query
  • fuzzy (optional): Enable fuzzy matching (default: true)
  • operator (optional): and or or (default: or)
  • fields (optional): Fields to search (default: slug, name, description)
  • limit (optional): Results per page (default: 20, max: 100)
  • offset (optional): Pagination offset (max: 10000)
  • sort_by (optional): Sort field (created_at, updated_at, name, slug, _score)
  • sort_order (optional): asc or desc (default: desc)
  • date_from (optional): Filter by created date from (ISO 8601)
  • date_to (optional): Filter by created date to (ISO 8601)

Example:

curl "http://localhost:8080/api/v1/projects/search?q=awesome&fuzzy=true&sort_by=created_at&sort_order=desc"
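
On the OpenSearch side, these parameters translate roughly into a fuzzy `multi_match` query. The sketch below is an assumption about how such a body could be built, using the documented default fields and `AUTO` fuzziness; the service's actual query may differ:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildSearchQuery sketches an OpenSearch request body for a fuzzy
// multi-field search over the documented default fields.
func buildSearchQuery(q string, fuzzy bool, fields []string) map[string]any {
	mm := map[string]any{
		"query":  q,
		"fields": fields,
	}
	if fuzzy {
		// AUTO scales the allowed edit distance with term length.
		mm["fuzziness"] = "AUTO"
	}
	return map[string]any{"query": map[string]any{"multi_match": mm}}
}

func main() {
	body, _ := json.MarshalIndent(
		buildSearchQuery("awesome", true, []string{"slug", "name", "description"}), "", "  ")
	fmt.Println(string(body))
}
```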

Get Projects by User

GET /api/v1/projects/by-user/:userId?limit=20&offset=0

Example:

curl http://localhost:8080/api/v1/projects/by-user/1

Get Projects by Hashtag(s)

# Single or multiple hashtags (query parameter - comma-separated)
GET /api/v1/projects/by-hashtag?hashtags=hashtag1,hashtag2,hashtag3&match=any&limit=20&offset=0

# Multiple hashtags (query parameter - multiple values)
GET /api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&hashtags=nodejs&match=all&limit=20&offset=0

Query Parameters:

  • hashtags (required): Comma-separated hashtag names or multiple query parameters
  • match (optional): Match mode - "any" (OR logic, default) or "all" (AND logic)
  • limit (optional): Results per page (default: 100, max: 100)
  • offset (optional): Offset for pagination (default: 0)

Examples:

# Single hashtag
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang"

# Multiple hashtags - ANY match (OR logic, default)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs"
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=any"

# Multiple hashtags - ALL match (AND logic)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=all"

# Multiple hashtags (multiple query params) with ALL match
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&match=all"

# With pagination
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust&match=any&limit=10&offset=0"

Match Modes:

  • match=any (default): Returns projects that have any of the specified hashtags (OR logic)
  • match=all: Returns projects that have all of the specified hashtags (AND logic)
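
Hashtags live in a nested field, so each tag becomes its own `nested` clause; `match=any` ORs the clauses via `should`, while `match=all` ANDs them via `must`. The path and field names (`hashtags`, `hashtags.name`) below are assumptions about the index mapping, not confirmed by the source:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildHashtagQuery sketches a bool query with one nested clause per tag.
func buildHashtagQuery(tags []string, match string) map[string]any {
	var clauses []any
	for _, t := range tags {
		clauses = append(clauses, map[string]any{
			"nested": map[string]any{
				"path":  "hashtags",
				"query": map[string]any{"term": map[string]any{"hashtags.name": t}},
			},
		})
	}
	b := map[string]any{}
	if match == "all" {
		b["must"] = clauses // every tag must match (AND)
	} else {
		b["should"] = clauses // any tag may match (OR)
		b["minimum_should_match"] = 1
	}
	return map[string]any{"query": map[string]any{"bool": b}}
}

func main() {
	body, _ := json.Marshal(buildHashtagQuery([]string{"golang", "rust"}, "all"))
	fmt.Println(string(body))
}
```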

Health Check

GET /health

⚙️ Configuration

Configuration is managed via environment variables:

# Database
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=projectsdb

# Kafka
KAFKA_BROKER=localhost:9092

# OpenSearch
OPENSEARCH_URL=http://localhost:9200
OPENSEARCH_INDEX=projects

# API Server
PORT=8080

# Worker Settings
BATCH_SIZE=50
BATCH_INTERVAL=2s
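
A loader for these variables can fall back to the documented defaults when a variable is unset. This is a sketch of the pattern, not the project's actual `internal/config` code; the remaining variables follow the same shape:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// Config holds a subset of the settings listed above.
type Config struct {
	PostgresHost  string
	KafkaBroker   string
	OpenSearchURL string
	BatchSize     int
	BatchInterval time.Duration
}

// getenv returns the environment value or the documented default.
func getenv(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

func Load() Config {
	size, err := strconv.Atoi(getenv("BATCH_SIZE", "50"))
	if err != nil {
		size = 50
	}
	interval, err := time.ParseDuration(getenv("BATCH_INTERVAL", "2s"))
	if err != nil {
		interval = 2 * time.Second
	}
	return Config{
		PostgresHost:  getenv("POSTGRES_HOST", "localhost"),
		KafkaBroker:   getenv("KAFKA_BROKER", "localhost:9092"),
		OpenSearchURL: getenv("OPENSEARCH_URL", "http://localhost:9200"),
		BatchSize:     size,
		BatchInterval: interval,
	}
}

func main() {
	fmt.Printf("%+v\n", Load())
}
```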

🧪 Testing

This project includes API integration tests and end-to-end (E2E) tests of the full CDC flow.

Quick Start

Run E2E tests:

cd e2e && ./run-tests.sh

Run API tests:

go test -v ./tests/

Test Coverage

  • API Tests: Various search and filter endpoints tested via a dedicated test index
  • E2E Tests: Complete CDC flow with 15 comprehensive scenarios including:
    • Create/Update/Delete operations
    • Junction table operations
    • Related table updates triggering reindexing
    • API search functionality
    • Cascade deletes
    • Bulk operations

Manual Testing

1. Insert a new project

INSERT INTO projects (name, slug, description) 
VALUES ('Test Project', 'test-project', 'A test project');

2. Search for it (should appear within 2 seconds)

curl "http://localhost:8080/api/v1/projects/search?q=test-project"

3. Update the project

UPDATE projects SET description = 'Updated description' 
WHERE slug = 'test-project';

4. Add users to the project

INSERT INTO user_projects (user_id, project_id)
SELECT 1, id FROM projects WHERE slug = 'test-project';

5. Search by user

curl http://localhost:8080/api/v1/projects/by-user/1

6. Delete the project

DELETE FROM projects WHERE slug = 'test-project';

🏗️ Design Decisions

Batching Strategy

  • Why: Reduces OpenSearch load and improves throughput
  • How: Accumulates project IDs for 2 seconds or until batch size (50) is reached
  • Benefit: Can handle bursts of updates efficiently

Deduplication

  • Why: Multiple related tables (users, hashtags) can trigger updates for the same project
  • How: Uses a map to track pending project IDs
  • Benefit: Prevents redundant reindexing of the same project
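
Using a map keyed by project ID gives both behaviors at once: repeated events for the same project collapse into one pending entry, and a flush fires when either the size threshold or the interval is hit. A minimal sketch, assuming the 50/2s defaults (the `batcher` type is illustrative, not the project's actual worker):

```go
package main

import (
	"fmt"
	"time"
)

// batcher accumulates project IDs and flushes when batchSize distinct IDs
// are pending; a ticker in the caller handles the interval-based flush.
// The map keys give deduplication for free.
type batcher struct {
	pending   map[int64]struct{}
	batchSize int
	flush     func(ids []int64)
}

func (b *batcher) add(id int64) {
	b.pending[id] = struct{}{} // duplicate IDs collapse here
	if len(b.pending) >= b.batchSize {
		b.drain()
	}
}

func (b *batcher) drain() {
	if len(b.pending) == 0 {
		return
	}
	ids := make([]int64, 0, len(b.pending))
	for id := range b.pending {
		ids = append(ids, id)
	}
	b.pending = make(map[int64]struct{})
	b.flush(ids)
}

func main() {
	b := &batcher{
		pending:   make(map[int64]struct{}),
		batchSize: 50,
		flush:     func(ids []int64) { fmt.Println("reindexing", len(ids), "projects") },
	}
	// Three events for two projects -> one flush of two IDs on the tick.
	b.add(1)
	b.add(1)
	b.add(2)
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	<-ticker.C
	b.drain()
}
```

In the real worker the ticker and the consumer loop would share the batcher behind a mutex or a channel; that synchronization is omitted here for brevity.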

Immediate Deletes

  • Why: Ensures deleted projects don't appear in search results
  • How: Bypasses batching for delete operations
  • Benefit: Consistent user experience

Reconciliation Worker

  • Why: Handles edge cases where CDC might miss events
  • How: Daily full reindex with zero-downtime alias switching
  • Benefit: Guarantees eventual consistency

Repository Pattern

  • Why: Separates data access logic from business logic
  • How: Interfaces define contracts, implementations are swappable
  • Benefit: Testable, maintainable, follows SOLID principles
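
The payoff of the pattern is that business logic depends only on an interface, so it can be unit-tested against an in-memory double instead of a running OpenSearch cluster. The interface and method names below are illustrative, not the project's actual contracts:

```go
package main

import (
	"context"
	"fmt"
)

// ProjectRepository is an illustrative contract in the spirit of the
// repository pattern described above.
type ProjectRepository interface {
	Search(ctx context.Context, query string) ([]string, error)
}

// inMemoryRepo is a swappable test double; the real implementation
// would wrap the OpenSearch client.
type inMemoryRepo struct{ slugs []string }

func (r *inMemoryRepo) Search(_ context.Context, query string) ([]string, error) {
	var out []string
	for _, s := range r.slugs {
		if s == query {
			out = append(out, s)
		}
	}
	return out, nil
}

// searchService stands in for the business-logic layer: it sees only the
// interface, never the concrete store.
func searchService(repo ProjectRepository, q string) (int, error) {
	hits, err := repo.Search(context.Background(), q)
	return len(hits), err
}

func main() {
	n, _ := searchService(&inMemoryRepo{slugs: []string{"test-project"}}, "test-project")
	fmt.Println(n) // 1
}
```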

📊 Performance Characteristics

  • Sync latency: < 2 seconds (configurable)
  • Batch size: 50 projects (configurable)
  • Reconciliation time: ~2-5 minutes for 10K projects
  • Search latency: < 50ms for most queries
  • Throughput: ~1000 updates/second with default batching

📝 Development

Run locally

# API Server
go run cmd/server/main.go

# Sync Worker
go run cmd/workers/sync-worker/main.go

# Reconciliation Worker
go run cmd/workers/reconciliation-worker/main.go

Build Docker images

docker build -f Dockerfile.server -t cdc-api-server .
docker build -f Dockerfile.sync-worker -t cdc-sync-worker .
docker build -f Dockerfile.reconciliation-worker -t cdc-reconciliation-worker .

View OpenSearch mapping

go run cmd/tools/mapping-json/main.go | jq .

Provision infrastructure with Terraform

cd terraform
terraform init
terraform plan -out=tfplan
terraform apply tfplan
