diff --git a/Cargo.lock b/Cargo.lock index 09f1a9c127..a4bf69a765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5456,6 +5456,7 @@ dependencies = [ "rpc", "rstest", "serde", + "serde_with", "storage-lmdb", "subsystem", "tempfile", @@ -6136,11 +6137,13 @@ dependencies = [ "rpc", "rpc-description", "rstest", + "rstest_reuse", "serde", "serialization", "siphasher 1.0.2", "storage", "storage-inmemory", + "strum 0.26.3", "subsystem", "test-utils", "thiserror 1.0.69", diff --git a/build-tools/fork-detection/README.md b/build-tools/fork-detection/README.md index 8a764257e9..0537437681 100644 --- a/build-tools/fork-detection/README.md +++ b/build-tools/fork-detection/README.md @@ -31,23 +31,25 @@ Each attempt's directory has the following structure: - `node_log.txt` - the node's log. Some notes: -* Currently the script requires Python 3.13 to run, though we may lift this requirement later. -* The script can send an email when it detects an issue using the local SMTP server +- Currently the script requires Python 3.13 to run, though we may lift this requirement later. +- The script can send an email when it detects an issue using the local SMTP server (if you're on Linux, google for an SMTP Postfix tutorial to set it up). -* Even if the script finds a problem (e.g. a checkpoint mismatch), you're still likely +- Even if the script finds a problem (e.g. a checkpoint mismatch), you're still likely to end up being on the correct chain. To download the actual fork for further investigation you can initiate a separate full sync while using the node's option `--custom-checkpoints-csv-file` to override the correct checkpoints with the wrong ones. -* Once the fork has been downloaded, you'll want to examine the contents of its chainstate db. +- Once the fork has been downloaded, you'll want to examine the contents of its chainstate db. Currently we have the `chainstate-db-dumper` tool that can dump certain info about blocks to a CSV file (the most interesting part of it being the ids of pools that continue producing blocks on that fork). -* Once the fork has been investigated you can "permanently" ban the peers that have been sending it +- Once the fork has been investigated you can "permanently" ban the peers that have been sending it to you, to prevent it from being reported again and again. To do so, you can add their ip addresses to `permabanned_peers.txt` (one address per line, '#' starts a comment) in the script's working directory (it doesn't exist by default, so you'll have to create it first). Note that the file is checked on every iteration, so you can update it while the script is already running and it will come into effect when the next iteration starts. -* The script is likely to fail if a networking error occurs, e.g. if it can't query the API server. +- The script is likely to fail if a networking error occurs, e.g. if it can't query the API server. So, run it in a loop in a shell script (with some delay after each run, to prevent it from spamming - you with warning emails). \ No newline at end of file + you with warning emails). +- If you expect a split to already exist due to a hard fork (in which case reporting it would be useless), + use the `--min-peer-software-version` option to reject all nodes that have not been upgraded. diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py index 0211575d55..fd6023a220 100644 --- a/build-tools/fork-detection/detector.py +++ b/build-tools/fork-detection/detector.py @@ -144,6 +144,10 @@ def __init__(self, args, email_sender): "--rpc-password", NODE_RPC_PWD, "--p2p-custom-disconnection-reason-for-banning", BAN_REASON_STRING ] + + if args.min_peer_software_version is not None: + self.node_cmd += ["--p2p-min-peer-software-version", args.min_peer_software_version] + log.info(f"Node run command: {self.node_cmd}") def run(self): @@ -549,6 +553,11 @@ def main(): help=("The from address for the notification email. " "If None, the --notification-email value will be used"), default=None) + parser.add_argument( + "--min-peer-software-version", + help=("The minimum peer software version, e.g. '1.2.0'. " + "Peers with versions below this one will be rejected and discouraged"), + default=None) args = parser.parse_args() email_sender = EmailSender( diff --git a/common/src/chain/config/builder.rs b/common/src/chain/config/builder.rs index ea9593e1e1..4a37543c95 100644 --- a/common/src/chain/config/builder.rs +++ b/common/src/chain/config/builder.rs @@ -385,7 +385,8 @@ impl Builder { dns_seeds: chain_type.dns_seeds(), predefined_peer_addresses: chain_type.predefined_peer_addresses(), default_rpc_port: chain_type.default_rpc_port(), - software_version: SemVer::try_from(env!("CARGO_PKG_VERSION")) + software_version: env!("CARGO_PKG_VERSION") + .parse() .expect("invalid CARGO_PKG_VERSION value"), max_block_header_size: super::MAX_BLOCK_HEADER_SIZE, max_block_size_with_standard_txs: super::MAX_BLOCK_TXS_SIZE, diff --git a/common/src/primitives/semver.rs b/common/src/primitives/semver.rs index 71cc414b9d..70a7ef5c2d 100644 --- a/common/src/primitives/semver.rs +++ b/common/src/primitives/semver.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; + use serde::Serialize; use serialization::{Decode, Encode}; @@ -33,16 +35,10 @@ impl SemVer { } } -impl From for String { - fn from(v: SemVer) -> String { - format!("{}.{}.{}", v.major, v.minor, v.patch) - } -} - -impl TryFrom<&str> for SemVer { - type Error = &'static str; +impl FromStr for SemVer { + type Err = &'static str; - fn try_from(v: &str) -> Result { + fn from_str(v: &str) -> Result { let split_version = v.split('.').collect::>(); if split_version.len() != 3 { return Err("Invalid version. Number of components is wrong."); @@ -61,20 +57,22 @@ impl TryFrom<&str> for SemVer { } } -impl TryFrom for SemVer { - type Error = &'static str; - - fn try_from(v: String) -> Result { - Self::try_from(v.as_str()) - } -} - impl std::fmt::Display for SemVer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}.{}", self.major, self.minor, self.patch) } } +// TODO: this is redundant, but it's still used inside a macro in `regtest_chain_config_builder`. +// Refactor the macro and remove this. +impl TryFrom for SemVer { + type Error = ::Err; + + fn try_from(v: String) -> Result { + Self::from_str(v.as_str()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -84,84 +82,90 @@ mod tests { #[test] fn vertest_string() { let version = SemVer::new(1, 2, 3); - assert_eq!(String::from(version), "1.2.3"); + assert_eq!(version.to_string(), "1.2.3"); let version = SemVer::new(0xff, 0xff, 0xff); - assert_eq!(String::from(version), "255.255.255"); + assert_eq!(version.to_string(), "255.255.255"); let version = SemVer::new(0xff, 0xff, 0xffff); - assert_eq!(String::from(version), "255.255.65535"); + assert_eq!(version.to_string(), "255.255.65535"); let version = SemVer::new(1, 2, 0x500); - assert_eq!(String::from(version), "1.2.1280"); + assert_eq!(version.to_string(), "1.2.1280"); assert_eq!( - SemVer::try_from(" "), + SemVer::from_str(" "), Err("Invalid version. Number of components is wrong.") ); assert_eq!( - SemVer::try_from(""), + SemVer::from_str(""), Err("Invalid version. Number of components is wrong.") ); assert_eq!( - SemVer::try_from("1.2"), + SemVer::from_str("1.2"), Err("Invalid version. Number of components is wrong.") ); assert_eq!( - SemVer::try_from("1"), + SemVer::from_str("1"), Err("Invalid version. Number of components is wrong.") ); let version = "hello"; assert_eq!( - SemVer::try_from(version), + SemVer::from_str(version), Err("Invalid version. Number of components is wrong.") ); assert_eq!( - SemVer::try_from(version), + SemVer::from_str(version), Err("Invalid version. Number of components is wrong.") ); - let version = "1.2.3".to_string(); - assert_eq!(SemVer::try_from(version.clone()), Ok(SemVer::new(1, 2, 3))); - assert_eq!(SemVer::try_from(version), Ok(SemVer::new(1, 2, 3))); + let version = "1.2.3"; + assert_eq!(SemVer::from_str(version), Ok(SemVer::new(1, 2, 3))); + assert_eq!( + SemVer::try_from(version.to_owned()), + Ok(SemVer::new(1, 2, 3)) + ); let version = "255.255.255"; - assert_eq!(SemVer::try_from(version), Ok(SemVer::new(255, 255, 255))); - assert_eq!(SemVer::try_from(version), Ok(SemVer::new(255, 255, 255))); + assert_eq!(SemVer::from_str(version), Ok(SemVer::new(255, 255, 255))); + assert_eq!( + SemVer::try_from(version.to_owned()), + Ok(SemVer::new(255, 255, 255)) + ); - let version = "255.255.65535".to_string(); + let version = "255.255.65535"; + assert_eq!(SemVer::from_str(version), Ok(SemVer::new(255, 255, 65535))); assert_eq!( - SemVer::try_from(version.clone()), + SemVer::try_from(version.to_owned()), Ok(SemVer::new(255, 255, 65535)) ); - assert_eq!(SemVer::try_from(version), Ok(SemVer::new(255, 255, 65535))); let version = "255.255.65536"; assert_eq!( - SemVer::try_from(version), + SemVer::from_str(version), Err("Parsing SemVer component to integer failed") ); assert_eq!( - SemVer::try_from(version.to_string()), + SemVer::try_from(version.to_owned()), Err("Parsing SemVer component to integer failed") ); assert_eq!( - SemVer::try_from("1.2.a"), + SemVer::from_str("1.2.a"), Err("Parsing SemVer component to integer failed") ); assert_eq!( - SemVer::try_from("1.2."), + SemVer::from_str("1.2."), Err("Parsing SemVer component to integer failed") ); assert_eq!( - SemVer::try_from("1..3"), + SemVer::from_str("1..3"), Err("Parsing SemVer component to integer failed") ); } diff --git a/node-daemon/docs/RPC.md b/node-daemon/docs/RPC.md index 94f1f87dee..83f1ee2460 100644 --- a/node-daemon/docs/RPC.md +++ b/node-daemon/docs/RPC.md @@ -1071,8 +1071,7 @@ nothing Attempt to connect to a remote node (just once). -For persistent connections see `add_reserved_node` should be used. -Keep in mind that `add_reserved_node` works completely differently. +For persistent connections consider using `add_reserved_node`. Parameters: @@ -1087,7 +1086,10 @@ nothing ### Method `p2p_disconnect` -Disconnect peer, given its id. +Disconnect a peer given its id. + +If it was an outbound connection, the peer address will be removed from the peer database, +and if the connection was inbound, the address will be kept. Parameters: diff --git a/node-lib/Cargo.toml b/node-lib/Cargo.toml index 7a75a7c90a..679159a8bc 100644 --- a/node-lib/Cargo.toml +++ b/node-lib/Cargo.toml @@ -31,6 +31,7 @@ fs4.workspace = true jsonrpsee = { workspace = true, features = ["macros"] } paste.workspace = true serde = { workspace = true, features = ["derive"] } +serde_with.workspace = true thiserror.workspace = true tokio = { workspace = true, default-features = false } toml.workspace = true diff --git a/node-lib/src/config_files/mod.rs b/node-lib/src/config_files/mod.rs index 3ed9d884ce..885ac259fa 100644 --- a/node-lib/src/config_files/mod.rs +++ b/node-lib/src/config_files/mod.rs @@ -199,6 +199,7 @@ fn p2p_config(config: P2pConfigFile, options: &RunOptions) -> P2pConfigFile { node_type, force_dns_query_if_no_global_addresses_known, custom_disconnection_reason_for_banning, + min_peer_software_version, } = config; let networking_enabled = options.p2p_networking_enabled.or(networking_enabled); @@ -226,6 +227,8 @@ fn p2p_config(config: P2pConfigFile, options: &RunOptions) -> P2pConfigFile { .p2p_custom_disconnection_reason_for_banning .clone() .or(custom_disconnection_reason_for_banning); + let min_peer_software_version = + options.p2p_min_peer_software_version.or(min_peer_software_version); P2pConfigFile { networking_enabled, @@ -246,6 +249,7 @@ fn p2p_config(config: P2pConfigFile, options: &RunOptions) -> P2pConfigFile { node_type, force_dns_query_if_no_global_addresses_known, custom_disconnection_reason_for_banning, + min_peer_software_version, } } diff --git a/node-lib/src/config_files/p2p.rs b/node-lib/src/config_files/p2p.rs index 251fc86b92..34c9113111 100644 --- a/node-lib/src/config_files/p2p.rs +++ b/node-lib/src/config_files/p2p.rs @@ -20,9 +20,9 @@ use std::{ time::Duration, }; -use common::primitives::user_agent::mintlayer_core_user_agent; use serde::{Deserialize, Serialize}; +use common::primitives::{semver::SemVer, user_agent::mintlayer_core_user_agent}; use p2p::{ ban_config::BanConfig, config::{NodeType, P2pConfig}, @@ -61,6 +61,7 @@ impl FromStr for NodeTypeConfigFile { /// The p2p subsystem configuration. #[must_use] +#[serde_with::serde_as] #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] pub struct P2pConfigFile { @@ -102,6 +103,11 @@ pub struct P2pConfigFile { pub force_dns_query_if_no_global_addresses_known: Option, /// If set, this text will be sent to banned peers as part of the DisconnectionReason. pub custom_disconnection_reason_for_banning: Option, + /// If the peer's user agent is MintlayerCore (which is always true at the moment), + /// the connection will be rejected and the peer discouraged if the peer's software version + /// is less than the one specified. + #[serde_as(as = "Option")] + pub min_peer_software_version: Option, } impl From for P2pConfig { @@ -125,6 +131,7 @@ impl From for P2pConfig { node_type, force_dns_query_if_no_global_addresses_known, custom_disconnection_reason_for_banning, + min_peer_software_version, } = config_file; P2pConfig { @@ -179,6 +186,8 @@ impl From for P2pConfig { allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + + min_peer_software_version, }, protocol_config: Default::default(), peer_handshake_timeout: Default::default(), diff --git a/node-lib/src/options.rs b/node-lib/src/options.rs index 08245c1e4a..7878a37221 100644 --- a/node-lib/src/options.rs +++ b/node-lib/src/options.rs @@ -25,12 +25,15 @@ use std::{ use clap::{Args, CommandFactory, FromArgMatches, Parser, Subcommand}; use chainstate_launcher::ChainConfig; -use common::chain::{ - self, - config::{ - regtest_options::{regtest_chain_config_builder, ChainConfigOptions}, - ChainType, +use common::{ + chain::{ + self, + config::{ + regtest_options::{regtest_chain_config_builder, ChainConfigOptions}, + ChainType, + }, }, + primitives::semver::SemVer, }; use utils::{ clap_utils, default_data_dir::default_data_dir_for_chain, root_user::ForceRunAsRootOptions, @@ -376,6 +379,12 @@ pub struct RunOptions { #[clap(long, hide = true)] pub p2p_custom_disconnection_reason_for_banning: Option, + /// If the peer's user agent is MintlayerCore (which is always true at the moment), + /// the connection will be rejected and the peer discouraged if the peer's software version + /// is less than the one specified. + #[clap(long, hide = true)] + pub p2p_min_peer_software_version: Option, + /// A maximum tip age in seconds. /// /// The initial block download is finished if the difference between the current time and the @@ -485,6 +494,7 @@ mod tests { p2p_max_clock_diff: Default::default(), p2p_force_dns_query_if_no_global_addresses_known: Default::default(), p2p_custom_disconnection_reason_for_banning: Default::default(), + p2p_min_peer_software_version: Default::default(), max_tip_age: Default::default(), rpc_bind_address: Default::default(), rpc_enabled: Default::default(), diff --git a/node-lib/tests/cli.rs b/node-lib/tests/cli.rs index 580df0fcfc..aa9e630689 100644 --- a/node-lib/tests/cli.rs +++ b/node-lib/tests/cli.rs @@ -15,9 +15,9 @@ use std::{net::SocketAddr, num::NonZeroU64, path::Path, str::FromStr}; -use common::chain::config::create_testnet; use tempfile::TempDir; +use common::{chain::config::create_testnet, primitives::semver::SemVer}; use node_lib::{NodeConfigFile, NodeTypeConfigFile, RunOptions, StorageBackendConfigFile}; use utils_networking::IpOrSocketAddress; @@ -116,6 +116,11 @@ fn read_config_override_values() { let p2p_max_clock_diff = 15; let p2p_force_dns_query_if_no_global_addresses_known = true; let p2p_custom_disconnection_reason_for_banning = "foo".to_owned(); + let p2p_min_peer_software_version = SemVer { + major: 12, + minor: 34, + patch: 56, + }; let rpc_bind_address = "127.0.0.1:5432".parse().unwrap(); let backend_type = StorageBackendConfigFile::InMemory; let node_type = NodeTypeConfigFile::FullNode; @@ -155,9 +160,10 @@ fn read_config_override_values() { p2p_force_dns_query_if_no_global_addresses_known: Some( p2p_force_dns_query_if_no_global_addresses_known, ), - p2p_custom_disconnection_reason_for_banning: (Some( + p2p_custom_disconnection_reason_for_banning: Some( p2p_custom_disconnection_reason_for_banning.clone(), - )), + ), + p2p_min_peer_software_version: Some(p2p_min_peer_software_version), max_tip_age: Some(max_tip_age), rpc_bind_address: Some(rpc_bind_address), rpc_enabled: Some(true), @@ -291,6 +297,10 @@ fn read_config_override_values() { config.p2p.as_ref().unwrap().custom_disconnection_reason_for_banning, Some(p2p_custom_disconnection_reason_for_banning) ); + assert_eq!( + config.p2p.as_ref().unwrap().min_peer_software_version, + Some(p2p_min_peer_software_version) + ); assert_eq!( config.rpc.clone().unwrap().bind_address, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 497997d29c..8c32d75084 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -39,6 +39,7 @@ once_cell.workspace = true parity-scale-codec.workspace = true serde.workspace = true siphasher.workspace = true +strum.workspace = true thiserror.workspace = true tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"] } tokio-stream.workspace = true @@ -56,13 +57,14 @@ pos-accounting = { path = "../pos-accounting" } p2p-backend-test-suite = { path = "backend-test-suite" } p2p-test-utils = { path = "test-utils" } test-utils = { path = "../test-utils" } -tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time", "test-util"] } bytes.workspace = true criterion.workspace = true ctor.workspace = true num.workspace = true rstest.workspace = true +rstest_reuse.workspace = true +tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time", "test-util"] } [[test]] name = "backend_tcp" diff --git a/p2p/src/disconnection_reason.rs b/p2p/src/disconnection_reason.rs index 0de7fc5887..b134dbd02e 100644 --- a/p2p/src/disconnection_reason.rs +++ b/p2p/src/disconnection_reason.rs @@ -13,8 +13,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common::{chain::config::MagicBytes, primitives::time::Time}; - +use common::{ + chain::config::MagicBytes, + primitives::{semver::SemVer, time::Time}, +}; use networking::error::{MessageCodecError, NetworkingError}; use p2p_types::services::Services; @@ -109,6 +111,16 @@ pub enum DisconnectionReason { #[display("Your message size {actual_size} exceeded the maximum size {max_size}")] MessageTooLarge { actual_size: usize, max_size: usize }, + #[display( + "This node only accepts peers of version {} and higher, while your version is {}", + min_required_version, + peer_version + )] + PeerSoftwareVersionTooLow { + min_required_version: SemVer, + peer_version: SemVer, + }, + #[display("{_0}")] CustomMessage(String), } @@ -228,6 +240,13 @@ impl DisconnectionReason { needed_services: *needed_services, }), ConnectionValidationError::NetworkingDisabled => Some(Self::NetworkingDisabled), + ConnectionValidationError::MinPeerSoftwareVersionNotSatisfied { + min_version, + actual_version, + } => Some(Self::PeerSoftwareVersionTooLow { + min_required_version: *min_version, + peer_version: *actual_version, + }), }, } } diff --git a/p2p/src/error.rs b/p2p/src/error.rs index ef0a81734d..190fc7395d 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -18,7 +18,7 @@ use thiserror::Error; use chainstate::{ban_score::BanScore, ChainstateError}; use common::{ chain::{config::MagicBytes, Block, Transaction}, - primitives::{time::Time, Id}, + primitives::{semver::SemVer, time::Time, Id}, }; use mempool::error::{Error as MempoolError, MempoolBanScore}; use networking::error::{MessageCodecError, NetworkingError}; @@ -139,6 +139,11 @@ pub enum ConnectionValidationError { }, #[error("Networking disabled")] NetworkingDisabled, + #[error("Minimum peer software version not satisfied, min version = {min_version}, actual = {actual_version}")] + MinPeerSoftwareVersionNotSatisfied { + min_version: SemVer, + actual_version: SemVer, + }, } #[derive(Error, Debug, Clone, PartialEq, Eq)] @@ -236,7 +241,7 @@ impl BanScore for P2pError { actual_version: _, } => 0, P2pError::MempoolError(err) => err.mempool_ban_score(), - P2pError::ConnectionValidationFailed(_) => 0, + P2pError::ConnectionValidationFailed(err) => err.ban_score(), P2pError::SyncError(err) => err.ban_score(), } } @@ -300,6 +305,27 @@ impl BanScore for SyncError { } } +impl BanScore for ConnectionValidationError { + fn ban_score(&self) -> u32 { + match self { + // TODO: should UnsupportedProtocol and DifferentNetwork have 100 ban score instead? + ConnectionValidationError::UnsupportedProtocol { .. } => 0, + ConnectionValidationError::TimeDiff { .. } => 0, + ConnectionValidationError::DifferentNetwork { .. } => 0, + ConnectionValidationError::TooManyInboundPeersAndThisOneIsDiscouraged => 0, + ConnectionValidationError::TooManyInboundPeersAndCannotEvictAnyone => 0, + ConnectionValidationError::AddressBanned { .. } => 0, + ConnectionValidationError::AddressDiscouraged { .. } => 0, + ConnectionValidationError::NoCommonServices => 0, + ConnectionValidationError::InsufficientServices { .. } => 0, + ConnectionValidationError::NetworkingDisabled => 0, + // Note: the cause of this error is failure to satisfy `PeerManagerConfig::min_peer_software_version`, + // which by design should result in peer discouragement. + ConnectionValidationError::MinPeerSoftwareVersionNotSatisfied { .. } => 100, + } + } +} + impl TryAsRef for P2pError { fn try_as_ref(&self) -> Option<&storage::Error> { match self { diff --git a/p2p/src/interface/p2p_interface.rs b/p2p/src/interface/p2p_interface.rs index 0d3be7ded6..be5faf275d 100644 --- a/p2p/src/interface/p2p_interface.rs +++ b/p2p/src/interface/p2p_interface.rs @@ -29,6 +29,9 @@ pub trait P2pInterface: Send + Sync { async fn enable_networking(&mut self, enable: bool) -> crate::Result<()>; async fn connect(&mut self, addr: IpOrSocketAddress) -> crate::Result<()>; + // Note: at this moment this method will remove the peer address from the peerdb + // if the connection was outbound and keep it if the connection was inbound. + // TODO: revise this. async fn disconnect(&mut self, peer_id: PeerId) -> crate::Result<()>; async fn list_banned(&self) -> crate::Result>; diff --git a/p2p/src/peer_manager/config.rs b/p2p/src/peer_manager/config.rs index 6e65a27557..9d6d7a4b5b 100644 --- a/p2p/src/peer_manager/config.rs +++ b/p2p/src/peer_manager/config.rs @@ -15,6 +15,7 @@ use std::time::Duration; +use common::primitives::semver::SemVer; use utils::make_config_setting; use super::{ @@ -125,6 +126,10 @@ pub struct PeerManagerConfig { /// Peer db configuration. pub peerdb_config: PeerDbConfig, + + /// If this node's and the peer's user agent is the same, the connection will be rejected and + /// the peer discouraged if the peer's software version is less than the one specified. + pub min_peer_software_version: Option, } impl PeerManagerConfig { diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index f1f574d1bd..78adfc7354 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -107,6 +107,7 @@ const PEER_ADDRESS_RESEND_COUNT: usize = 2; const PEER_ADDRESSES_ROLLING_BLOOM_FILTER_SIZE: usize = 5000; const PEER_ADDRESSES_ROLLING_BLOOM_FPP: f64 = 0.001; +#[derive(Debug)] enum OutboundConnectType { Automatic { block_relay_only: bool, @@ -146,6 +147,7 @@ impl From<&OutboundConnectType> for PeerRole { } } +#[derive(Debug)] struct PendingConnect { outbound_connect_type: OutboundConnectType, } @@ -446,14 +448,14 @@ where } } - /// Adjust peer score + /// Adjust peer score. /// /// Discourage the peer if the score reaches the corresponding threshold. fn adjust_peer_score( &mut self, peer_id: PeerId, score: u32, - reason: &(impl std::fmt::Display + ?Sized), + adjustment_reason: &(impl std::fmt::Display + ?Sized), ) { let peer = match self.peers.get(&peer_id) { Some(peer) => peer, @@ -465,7 +467,7 @@ where "[peer id = {}] Ignoring peer score adjustment because the peer is whitelisted (adjustment: {}, reason: {})", peer_id, score, - reason + adjustment_reason ); return; } @@ -482,7 +484,7 @@ where peer_id, score, peer.score, - reason + adjustment_reason ); if let Some(o) = self.observer.as_mut() { @@ -495,55 +497,96 @@ where } } - /// Adjust peer score after a failed handshake. + /// Adjust peer score after a failed connection attempt; discourage the peer if the threshold + /// has been reached. /// - /// Note that currently intermediate scores are not stored in the peer db, so this call will - /// only make any effect if the passed score is bigger than the threshold. - fn adjust_peer_score_on_failed_handshake( + /// Note: + /// 1. Since intermediate scores are not stored in the peer db, this function will only + /// have any effect if the passed score is bigger than the threshold. + /// 2. When this function is called, it's likely that `PeerContext` hasn't even been created + /// for the peer yet. One implication of this is that the call to `discourage` will not + /// be able to disconnect the peer. + fn adjust_peer_score_on_failed_connection_attempt( &mut self, peer_address: SocketAddress, + peer_role: PeerRole, + failure_type: ConnectionFailureTypeForScoreAdjustment, score: u32, - reason: &(impl std::fmt::Display + ?Sized), + adjustment_reason: &(impl std::fmt::Display + ?Sized), ) { - let whitelisted_node = - self.pending_outbound_connects - .get(&peer_address) - .is_some_and(|pending_connect| { - self.is_whitelisted_node( - (&pending_connect.outbound_connect_type).into(), - &peer_address, - ) - }); - if whitelisted_node { + let context = match failure_type { + ConnectionFailureTypeForScoreAdjustment::FailedHandshake => "failed handshake", + ConnectionFailureTypeForScoreAdjustment::AcceptingConnectionFailed => { + "connection failure" + } + }; + + if score < *self.p2p_config.ban_config.discouragement_threshold { log::info!( concat!( - "Ignoring peer score adjustment on failed handshake for peer at address {} ", - "because the peer is whitelisted (adjustment: {}, reason: {})" + "Ignoring peer score adjustment on {} for peer at address {} ", + "because the adjustment is below the threshold and will not have any effect ", + "(adjustment: {}, reason: {}, threshold: {})" ), + context, peer_address, score, - reason, + adjustment_reason, + *self.p2p_config.ban_config.discouragement_threshold ); + return; } - log::info!( - "Adjusting peer score of a peer at address {} by {} on failed handshake, reason: {}", - peer_address, - score, - reason - ); + if self.is_whitelisted_node(peer_role, &peer_address) { + log::info!( + concat!( + "Ignoring peer score adjustment on {} for peer at address {} ", + "because the peer is whitelisted (adjustment: {}, reason: {})" + ), + context, + peer_address, + score, + adjustment_reason, + ); + } else { + log::info!( + "Adjusting peer score of a peer at address {} by {} on {}, reason: {}", + peer_address, + score, + context, + adjustment_reason + ); - if let Some(o) = self.observer.as_mut() { - o.on_peer_ban_score_adjustment(peer_address, score); - } + if let Some(o) = self.observer.as_mut() { + o.on_peer_ban_score_adjustment(peer_address, score); + } - if score >= *self.p2p_config.ban_config.discouragement_threshold { - let address = peer_address.as_bannable(); - self.discourage(address); + self.discourage(peer_address.as_bannable()); } } + fn adjust_peer_score_on_failed_handshake( + &mut self, + peer_address: SocketAddress, + score: u32, + adjustment_reason: &(impl std::fmt::Display + ?Sized), + ) { + let peer_role = self + .pending_outbound_connects + .get(&peer_address) + .map_or(PeerRole::Inbound, |pending_connect| { + (&pending_connect.outbound_connect_type).into() + }); + self.adjust_peer_score_on_failed_connection_attempt( + peer_address, + peer_role, + ConnectionFailureTypeForScoreAdjustment::FailedHandshake, + score, + adjustment_reason, + ); + } + fn bannable_peers_for_addr(&self, address: BannableAddress) -> Vec { self.peers .values() @@ -582,6 +625,7 @@ where } } + /// Discourage the specified address and disconnect all corresponding peers. fn discourage(&mut self, address: BannableAddress) { let to_disconnect = self.bannable_peers_for_addr(address); @@ -652,7 +696,7 @@ where Ok(()) } - /// Initiate a new outbound connection or send an error via `response_sender` if it's not possible. + /// Initiate a new outbound connection. fn connect(&mut self, address: SocketAddress, outbound_connect_type: OutboundConnectType) { let block_relay_only = outbound_connect_type.block_relay_only(); @@ -719,8 +763,6 @@ where /// The decision to close the connection is made either by the user via RPC /// or by the [`PeerManager::heartbeat()`] function which has decided to cull /// this connection in favor of another potential connection. - /// - /// If the `response` channel is not empty, the peer is marked as disconnected by the user and no reconnect attempts are made. fn disconnect( &mut self, peer_id: PeerId, @@ -796,6 +838,18 @@ where P2pError::ConnectionValidationFailed(ConnectionValidationError::NoCommonServices), ); + if let Some(min_version) = self.p2p_config.peer_manager_config.min_peer_software_version { + if info.user_agent == self.p2p_config.user_agent && info.software_version < min_version + { + return Err(P2pError::ConnectionValidationFailed( + ConnectionValidationError::MinPeerSoftwareVersionNotSatisfied { + min_version, + actual_version: info.software_version, + }, + )); + } + } + match peer_role { PeerRole::Inbound => { // If the maximum number of inbound connections is reached, @@ -1115,23 +1169,28 @@ where if let Err(accept_err) = &accept_res { log::debug!("Connection rejected for peer {peer_id}: {accept_err}"); + self.adjust_peer_score_on_failed_connection_attempt( + peer_address, + peer_role, + ConnectionFailureTypeForScoreAdjustment::AcceptingConnectionFailed, + accept_err.ban_score(), + &accept_err, + ); + let disconnection_reason = DisconnectionReason::from_error(accept_err, &self.p2p_config); - // Disconnect should always succeed unless the node is shutting down. - // But at this moment there is a possibility for backend to be shut down - // before peer manager, at least in tests, so we don't "expect" and log - // the error instead. - // TODO: investigate why peer manager can be shut down before the backend (it shouldn't - // be this way according to an earlier comment). - // TODO: we probably shouldn't use "log::error" if the error happened during - // shutdown. Probably, peer manager should accept the "shutdown" flag, like other - // p2p components do, and ignore/log::info the errors it it's set (this also applies - // to other places, search for "log::error" in this file). + // Disconnect the peer. Note that we can't call `self.disconnect` here because + // that function assumes that `PeerContext` has already been created, which is + // unlikely in the case when `try_accept_connection` has failed. + // Also note that we do the disconnection unconditionally, even if the peer has been + // discouraged. This is again due to the fact that `PeerContext` has likely not been + // created for the peer yet, in which case `discourage` couldn't have performed + // the disconnection. let disconnect_result = self.peer_connectivity_handle.disconnect(peer_id, disconnection_reason); if let Err(err) = disconnect_result { - log::error!("Disconnect failed unexpectedly: {err:?}"); + log::warn!("Disconnecting peer {peer_address} failed: {err}"); } if peer_role.is_outbound() { @@ -2338,6 +2397,12 @@ where } } +#[derive(Copy, Clone, Debug)] +enum ConnectionFailureTypeForScoreAdjustment { + FailedHandshake, + AcceptingConnectionFailed, +} + #[cfg(test)] mod tests; diff --git a/p2p/src/peer_manager/peerdb/address_data/tests.rs b/p2p/src/peer_manager/peerdb/address_data/tests.rs index 127c555d26..a4e119a04a 100644 --- a/p2p/src/peer_manager/peerdb/address_data/tests.rs +++ b/p2p/src/peer_manager/peerdb/address_data/tests.rs @@ -13,12 +13,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use rstest::rstest; + use randomness::{ distributions::{Distribution, WeightedIndex}, rngs::StepRng, Rng, }; -use rstest::rstest; use test_utils::random::{make_seedable_rng, Seed}; use super::*; diff --git a/p2p/src/peer_manager/peers_eviction/tests.rs b/p2p/src/peer_manager/peers_eviction/tests.rs index f4703bdfb0..de86853c28 100644 --- a/p2p/src/peer_manager/peers_eviction/tests.rs +++ b/p2p/src/peer_manager/peers_eviction/tests.rs @@ -803,6 +803,7 @@ mod outbound { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), } } @@ -833,6 +834,7 @@ mod outbound { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), } } @@ -860,6 +862,7 @@ mod outbound { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), } } } diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index 8d04330405..dd8069b493 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -329,6 +329,7 @@ async fn no_outgoing_connection_to_banned_peer(#[case] seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), })); let time_getter = BasicTestTimeGetter::new(); diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index 0f6c7f6d44..1976f6147f 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -1113,6 +1113,7 @@ where feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; // Start the first peer manager @@ -1320,6 +1321,7 @@ async fn discovered_node_2_groups() { feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; // Start the first peer manager @@ -1490,6 +1492,7 @@ async fn discovered_node_separate_groups() { feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; // Start the first peer manager @@ -1890,6 +1893,7 @@ mod feeler_connections_test_utils { main_loop_tick_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), + min_peer_software_version: Default::default(), }, // Disable pings to simplify the test. ping_check_period: Duration::ZERO.into(), @@ -1976,6 +1980,7 @@ async fn reject_connection_to_existing_ip(#[case] seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }, // Disable pings so that they don't interfere with the testing logic. @@ -2147,6 +2152,7 @@ async fn feeler_connection_to_ip_address_of_inbound_peer(#[case] seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), })); let time_getter = BasicTestTimeGetter::new(); diff --git a/p2p/src/peer_manager/tests/discouragement.rs b/p2p/src/peer_manager/tests/discouragement.rs index ae95f07a18..887da14c58 100644 --- a/p2p/src/peer_manager/tests/discouragement.rs +++ b/p2p/src/peer_manager/tests/discouragement.rs @@ -205,6 +205,7 @@ async fn dont_reject_incoming_connection_from_discouraged_peer_if_limit_not_reac force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), })); let time_getter = BasicTestTimeGetter::new(); @@ -280,6 +281,7 @@ async fn reject_incoming_connection_from_discouraged_peer_if_limit_reached(#[cas force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), })); let time_getter = BasicTestTimeGetter::new(); @@ -401,6 +403,7 @@ async fn no_outgoing_connection_to_discouraged_peer(#[case] seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), })); let time_getter = BasicTestTimeGetter::new(); diff --git a/p2p/src/peer_manager/tests/eviction.rs b/p2p/src/peer_manager/tests/eviction.rs index 72ca2292a9..9399d3c906 100644 --- a/p2p/src/peer_manager/tests/eviction.rs +++ b/p2p/src/peer_manager/tests/eviction.rs @@ -122,6 +122,7 @@ mod dont_evict_if_blocks_in_flight { feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), + min_peer_software_version: Default::default(), }, ping_check_period: Duration::ZERO.into(), diff --git a/p2p/src/rpc.rs b/p2p/src/rpc.rs index e563904974..baa33469f7 100644 --- a/p2p/src/rpc.rs +++ b/p2p/src/rpc.rs @@ -33,12 +33,14 @@ trait P2pRpc { /// Attempt to connect to a remote node (just once). /// - /// For persistent connections see `add_reserved_node` should be used. - /// Keep in mind that `add_reserved_node` works completely differently. + /// For persistent connections consider using `add_reserved_node`. #[method(name = "connect")] async fn connect(&self, addr: IpOrSocketAddress) -> RpcResult<()>; - /// Disconnect peer, given its id. + /// Disconnect a peer given its id. + /// + /// If it was an outbound connection, the peer address will be removed from the peer database, + /// and if the connection was inbound, the address will be kept. #[method(name = "disconnect")] async fn disconnect(&self, peer_id: PeerId) -> RpcResult<()>; diff --git a/p2p/src/test_helpers.rs b/p2p/src/test_helpers.rs index 1812a8c49f..b94c9f4eb0 100644 --- a/p2p/src/test_helpers.rs +++ b/p2p/src/test_helpers.rs @@ -272,6 +272,7 @@ pub fn test_p2p_config_with_peer_db_config(peerdb_config: PeerDbConfig) -> P2pCo feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), + min_peer_software_version: Default::default(), }) } @@ -321,6 +322,7 @@ pub fn test_peer_mgr_config_with_no_auto_outbound_connections() -> PeerManagerCo feeler_connections_interval: Default::default(), allow_same_ip_connections: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), + min_peer_software_version: Default::default(), } } diff --git a/p2p/src/tests/correct_handshake.rs b/p2p/src/tests/correct_handshake.rs index 17a523998b..b660ef48bb 100644 --- a/p2p/src/tests/correct_handshake.rs +++ b/p2p/src/tests/correct_handshake.rs @@ -85,9 +85,9 @@ where let connect_result = connect_result_receiver.await.unwrap(); assert!(connect_result.is_ok()); - // Check that the connection is still up and we can receive the next message (we don't care - // which one it is though). - let _msg = msg_stream.recv().await.unwrap(); + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::AddrListRequest(_)); // This is mainly needed to ensure that the corresponding events, if any, reach // peer manager before we end the test. @@ -169,9 +169,9 @@ where let msg = msg_stream.recv().await.unwrap(); assert_matches!(msg, Message::Handshake(HandshakeMessage::HelloAck { .. })); - // Check that the connection is still up and we can receive the next message (we don't care - // which one it is though). - let _msg = msg_stream.recv().await.unwrap(); + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::HeaderListRequest(_)); // This is mainly needed to ensure that the corresponding events, if any, reach // peer manager before we end the test. diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 7eabce09fe..2312a57c17 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -262,7 +262,10 @@ where &self.chainstate } - // Note: the returned receiver will become readable only after the handshake is finished. + // Note: + // 1) This will initiate a connection of the type OutboundConnectType::Manual, for which peer + // score adjustments are ignored. + // 2) The returned receiver will become readable only after the handshake is finished. pub fn start_connecting( &self, address: SocketAddress, diff --git a/p2p/src/tests/incorrect_handshake.rs b/p2p/src/tests/incorrect_handshake.rs index 3e2800bff2..40fe1bb8c3 100644 --- a/p2p/src/tests/incorrect_handshake.rs +++ b/p2p/src/tests/incorrect_handshake.rs @@ -26,11 +26,12 @@ use test_utils::{assert_matches, BasicTestTimeGetter}; use crate::{ message::HeaderList, net::default_backend::types::{HandshakeMessage, Message}, + peer_manager, test_helpers::{test_p2p_config, TEST_PROTOCOL_VERSION}, tests::helpers::TestNode, }; -async fn incorrect_handshake_outgoing() +async fn incorrect_handshake_outgoing_manual() where TTM: TestTransportMaker, TTM::Transport: TransportSocket, @@ -97,20 +98,102 @@ where #[tracing::instrument] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn incorrect_handshake_outgoing_tcp() { - run_with_timeout(incorrect_handshake_outgoing::()).await; +async fn incorrect_handshake_outgoing_manual_tcp() { + run_with_timeout(incorrect_handshake_outgoing_manual::()).await; } #[tracing::instrument] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn incorrect_handshake_outgoing_channels() { - run_with_timeout(incorrect_handshake_outgoing::()).await; +async fn incorrect_handshake_outgoing_manual_channels() { + run_with_timeout(incorrect_handshake_outgoing_manual::()).await; } #[tracing::instrument] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn incorrect_handshake_outgoing_noise() { - run_with_timeout(incorrect_handshake_outgoing::()).await; +async fn incorrect_handshake_outgoing_manual_noise() { + run_with_timeout(incorrect_handshake_outgoing_manual::()).await; +} + +async fn incorrect_handshake_outgoing_auto() +where + TTM: TestTransportMaker, + TTM::Transport: TransportSocket, +{ + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TTM::make_transport(), + TTM::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + ) + .await; + + let transport = TTM::make_transport(); + let mut listener = transport.bind(vec![TTM::make_address()]).await.unwrap(); + + let address = listener.local_addresses().unwrap()[0].into(); + + test_node.discover_peer(address).await; + // Advance time to allow a heartbeat to happen, where the new connection will be attempted. + time_getter.advance_time(peer_manager::HEARTBEAT_INTERVAL_MAX); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. })); + + // Send some other message instead of HelloAck. + msg_stream.send(Message::HeaderList(HeaderList::new(Vec::new()))).await.unwrap(); + + // The connection should be closed. + msg_stream.recv().await.unwrap_err(); + + // Unlike incorrect_handshake_outgoing_manual, here the peer score will be adjusted and + // the peer discouraged. + + // This is mainly needed to ensure that the corresponding event reaches peer manager before + // we end the test. + test_node.wait_for_ban_score_adjustment().await; + + // The peer address should be discouraged. + let test_node_remnants = test_node.join().await; + // TODO: check the actual address instead of the count, same in other places. + assert!(test_node_remnants.peer_mgr.peerdb().list_discouraged().count() > 0); + + // For consistency, check that we don't ban automatically. + assert_eq!( + test_node_remnants.peer_mgr.peerdb().list_banned().count(), + 0 + ); +} + +#[tracing::instrument] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn incorrect_handshake_outgoing_auto_tcp() { + run_with_timeout(incorrect_handshake_outgoing_auto::()).await; +} + +#[tracing::instrument] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn incorrect_handshake_outgoing_auto_channels() { + run_with_timeout(incorrect_handshake_outgoing_auto::()).await; +} + +#[tracing::instrument] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn incorrect_handshake_outgoing_auto_noise() { + run_with_timeout(incorrect_handshake_outgoing_auto::()).await; } async fn incorrect_handshake_incoming() diff --git a/p2p/src/tests/min_peer_software_version.rs b/p2p/src/tests/min_peer_software_version.rs new file mode 100644 index 0000000000..7e62edea1a --- /dev/null +++ b/p2p/src/tests/min_peer_software_version.rs @@ -0,0 +1,475 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use itertools::Itertools as _; +use rstest::rstest; + +use chainstate::ChainstateConfig; +use common::primitives::{ + semver::SemVer, + user_agent::{mintlayer_core_user_agent, UserAgent}, +}; +use networking::{ + test_helpers::{TestTransportChannel, TestTransportMaker}, + transport::{BufferedTranscoder, ConnectedSocketInfo as _, TransportListener, TransportSocket}, +}; +use p2p_test_utils::run_with_timeout; +use p2p_types::socket_address::SocketAddress; +use test_utils::{assert_matches, BasicTestTimeGetter}; + +use crate::{ + config::P2pConfig, + net::default_backend::types::{HandshakeMessage, Message, P2pTimestamp}, + peer_manager::{self, config::PeerManagerConfig, PeerManagerInterface}, + test_helpers::TEST_PROTOCOL_VERSION, + tests::helpers::TestNode, +}; + +type Transport = ::Transport; + +#[derive(Debug)] +struct TestParams { + min_version: Option, + peer_version: SemVer, + peer_user_agent: UserAgent, + node_user_agent: UserAgent, + accept: bool, +} + +#[rstest_reuse::template] +fn test_params_list( + #[values( + // No min version => the connection should be accepted no matter what. + TestParams { + min_version: None, + peer_version: "0.0.0".parse().unwrap(), + peer_user_agent: mintlayer_core_user_agent(), + node_user_agent: mintlayer_core_user_agent(), + accept: true + }, + // If user agents differ, the min version doesn't matter. + TestParams { + min_version: Some("1.1.1".parse().unwrap()), + peer_version: "0.0.0".parse().unwrap(), + peer_user_agent: "Agent1".try_into().unwrap(), + node_user_agent: "Agent2".try_into().unwrap(), + accept: true + }, + // Same agent, same version => accept + TestParams { + min_version: Some("12.34.56".parse().unwrap()), + peer_version: "12.34.56".parse().unwrap(), + peer_user_agent: "SameAgent".try_into().unwrap(), + node_user_agent: "SameAgent".try_into().unwrap(), + accept: true + }, + // Same agent, bigger peer version => accept + TestParams { + min_version: Some("12.34.56".parse().unwrap()), + peer_version: "12.34.57".parse().unwrap(), + peer_user_agent: "SameAgent".try_into().unwrap(), + node_user_agent: "SameAgent".try_into().unwrap(), + accept: true + }, + // Same agent, smaller peer version => reject + TestParams { + min_version: Some("12.34.56".parse().unwrap()), + peer_version: "12.34.55".parse().unwrap(), + peer_user_agent: "SameAgent".try_into().unwrap(), + node_user_agent: "SameAgent".try_into().unwrap(), + accept: false + }, + )] + test_params: TestParams, +) { +} + +fn make_p2p_config(test_params: &TestParams) -> P2pConfig { + let peer_manager_config = PeerManagerConfig { + min_peer_software_version: test_params.min_version, + + max_inbound_connections: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + outbound_full_relay_count: Default::default(), + outbound_full_relay_extra_count: Default::default(), + outbound_block_relay_count: Default::default(), + outbound_block_relay_extra_count: Default::default(), + outbound_block_relay_connection_min_age: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + enable_feeler_connections: Default::default(), + feeler_connections_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + allow_same_ip_connections: Default::default(), + peerdb_config: Default::default(), + }; + + P2pConfig { + peer_manager_config, + user_agent: test_params.node_user_agent.clone(), + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + whitelisted_addresses: Default::default(), + ban_config: Default::default(), + outbound_connection_timeout: Default::default(), + ping_check_period: Default::default(), + ping_timeout: Default::default(), + peer_handshake_timeout: Default::default(), + max_clock_diff: Default::default(), + node_type: Default::default(), + allow_discover_private_ips: Default::default(), + sync_stalling_timeout: Default::default(), + protocol_config: Default::default(), + custom_disconnection_reason_for_banning: Default::default(), + } +} + +#[tracing::instrument] +#[rstest_reuse::apply(test_params_list)] +#[rstest] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn outbound_manual_connection(test_params: TestParams) { + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(make_p2p_config(&test_params)); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. })); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: test_params.peer_user_agent, + software_version: test_params.peer_version, + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + // Note: since it's a manual outbound connection, the peer should not be discouraged even + // if the connection has been rejected. + + let test_node_remnants = if test_params.accept { + let connect_result = connect_result_receiver.await.unwrap(); + assert!(connect_result.is_ok()); + + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::AddrListRequest(_)); + + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_punishment().await; + + let test_node_remnants = test_node.join().await; + + // PeerContext still exists and has zero score. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + assert_eq!(peer_score, 0); + + test_node_remnants + } else { + // connect_result should indicate a failed connection + let connect_result = connect_result_receiver.await.unwrap(); + assert!(connect_result.is_err()); + + // The node should have sent WillDisconnect. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::WillDisconnect(_)); + + // Then the connection should have been closed. + msg_stream.recv().await.unwrap_err(); + + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_punishment().await; + + let test_node_remnants = test_node.join().await; + + // No PeerContext, since the connection has been closed. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 0); + + test_node_remnants + }; + + // No discouragements in the peer db. + let discouragements_count = test_node_remnants.peer_mgr.peerdb().list_discouraged().count(); + assert_eq!(discouragements_count, 0); + + // Just in case, also check that the peer hasn't been banned. + let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count(); + assert_eq!(bans_count, 0); + }) + .await; +} + +#[tracing::instrument] +#[rstest_reuse::apply(test_params_list)] +#[rstest] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn outbound_auto_connection(test_params: TestParams) { + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(make_p2p_config(&test_params)); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let peer_address = listener.local_addresses().unwrap()[0].into(); + test_node.discover_peer(peer_address).await; + // Advance time to allow a heartbeat to happen, where the new connection will be attempted. + time_getter.advance_time(peer_manager::HEARTBEAT_INTERVAL_MAX); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. })); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: test_params.peer_user_agent, + software_version: test_params.peer_version, + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + let test_node_remnants = if test_params.accept { + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::AddrListRequest(_)); + + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_punishment().await; + + let test_node_remnants = test_node.join().await; + + // No discouragements in the peer db. + let discouragements_count = + test_node_remnants.peer_mgr.peerdb().list_discouraged().count(); + assert_eq!(discouragements_count, 0); + + // PeerContext still exists and has zero score. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + assert_eq!(peer_score, 0); + + test_node_remnants + } else { + // The node should have sent WillDisconnect. + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::WillDisconnect(_)); + + // Then the connection should have been closed. + msg_stream.recv().await.unwrap_err(); + + // Unlike in the outbound manual connection case, here the peer score should be adjusted + // and the peer discouraged. + + // Note: not using wait_for_ban_score_adjustment, because the score adjustment is + // initiated in the peer manager directly in this case, so it should happen immediately. + + let test_node_remnants = test_node.join().await; + + // The peer address should be discouraged. + let discouraged_addrs = test_node_remnants + .peer_mgr + .peerdb() + .list_discouraged() + .map(|(addr, _)| addr) + .collect_vec(); + assert_eq!(&discouraged_addrs, &[peer_address.as_bannable()]); + + // No PeerContext, since the connection has been closed. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 0); + + test_node_remnants + }; + + // Just in case, check that the peer hasn't been banned. + let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count(); + assert_eq!(bans_count, 0); + }) + .await; +} + +#[tracing::instrument] +#[rstest_reuse::apply(test_params_list)] +#[rstest] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_connection(test_params: TestParams) { + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(make_p2p_config(&test_params)); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + ) + .await; + + let transport = TestTransportChannel::make_transport(); + + let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); + let peer_address = SocketAddress::new(stream.local_address().unwrap()); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + msg_stream + .send(Message::Handshake(HandshakeMessage::Hello { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: test_params.peer_user_agent, + software_version: test_params.peer_version, + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + handshake_nonce: 0, + })) + .await + .unwrap(); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::HelloAck { .. })); + + let test_node_remnants = if test_params.accept { + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::HeaderListRequest(_)); + + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_punishment().await; + + let test_node_remnants = test_node.join().await; + + // No discouragements in the peer db. + let discouragements_count = + test_node_remnants.peer_mgr.peerdb().list_discouraged().count(); + assert_eq!(discouragements_count, 0); + + // PeerContext still exists and has zero score. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + assert_eq!(peer_score, 0); + + test_node_remnants + } else { + // The node should have sent WillDisconnect. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::WillDisconnect(_)); + + // Then the connection should have been closed. + msg_stream.recv().await.unwrap_err(); + + // Note: not using wait_for_ban_score_adjustment, because the score adjustment is + // initiated in the peer manager directly in this case, so it should happen immediately. + + let test_node_remnants = test_node.join().await; + + // The peer address should be discouraged. + let discouraged_addrs = test_node_remnants + .peer_mgr + .peerdb() + .list_discouraged() + .map(|(addr, _)| addr) + .collect_vec(); + assert_eq!(&discouraged_addrs, &[peer_address.as_bannable()]); + + // No PeerContext, since the connection has been closed. + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 0); + + test_node_remnants + }; + + // Just in case, check that the peer hasn't been banned. + let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count(); + assert_eq!(bans_count, 0); + }) + .await; +} diff --git a/p2p/src/tests/misbehavior.rs b/p2p/src/tests/misbehavior.rs index a7739289f4..83a063337f 100644 --- a/p2p/src/tests/misbehavior.rs +++ b/p2p/src/tests/misbehavior.rs @@ -78,9 +78,9 @@ where let msg = msg_stream.recv().await.unwrap(); assert_matches!(msg, Message::Handshake(HandshakeMessage::HelloAck { .. })); - // Check that the connection is still up and we can receive the next message (we don't care - // which one it is though). - let _msg = msg_stream.recv().await.unwrap(); + // Check that the connection is still up and we can receive the next message. + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::HeaderListRequest(_)); // Send an unexpected Hello msg_stream diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index 765507e292..d7529d27d5 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -21,6 +21,7 @@ mod correct_handshake; mod disable_networking; mod disconnect_on_will_disconnect_msg; mod incorrect_handshake; +mod min_peer_software_version; mod misbehavior; mod peer_discovery_on_stale_tip; mod same_handshake_nonce; diff --git a/p2p/src/tests/peer_discovery_on_stale_tip.rs b/p2p/src/tests/peer_discovery_on_stale_tip.rs index 9a1ddd306b..ffc5999643 100644 --- a/p2p/src/tests/peer_discovery_on_stale_tip.rs +++ b/p2p/src/tests/peer_discovery_on_stale_tip.rs @@ -122,8 +122,8 @@ async fn peer_discovery_on_stale_tip_impl( feeler_connections_interval: Default::default(), force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), - peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; let p2p_config = Arc::new(make_p2p_config(peer_mgr_config)); @@ -313,6 +313,7 @@ async fn new_full_relay_connections_on_stale_tip_impl(seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; let main_node_p2p_config = Arc::new(make_p2p_config(main_node_peer_mgr_config)); @@ -340,6 +341,7 @@ async fn new_full_relay_connections_on_stale_tip_impl(seed: Seed) { force_dns_query_if_no_global_addresses_known: Default::default(), allow_same_ip_connections: Default::default(), peerdb_config: Default::default(), + min_peer_software_version: Default::default(), }; let extra_nodes_p2p_config = Arc::new(make_p2p_config(extra_nodes_peer_mgr_config)); diff --git a/wallet/wallet-rpc-daemon/docs/RPC.md b/wallet/wallet-rpc-daemon/docs/RPC.md index 59d489593a..feeff2f66d 100644 --- a/wallet/wallet-rpc-daemon/docs/RPC.md +++ b/wallet/wallet-rpc-daemon/docs/RPC.md @@ -2837,7 +2837,12 @@ nothing ### Method `node_disconnect_peer` -Disconnect a remote peer in the node +Disconnect a remote peer in the node. + +Note that this will also cause the node to forget the peer address if the connection was +an outbound one, which will effectively prevent the node from reconnecting to it until +the address is re-discovered. For a more refined control over connections to a specific +address, consider banning/unbanning it explicitly. Parameters: diff --git a/wallet/wallet-rpc-lib/src/rpc/interface.rs b/wallet/wallet-rpc-lib/src/rpc/interface.rs index a583ddb8ce..9aaed224b4 100644 --- a/wallet/wallet-rpc-lib/src/rpc/interface.rs +++ b/wallet/wallet-rpc-lib/src/rpc/interface.rs @@ -921,7 +921,12 @@ trait WalletRpc { #[method(name = "node_connect_to_peer")] async fn connect_to_peer(&self, address: String) -> rpc::RpcResult<()>; - /// Disconnect a remote peer in the node + /// Disconnect a remote peer in the node. + /// + /// Note that this will also cause the node to forget the peer address if the connection was + /// an outbound one, which will effectively prevent the node from reconnecting to it until + /// the address is re-discovered. For a more refined control over connections to a specific + /// address, consider banning/unbanning it explicitly. #[method(name = "node_disconnect_peer")] async fn disconnect_peer(&self, peer_id: u64) -> rpc::RpcResult<()>;