diff --git a/Cargo.lock b/Cargo.lock index adac807ce..64166a4e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28332,6 +28332,38 @@ dependencies = [ "tracing", ] +[[package]] +name = "tesseract-relayer" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "clap", + "futures", + "ismp", + "log", + "parity-scale-codec", + "polkadot-sdk", + "primitive-types 0.13.1", + "proof-indexer", + "serde", + "sp-consensus-beefy", + "subxt 0.42.1", + "subxt-utils", + "tesseract-beefy", + "tesseract-config", + "tesseract-consensus", + "tesseract-evm", + "tesseract-messaging", + "tesseract-primitives", + "tesseract-substrate", + "tokio", + "toml 0.7.8", + "tracing", + "tracing-subscriber 0.3.22", + "transaction-fees", +] + [[package]] name = "tesseract-substrate" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0f7eb11b3..aa992dea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ members = [ "tesseract/consensus/tendermint", "tesseract/consensus/proof-indexer", + "tesseract/relayer", # Airdrop "modules/pallets/bridge-drop", diff --git a/tesseract/consensus/relayer/src/any.rs b/tesseract/consensus/relayer/src/any.rs index 55eaef3c2..d7e8961fc 100644 --- a/tesseract/consensus/relayer/src/any.rs +++ b/tesseract/consensus/relayer/src/any.rs @@ -146,6 +146,15 @@ pub struct HyperbridgeHostConfig { pub host: ConsensusHost, } +impl HyperbridgeHostConfig { + pub fn substrate_config(&self) -> SubstrateConfig { + match &self.host { + ConsensusHost::Beefy { substrate, .. } => substrate.clone(), + ConsensusHost::Grandpa(grandpa) => grandpa.substrate.clone(), + } + } +} + impl HyperbridgeHostConfig { /// Constructs an instance of the [`IsmpHost`] from the provided configs pub async fn into_client(self) -> Result, anyhow::Error> diff --git a/tesseract/relayer/Cargo.toml b/tesseract/relayer/Cargo.toml new file mode 100644 index 000000000..902009bd6 --- /dev/null +++ b/tesseract/relayer/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "tesseract-relayer" +version = "0.1.0" +edition = "2021" +description = "Consolidated Hyperbridge relayer — inbound and outbound consensus + messaging" +authors = ["Polytope Labs "] +publish = false + +[[bin]] +name = "tesseract-relayer" +path = "src/main.rs" + +[dependencies] +anyhow = { workspace = true } +async-trait = "0.1.53" +clap = { version = "4.3.5", features = ["derive"] } +codec = { workspace = true, default-features = true, features = ["derive"] } +futures = "0.3.28" +ismp = { workspace = true, default-features = true } +log = "0.4.19" +primitive-types = { workspace = true } +serde = { workspace = true, features = ["derive"] } +toml = "0.7.4" +tokio = { workspace = true, features = ["full"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } + +proof-indexer = { workspace = true } +sp-consensus-beefy = { workspace = true } +subxt = { workspace = true } +subxt-utils = { workspace = true } +tesseract-beefy = { workspace = true } +tesseract-config = { workspace = true } +tesseract-consensus = { workspace = true } +tesseract-evm = { workspace = true } +tesseract-messaging = { workspace = true } +tesseract-primitives = { workspace = true } +tesseract-substrate = { workspace = true } +transaction-fees = { workspace = true } + +[dependencies.polkadot-sdk] +workspace = true +default-features = true +features = ["sc-service"] diff --git a/tesseract/relayer/src/cli.rs b/tesseract/relayer/src/cli.rs new file mode 100644 index 000000000..271199911 --- /dev/null +++ b/tesseract/relayer/src/cli.rs @@ -0,0 +1,194 @@ +// Copyright (C) Polytope Labs Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context; +use clap::Parser; +use futures::FutureExt; +use ismp::host::StateMachine; +use polkadot_sdk::sc_service::TaskManager; +use tesseract_consensus::cli::create_client_map; +use tesseract_primitives::{IsmpHost, IsmpProvider}; +use tesseract_substrate::config::{Blake2SubstrateChain, KeccakSubstrateChain}; +use transaction_fees::TransactionPayment; + +use crate::{ + config::HyperbridgeConfig, + outbound, + provider::{ConsensusProofProvider, IndexerProofProvider}, +}; + +#[derive(Parser, Debug)] +#[command(version, about = "Consolidated Hyperbridge relayer")] +pub struct Cli { + /// Path to the relayer config file + #[arg(short, long)] + pub config: String, + + /// Path to the relayer database file (for fee tracking) + #[arg(short, long)] + pub db: String, + + #[command(subcommand)] + pub subcommand: Option, +} + +#[derive(clap::Subcommand, Debug)] +pub enum Subcommand { + /// Capture a BEEFY consensus proof from the relay chain and store it in the indexer DB + CaptureProof { + /// Relay chain websocket RPC URL + #[arg(long)] + relay_rpc: String, + /// PostgreSQL connection URL for the indexer database + #[arg(long)] + db_url: String, + }, +} + +impl Cli { + pub async fn run(self) -> Result<(), anyhow::Error> { + setup_logging()?; + + if let Some(subcommand) = self.subcommand { + return match subcommand { + Subcommand::CaptureProof { relay_rpc, db_url } => + crate::capture_proof::capture_and_store(&relay_rpc, &db_url).await, + }; + } + + log::info!("Initializing tesseract relayer"); + + let config = HyperbridgeConfig::parse_conf(&self.config).await?; + let relayer = config.relayer.clone(); + + let tokio_handle = tokio::runtime::Handle::current(); + let mut task_manager = TaskManager::new(tokio_handle, None)?; + + let tx_payment = Arc::new( + TransactionPayment::initialize(&self.db) + .await + .context("Error initializing fee database")?, + ); + + // Hyperbridge as IsmpHost (for inbound consensus counterparty) + let hyperbridge_host = config + .hyperbridge + .clone() + .into_client::() + .await?; + let coprocessor = hyperbridge_host.provider().state_machine_id().state_id; + let hyperbridge_provider = hyperbridge_host.provider(); + + let host_clients = create_client_map(config.consensus_config()).await?; + + // IsmpProvider map derived from IsmpHost clients + let mut provider_clients: HashMap> = + host_clients.iter().map(|(sm, host)| (*sm, host.provider())).collect(); + provider_clients.insert(coprocessor, hyperbridge_provider.clone()); + + let proof_provider: Option> = match relayer.indexer_db_url { + Some(ref db_url) => { + let indexer = proof_indexer::ProofIndexer::initialize(db_url).await?; + log::info!("Outbound proof provider connected to indexer DB"); + Some(Arc::new(IndexerProofProvider::new(indexer))) + }, + None => None, + }; + + let messaging_config: tesseract_primitives::config::RelayerConfig = relayer.clone().into(); + + for (state_machine, host) in &host_clients { + if !relayer.delivery_endpoints.is_empty() && + !relayer.delivery_endpoints.contains(&state_machine.to_string()) + { + continue; + } + + let provider = host.provider(); + + // -- Inbound consensus: EVM → Hyperbridge -- + { + let hb = hyperbridge_provider.clone(); + let name = format!("inbound-consensus-{}-{}", provider.name(), hb.name()); + let host = host.clone(); + + task_manager.spawn_essential_handle().spawn_blocking( + Box::leak(Box::new(name.clone())), + "consensus", + async move { + let res = host.start_consensus(hb).await; + log::error!(target: "tesseract", "{name} terminated: {res:?}"); + } + .boxed(), + ); + } + + // -- Inbound messaging: EVM → Hyperbridge -- + { + let mut hb_for_messaging = tesseract_substrate::SubstrateClient::< + KeccakSubstrateChain, + >::new(config.hyperbridge.substrate_config()) + .await?; + hb_for_messaging.set_latest_finalized_height(provider.clone()).await?; + + tesseract_messaging::relay( + hb_for_messaging, + provider.clone(), + messaging_config.clone(), + coprocessor, + tx_payment.clone(), + provider_clients.clone(), + &task_manager, + ) + .await?; + } + + // -- Outbound: Hyperbridge → EVM (merged consensus + messaging) -- + if let Some(ref proof_provider) = proof_provider { + let hb = hyperbridge_provider.clone(); + let evm = provider.clone(); + let provider = proof_provider.clone(); + let coproc = coprocessor; + let name = format!("outbound-{}-{}", hb.name(), evm.name()); + + task_manager.spawn_essential_handle().spawn_blocking( + Box::leak(Box::new(name.clone())), + "outbound", + async move { + let res = outbound::run(hb, evm, provider, coproc).await; + log::error!(target: "tesseract", "{name} terminated: {res:?}"); + } + .boxed(), + ); + } + } + + log::info!("Initialized relayer tasks"); + task_manager.future().await?; + + Ok(()) + } +} + +fn setup_logging() -> Result<(), anyhow::Error> { + use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::registry().with(fmt::layer()).with(filter).init(); + + Ok(()) +} diff --git a/tesseract/relayer/src/config.rs b/tesseract/relayer/src/config.rs new file mode 100644 index 000000000..67468b8d2 --- /dev/null +++ b/tesseract/relayer/src/config.rs @@ -0,0 +1,135 @@ +// Copyright (C) Polytope Labs Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::anyhow; +use ismp::{consensus::StateMachineId, host::StateMachine}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tesseract_consensus::any::{AnyConfig, HyperbridgeHostConfig}; +use toml::Table; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RelayerConfig { + // -- Consensus (inbound) -- + pub challenge_period: Option, + pub maximum_update_intervals: Option>, + #[serde(default = "default_true")] + pub enable_hyperbridge_consensus: bool, + + // -- Messaging (inbound) -- + pub module_filter: Option>, + #[serde(default)] + pub minimum_profit_percentage: u32, + #[serde(default)] + pub delivery_endpoints: Vec, + pub fisherman: Option, + pub withdrawal_frequency: Option, + pub minimum_withdrawal_amount: Option, + pub unprofitable_retry_frequency: Option, + pub deliver_failed: Option, + pub disable_fee_accumulation: Option, + + // -- Outbound -- + pub indexer_db_url: Option, +} + +fn default_true() -> bool { + true +} + +impl Default for RelayerConfig { + fn default() -> Self { + Self { + challenge_period: None, + maximum_update_intervals: None, + enable_hyperbridge_consensus: true, + module_filter: None, + minimum_profit_percentage: 0, + delivery_endpoints: Vec::new(), + fisherman: None, + withdrawal_frequency: None, + minimum_withdrawal_amount: None, + unprofitable_retry_frequency: None, + deliver_failed: None, + disable_fee_accumulation: None, + indexer_db_url: None, + } + } +} + +impl From for tesseract_primitives::config::RelayerConfig { + fn from(config: RelayerConfig) -> Self { + tesseract_primitives::config::RelayerConfig { + module_filter: config.module_filter, + minimum_profit_percentage: config.minimum_profit_percentage, + withdrawal_frequency: config.withdrawal_frequency, + minimum_withdrawal_amount: config.minimum_withdrawal_amount, + unprofitable_retry_frequency: config.unprofitable_retry_frequency, + delivery_endpoints: config.delivery_endpoints, + deliver_failed: config.deliver_failed, + fisherman: config.fisherman, + disable_fee_accumulation: config.disable_fee_accumulation, + } + } +} + +/// Consolidated config reusing the consensus relayer's chain config types +/// plus messaging and outbound fields in the [relayer] section. +pub struct HyperbridgeConfig { + pub hyperbridge: HyperbridgeHostConfig, + pub chains: HashMap, + pub relayer: RelayerConfig, +} + +const HYPERBRIDGE: &str = "hyperbridge"; +const RELAYER: &str = "relayer"; + +impl HyperbridgeConfig { + pub async fn parse_conf(config: &str) -> Result { + let toml_str = tokio::fs::read_to_string(config) + .await + .map_err(|err| anyhow!("Error reading config file: {err:?}"))?; + let table = toml_str.parse::()?; + + if !table.contains_key(HYPERBRIDGE) || !table.contains_key(RELAYER) { + return Err(anyhow!("Missing [hyperbridge] or [relayer] section in config")); + } + + let hyperbridge: HyperbridgeHostConfig = + table.get(HYPERBRIDGE).cloned().expect("checked above").try_into()?; + + let relayer: RelayerConfig = + table.get(RELAYER).cloned().expect("checked above").try_into()?; + + let mut chains = HashMap::new(); + for (key, val) in &table { + if key != HYPERBRIDGE && key != RELAYER { + let any_conf: AnyConfig = val.clone().try_into()?; + chains.insert(any_conf.state_machine(), any_conf); + } + } + + Ok(Self { hyperbridge, chains, relayer }) + } + + /// Build the consensus relayer's config for create_client_map + pub fn consensus_config(&self) -> tesseract_consensus::config::HyperbridgeConfig { + tesseract_consensus::config::HyperbridgeConfig { + hyperbridge: self.hyperbridge.clone(), + chains: self.chains.clone(), + relayer: None, + } + } +} diff --git a/tesseract/relayer/src/main.rs b/tesseract/relayer/src/main.rs new file mode 100644 index 000000000..c535ff504 --- /dev/null +++ b/tesseract/relayer/src/main.rs @@ -0,0 +1,28 @@ +// Copyright (C) Polytope Labs Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod capture_proof; +mod cli; +mod config; +mod outbound; +mod provider; + +use clap::Parser; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let cli = cli::Cli::parse(); + cli.run().await +} diff --git a/tesseract/relayer/src/outbound.rs b/tesseract/relayer/src/outbound.rs new file mode 100644 index 000000000..3553d09d9 --- /dev/null +++ b/tesseract/relayer/src/outbound.rs @@ -0,0 +1,200 @@ +// Copyright (C) Polytope Labs Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::StreamExt; +use ismp::{ + consensus::StateMachineHeight, + events::Event, + host::StateMachine, + messaging::{hash_request, hash_response, Message, Proof, RequestMessage, ResponseMessage}, + router::{Request, RequestResponse, Response}, +}; +use tesseract_primitives::{Hasher, IsmpProvider, Query}; + +use crate::provider::ConsensusProofProvider; + +pub async fn run( + hyperbridge: Arc, + evm_chain: Arc, + proof_provider: Arc, + coprocessor: StateMachine, +) -> Result<(), anyhow::Error> { + let hb_state_machine_id = hyperbridge.state_machine_id(); + let evm_state_machine = evm_chain.state_machine_id().state_id; + + let mut update_stream = + evm_chain.state_machine_update_notification(hb_state_machine_id).await?; + + let mut previous_height = evm_chain.initial_height(); + let hb_name = hyperbridge.name(); + let evm_name = evm_chain.name(); + + while let Some(item) = update_stream.next().await { + let update = match item { + Ok(update) => update, + Err(err) => { + tracing::error!("Outbound {hb_name}->{evm_name}: update stream error: {err:?}"); + continue; + }, + }; + + let events = match hyperbridge.query_ismp_events(previous_height, update.clone()).await { + Ok(events) => events, + Err(err) => { + tracing::error!("Outbound {hb_name}->{evm_name}: failed to query events: {err:?}"); + continue; + }, + }; + + previous_height = update.latest_height; + + let mut requests = Vec::new(); + let mut responses = Vec::new(); + + for event in events { + match event { + Event::PostRequest(req) if req.dest == evm_state_machine => { + requests.push(req); + }, + Event::PostResponse(res) if res.dest_chain() == evm_state_machine => { + responses.push(res); + }, + _ => {}, + } + } + + if requests.is_empty() && responses.is_empty() { + continue; + } + + tracing::info!( + "Outbound {hb_name}->{evm_name}: {} requests, {} responses at height {}", + requests.len(), + responses.len(), + update.latest_height, + ); + + let consensus_msg = match proof_provider.get_proof(update.latest_height).await { + Ok(Some(msg)) => msg, + Ok(None) => { + tracing::info!( + "Outbound {hb_name}->{evm_name}: no consensus proof for height {}, skipping", + update.latest_height, + ); + continue; + }, + Err(err) => { + tracing::error!( + "Outbound {hb_name}->{evm_name}: consensus proof query failed: {err:?}", + ); + continue; + }, + }; + + let height = StateMachineHeight { id: hb_state_machine_id, height: update.latest_height }; + let signer = hyperbridge.address(); + + let mut batch = vec![Message::Consensus(consensus_msg)]; + + if !requests.is_empty() { + let keys: Vec<_> = requests + .iter() + .map(|req| { + let commitment = hash_request::(&Request::Post(req.clone())); + Query { + source_chain: req.source, + dest_chain: req.dest, + nonce: req.nonce, + commitment, + } + }) + .collect(); + + match hyperbridge + .query_requests_proof(update.latest_height, keys, evm_state_machine) + .await + { + Ok(proof) => { + batch.push(Message::Request(RequestMessage { + requests, + proof: Proof { height, proof }, + signer: signer.clone(), + })); + }, + Err(err) => { + tracing::error!( + "Outbound {hb_name}->{evm_name}: request proof failed: {err:?}", + ); + }, + } + } + + if !responses.is_empty() { + let keys: Vec<_> = responses + .iter() + .map(|res| { + let commitment = hash_response::(&Response::Post(res.clone())); + Query { + source_chain: res.post.source, + dest_chain: res.post.dest, + nonce: res.post.nonce, + commitment, + } + }) + .collect(); + + match hyperbridge + .query_responses_proof(update.latest_height, keys, evm_state_machine) + .await + { + Ok(proof) => { + batch.push(Message::Response(ResponseMessage { + datagram: RequestResponse::Response( + responses.into_iter().map(Response::Post).collect(), + ), + proof: Proof { height, proof }, + signer: signer.clone(), + })); + }, + Err(err) => { + tracing::error!( + "Outbound {hb_name}->{evm_name}: response proof failed: {err:?}", + ); + }, + } + } + + // Only consensus message in the batch means all proof queries failed + if batch.len() <= 1 { + continue; + } + + match evm_chain.submit(batch, coprocessor).await { + Ok(_) => { + tracing::info!( + "Outbound {hb_name}->{evm_name}: batch submitted at height {}", + update.latest_height, + ); + }, + Err(err) => { + tracing::error!("Outbound {hb_name}->{evm_name}: batch submission failed: {err:?}",); + }, + } + } + + Err(anyhow::anyhow!("Outbound {hb_name}->{evm_name} update stream ended")) +} diff --git a/tesseract/relayer/src/provider.rs b/tesseract/relayer/src/provider.rs new file mode 100644 index 000000000..ed0148634 --- /dev/null +++ b/tesseract/relayer/src/provider.rs @@ -0,0 +1,56 @@ +// Copyright (C) Polytope Labs Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use codec::Decode; +use ismp::messaging::ConsensusMessage; +use proof_indexer::ProofIndexer; + +#[async_trait::async_trait] +pub trait ConsensusProofProvider: Send + Sync { + /// Returns a consensus proof that finalizes at least up to `target_height`, + /// or None if no suitable proof is available yet. + async fn get_proof( + &self, + target_height: u64, + ) -> Result, anyhow::Error>; +} + +/// V1: queries pre-generated ZK proofs from the indexer PostgreSQL. +pub struct IndexerProofProvider { + indexer: ProofIndexer, +} + +impl IndexerProofProvider { + pub fn new(indexer: ProofIndexer) -> Self { + Self { indexer } + } +} + +#[async_trait::async_trait] +impl ConsensusProofProvider for IndexerProofProvider { + async fn get_proof( + &self, + target_height: u64, + ) -> Result, anyhow::Error> { + let proof = self.indexer.latest_proof().await?; + match proof { + Some(row) if row.finalized_height >= target_height as i64 => { + let msg = ConsensusMessage::decode(&mut &row.consensus_proof[..])?; + Ok(Some(msg)) + }, + _ => Ok(None), + } + } +}