feature(internal, cmd): protocol-migrate history #546
Open
aristidesstaffieri wants to merge 27 commits into feature/live-ingest-state-production from
Conversation
Pull request overview
Adds a new protocol history backfill workflow to support protocol state production over the retention window, with a CLI entrypoint and shared ingest helpers to coordinate handoff with live ingestion via CAS cursors.
Changes:
- Introduces `ProtocolMigrateHistoryService` (plus extensive unit tests) to backfill per-protocol history using CAS progress tracking and clean handoff semantics.
- Adds a `protocol-migrate history` CLI command and integration tests covering migration→live handoff and cursor edge cases.
- Extracts shared ingest helpers (ledger fetch retry, processor map building, cursor name helpers) and adds `UpdateHistoryMigrationStatus` to the protocols model.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| internal/services/protocol_migrate_history.go | Implements the history migration service with CAS-based cursor advancement and convergence polling. |
| internal/services/protocol_migrate_history_test.go | Adds comprehensive unit tests for migration success/failure, resume, handoff, and convergence polling. |
| internal/services/ingest_helpers.go | Adds shared helpers: getLedgerWithRetry, buildProtocolProcessorMap, and protocol cursor name formatters. |
| internal/services/ingest.go | Reuses buildProtocolProcessorMap; removes the duplicated getLedgerWithRetry method. |
| internal/services/ingest_live.go | Switches to shared cursor name helpers and shared getLedgerWithRetry. |
| internal/services/ingest_backfill.go | Switches to shared getLedgerWithRetry. |
| internal/services/ingest_test.go | Updates tests to call shared getLedgerWithRetry directly. |
| internal/integrationtests/data_migration_test.go | Adds integration coverage for history migration→live ingestion handoff and cursor-asymmetry scenarios. |
| internal/data/protocols.go | Adds UpdateHistoryMigrationStatus to support migration lifecycle tracking. |
| internal/data/protocols_test.go | Adds a test for UpdateHistoryMigrationStatus. |
| internal/data/mocks.go | Updates ProtocolsModelMock to include UpdateHistoryMigrationStatus. |
| internal/data/ingest_store.go | Adds exported constants for ingest cursor key names. |
| cmd/root.go | Registers the new protocol-migrate command. |
| cmd/protocol_data_migrate.go | Implements the protocol-migrate history CLI subcommand and wires it to the service. |
…_status to differentiate between not started and in progress migrations
…s tracking
- Add known_wasms table (migration, model, mock, and data layer tests) for tracking WASM hashes during checkpoint population
- Add KnownWasm field to Models struct
- Create WasmIngestionService (wasm_ingestion.go) that runs protocol validators against WASM bytecode and batch-persists hashes to known_wasms
- Create CheckpointService (checkpoint.go) that orchestrates single-pass checkpoint population, delegating ContractCode entries to both WasmIngestionService and TokenProcessor, and all other entries to TokenProcessor
- Extract readerFactory on checkpointService for injectable checkpoint reader creation
- Extract TokenProcessor interface and NewTokenProcessor from TokenIngestionService, moving checkpoint iteration logic out of token_ingestion.go into checkpoint.go
- Remove db, archive, and PopulateAccountTokens from TokenIngestionService interface and struct
- Remove dbPool parameter from NewTokenIngestionServiceForLoadtest
- Wire CheckpointService into IngestServiceConfig and ingestService
- Update ingest_live.go to call checkpointService.PopulateFromCheckpoint instead of tokenIngestionService.PopulateAccountTokens
- Update ingest.go setupDeps to construct WasmIngestionService and CheckpointService
- Add ContractValidatorMock, ProtocolValidatorMock, ChangeReaderMock, CheckpointServiceMock, WasmIngestionServiceMock, TokenProcessorMock, and TokenIngestionServiceMock updates to mocks.go
- Add unit tests for WasmIngestionService (10 cases covering ProcessContractCode and PersistKnownWasms)
- Add unit tests for CheckpointService (16 cases covering entry routing, error propagation, and context cancellation)
…IngestionService (#524)
* Initial plan
* Remove validator execution from WasmIngestionService
* services/wasm_ingestion: remove ProtocolValidator execution from WasmIngestionService
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>
…IngestionService to use config struct WasmIngestionService.ProcessContractCode no longer receives the full bytecode—it only needs the hash to track protocol WASMs. This reduces memory pressure during checkpoint population. TokenIngestionService construction is consolidated into a single NewTokenIngestionService(config) constructor, eliminating the separate NewTokenIngestionServiceForLoadtest variant. The loadtest runner now uses the same constructor with only the fields it needs. Also refactors processContractInstanceChange to return a contractInstanceResult struct instead of multiple return values, extracts newCheckpointData() helper, uses idiomatic nil slices instead of make([]T, 0), and introduces a checkpointTestFixture struct to reduce boilerplate in checkpoint tests. Constructors return concrete types instead of interfaces to allow direct field access in tests.
Persist contract-to-WASM-hash mappings by extending WasmIngestionService with ProcessContractData and PersistProtocolContracts methods. During checkpoint population, ContractData Instance entries are parsed to extract the wasm_hash and contract_id relationship, which is stored in a new protocol_contracts table (FK to protocol_wasms). This mapping will be used by protocol-setup and live ingestion to classify contracts by protocol.
…and backfill Add two new LedgerChangeProcessors (ProtocolWasmProcessor, ProtocolContractProcessor) that extract WASM hashes and contract-to-WASM mappings from ledger changes during live ingestion, catchup, and historical backfill. Previously this data was only populated during checkpoint. - ProtocolWasmProcessor extracts hashes from ContractCode entries - ProtocolContractProcessor extracts contract-to-WASM mappings from ContractData Instance entries - Extended IndexerBuffer with protocolWasmsByHash/protocolContractsByID maps (Push/Get/Merge/Clear) - PersistLedgerData inserts wasms before contracts (FK ordering) with ON CONFLICT DO NOTHING - BatchChanges and processBatchChanges extended for backfill paths
ContractData Instance entries can outlive their referenced ContractCode entries due to independent TTLs, causing FK violations when inserting protocol_contracts during checkpoint population.
- Skip contracts referencing unknown WASM hashes in PersistProtocolContracts
- Add WHERE EXISTS guard in BatchInsert SQL for the live/backfill path
- Add a test for the contracts_with_missing_wasm_skipped scenario
Store wasm_hash and contract_id as raw bytes instead of hex/strkey-encoded strings. Both values originate as [32]byte arrays in XDR, so BYTEA reduces storage by ~50%, improves index performance on fixed-size keys, and removes unnecessary encoding/decoding at the persistence boundary.
The protocol_id on protocol_contracts was always NULL and never queried. It's derivable via the existing FK join: protocol_contracts.wasm_hash → protocol_wasms.wasm_hash → protocol_wasms.protocol_id.
Replace raw []byte with types.HashBytea for WasmHash and ContractID fields in the ProtocolWasm and ProtocolContract models. HashBytea implements sql.Scanner and driver.Valuer to auto-convert between raw bytes (DB) and hex strings (Go), consistent with how Transaction.Hash is handled.
Updated files:
- internal/data/protocol_wasms.go, protocol_contracts.go (models + BatchInsert)
- internal/indexer/processors/protocol_wasms.go, protocol_contracts.go
- internal/services/wasm_ingestion.go
- All corresponding test files
…kpointService
WasmIngestionService was only used by CheckpointService, and
TokenIngestionService's NewTokenProcessor/TokenProcessor interface was
only used by CheckpointService. This inlines all checkpoint-specific
logic directly into CheckpointService, eliminating unnecessary
intermediate service abstractions.
- Rewrite checkpoint.go to absorb all checkpoint logic: checkpointData,
batch, trustline/contract/WASM processing, and protocol persistence
- Replace positional NewCheckpointService args with CheckpointServiceConfig
- Strip token_ingestion.go to live-only (ProcessTokenChanges); remove
TokenProcessor interface, NewTokenProcessor, and checkpoint-only fields
from TokenIngestionServiceConfig
- Delete wasm_ingestion.go (absorbed into checkpoint.go)
- Remove WasmIngestionServiceMock, TokenProcessorMock from mocks.go
- Update ingest.go wiring and simplify TokenIngestionServiceConfig
- Rewrite checkpoint_test.go with data model mocks; port WASM and
checkpoint processor tests from deleted test files
- Add TrustlineAssetModelMock to data/mocks.go
- Add valid AccountId to makeAccountChange() helper to prevent a nil pointer dereference
- Add missing protocolWasmModel.BatchInsert mock expectation in the ContractCodeEntry test
- Fix the ContextCancellation test to cancel the context during reader.Read() instead of before PopulateFromCheckpoint, matching the expected error path
… names
This aligns Protocol→Protocols and ProtocolWasm→ProtocolWasms (structs, interfaces, mocks, and Models struct fields) to match the protocols and protocol_wasms table names, consistent with the existing ProtocolContracts convention.
Introduces the infrastructure for protocol processors to produce and
persist protocol-specific state during live ledger ingestion, gated by
per-protocol compare-and-swap cursors that coordinate with concurrent
migration processes.
Key changes:
- ProtocolProcessor interface and ProtocolProcessorInput for protocol-
specific ledger analysis and state persistence
- Processor registry (RegisterProcessor/GetAllProcessors) for protocol
processor discovery at startup
- Dual CAS gating in PersistLedgerData (step 5.5): per-protocol history
and current_state cursors ensure exactly-once writes even when live
ingestion and migration run concurrently
- Protocol contract cache with periodic refresh to avoid per-ledger DB
queries for classified contracts
- Data layer additions: IngestStoreModel.GetTx, CompareAndSwap,
ProtocolContractsModel.GetByProtocolID, ProtocolsModel.GetClassified
Tests:
- Unit tests for processor registry (concurrent safety, overwrite, etc.)
- 5 subtests for PersistLedgerData CAS gating (win, lose, behind, no
cursor, no processors) using a real test DB and sentinel-writing
testProtocolProcessor
- Docker integration test (ProtocolStateProductionTestSuite) exercising
CAS gating against a live ingest container's DB in three phases
… tests
Introduce ProtocolMigrateHistoryService, which backfills protocol state
changes for historical ledgers, walking forward from the oldest ingest
cursor to the latest cursor and calling PersistHistory at each ledger.
The service tracks progress via a per-protocol history_cursor using CAS,
refreshes the protocol contract cache periodically, and marks
history_migration_status on completion.
Supporting changes:
- Add `protocol-data-migrate` CLI command (cmd/protocol_data_migrate.go)
- Add UpdateHistoryMigrationStatus to ProtocolsModel and its mock/tests
- Add per-call tracking (persistedHistorySeqs, persistedCurrentStateSeqs)
to integrationTestProcessor for verifying persistence call counts
Integration test additions:
- Enhance TestHistoryMigrationThenLiveIngestionHandoff with per-ledger
PersistHistory verification across migration and live handoff phases
- Add TestLiveIngestionHistoryCursorReadyCurrentStateLags proving the
asymmetric cursor CAS path: when history_cursor is ready but
current_state_cursor lags, only PersistHistory executes while
PersistCurrentState is correctly skipped
Move duplicated logic into ingest_helpers.go:
- getLedgerWithRetry: was identical method on both ingestService and
protocolMigrateHistoryService, now a package-level function
- buildProtocolProcessorMap: deduplicates processor slice-to-map
conversion with nil/duplicate validation
- protocolHistoryCursorName/protocolCurrentStateCursorName: replaces
scattered Sprintf calls for cursor key formatting
Simplifies getLedgerWithRetry test to call the function directly
without constructing a full ingestService.
The convergence poll in processAllProtocols treated any error from PrepareRange/GetLedger as convergence, including transient RPC failures like connection refused. This could prematurely mark protocols as StatusSuccess during network blips. Now discriminates three cases: poll deadline exceeded (converged), parent context cancelled (propagate), anything else (transient — retry).
The history migration service read cursor positions using hardcoded constants (data.OldestLedgerCursorName, data.LatestLedgerCursorName), ignoring operator overrides via CLI flags. Add configurable cursor name fields with defaults matching the ingest command, so operators who override --latest-ledger-cursor-name or --oldest-ledger-cursor-name get consistent behavior across live ingestion and history migration.
…dger backend
The outer loop in protocol history migration transitions the same LedgerBackend instance between BoundedRange and UnboundedRange without an explicit reset. This works because captive core internally closes the subprocess before opening a new range, but that behavior is an implementation detail not guaranteed by the LedgerBackend interface. Add an explanatory comment at the transition point and a new integration test (rangeTrackingBackend) that verifies the Bounded→Unbounded→Bounded PrepareRange sequence when the tip advances during the convergence poll.
When processAllProtocols fails, the Run() method was marking all active protocols as StatusFailed, including ones already handed off to live ingestion via CAS failure. This caused handed-off protocols to be re-processed on the next Run(), conflicting with live ingestion's cursor ownership. Change processAllProtocols to return handed-off protocol IDs alongside the error, then split the status update: handed-off protocols get StatusSuccess (live ingestion owns them), while only non-handed-off protocols get StatusFailed.
If the caller passes duplicate protocol IDs (e.g. --protocol-id foo --protocol-id foo), duplicate trackers would be created for the same protocol, causing self-induced CAS failures and incorrect handoff detection. Add order-preserving deduplication as the first operation in validate(), which is the single choke-point for both Run() and processAllProtocols().
Move reusable logic into internal/utils/ as generic functions (RetryWithBackoff[T], BuildMap[T]) and move cursor name helpers to ingestion_utils.go. Inline all call sites in services to use utils directly and delete the ingest helpers file entirely. Also fix variable shadow lint errors in ingest_live.go and protocol_migrate_history.go.
Closes #515
What
Implements the `protocol-migrate history` CLI subcommand, which backfills protocol history for historical ledgers within the retention window, synchronizing with live ingestion through a shared per-protocol history CAS cursor.
- Calls `PersistHistory` at each ledger with batch processing and CAS-based progress tracking. On CAS failure, detects handoff from live ingestion and exits cleanly. Tracks the `history_migration_status` lifecycle (in_progress → success/failed).
- Adds the `protocol-data-migrate` CLI command (cmd/protocol_data_migrate.go) — accepts `--protocol-id` flags to select which protocols to backfill.
- `getLedgerWithRetry`, `buildProtocolProcessorMap`, and cursor name formatters deduplicate logic between history migration and live ingestion.
- `UpdateHistoryMigrationStatus` added to `ProtocolsModel` for migration lifecycle tracking.
- Respects `--latest-ledger-cursor-name`/`--oldest-ledger-cursor-name` overrides, matching live ingestion behavior.
Why
Protocol state production needs historical data within the retention window before live ingestion can provide complete coverage. Without this command, protocols only have history state from the point live ingestion started using their processors — all prior ledgers are missing. This subcommand lets operators backfill per-protocol, with safe handoff to live
ingestion via CAS cursors ensuring no gaps or double-processing at the migration boundary.
Known limitations
N/A
Issue that this PR addresses
#515