diff --git a/CHANGELOG.md b/CHANGELOG.md index b1179fb..ad8eba6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,10 @@ The following emojis are used to highlight certain changes: ### Fixed +- ✨ Some faulty third-party DHT peers never expire old observed addresses. Peers with dynamic ports (e.g. UPnP on consumer routers) or changing IPs (roaming, ISP changes) accumulate dead addresses over time. A provider record with 60 stale addresses before the one that works makes the peer effectively unreachable, degrading routing results for everyone downstream of someguy. This release adds two layers of stale address filtering: + - **Passive filtering** (fast, inline): after a successful connection, someguy remembers the working address and strips addresses on the same IP and transport that have a different (outdated) port. + - **Active probing** (async, non-blocking, controlled by `SOMEGUY_CACHED_ADDR_BOOK_STALE_PROBING`): on first encounter, when a peer's address set looks suspicious (multiple ports per IP, or more than 3 IPs per address family), each unique address is probed in the background with an ephemeral libp2p handshake. Records that don't need probing stream through immediately; probed results appear at the end of the response once the handshakes complete. If every probe fails (peer offline), all addresses are returned unchanged (fail-open). + ### Security ## [v0.11.1] diff --git a/addr_filter.go b/addr_filter.go new file mode 100644 index 0000000..103a3fe --- /dev/null +++ b/addr_filter.go @@ -0,0 +1,192 @@ +// addr_filter.go provides passive stale-address filtering and detection +// heuristics for the active probing layer (see addr_prober.go). +// +// Problem: some DHT server implementations never expire old observed +// addresses for a peer. Peers with dynamic ports (e.g. UPnP on consumer +// routers) or changing IPs (roaming, ISP changes) accumulate dead addresses +// over time. 
A provider record with 60 dead port addresses before the one +// that works makes the peer effectively unreachable. +// +// Passive filtering (filterStalePortAddrs): when someguy has previously +// connected to a peer, it remembers the working address. On subsequent +// lookups, addresses on the same IP and layer-4 protocol but with a +// different port are stripped out. This is fast and runs inline. +// +// Detection (needsProbing): when no known-good address exists (first +// encounter), this heuristic checks whether the address set looks +// suspicious -- multiple ports on the same (IP, L4), or multiple IPs +// within the same address family. If so, the record is handed to the +// async probing layer (probeFilterIter in server_routers.go). +package main + +import ( + "strconv" + + "github.com/ipfs/boxo/routing/http/types" + ma "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var staleAddrsFilteredCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "stale_addrs_filtered", + Namespace: name, + Subsystem: "addr_filter", + Help: "Number of stale addresses filtered from responses (same IP, different port from last known-good connection)", +}) + +// addrTransportKey groups multiaddrs by IP address and layer-4 protocol. +// Multiaddrs sharing the same key but differing only in port are +// candidates for stale address filtering. +type addrTransportKey struct { + ip string // e.g. "209.222.4.177" or "2001:db8::1" + l4Code int // ma.P_TCP or ma.P_UDP +} + +// extractAddrTransportKey returns the IP, layer-4 protocol, and port from a +// multiaddr. Returns false for relay (circuit), HTTP, and DNS addresses, or +// multiaddrs without a standard IP + transport structure. 
+func extractAddrTransportKey(addr ma.Multiaddr) (key addrTransportKey, port int, ok bool) { + // skip relay addresses: the IP/port belongs to the relay, not the peer + if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { + return addrTransportKey{}, 0, false + } + + // skip HTTP addresses: trustless gateway, not a libp2p peer + if _, err := addr.ValueForProtocol(ma.P_HTTP); err == nil { + return addrTransportKey{}, 0, false + } + + if v, err := addr.ValueForProtocol(ma.P_IP4); err == nil { + key.ip = v + } else if v, err := addr.ValueForProtocol(ma.P_IP6); err == nil { + key.ip = v + } else { + return addrTransportKey{}, 0, false + } + + if v, err := addr.ValueForProtocol(ma.P_TCP); err == nil { + key.l4Code = ma.P_TCP + port, _ = strconv.Atoi(v) + ok = true + } else if v, err := addr.ValueForProtocol(ma.P_UDP); err == nil { + key.l4Code = ma.P_UDP + port, _ = strconv.Atoi(v) + ok = true + } + return +} + +// filterStalePortAddrs removes multiaddrs that share the same (IP, layer-4 +// protocol) as connectedAddr but have a different port. These are likely +// stale port forwards from old NAT mappings. +// +// Addrs on different IPs, different L4 protocols, or unparseable addrs +// are kept unchanged. 
+func filterStalePortAddrs(addrs []types.Multiaddr, connectedAddr ma.Multiaddr) []types.Multiaddr { + if connectedAddr == nil || len(addrs) == 0 { + return addrs + } + + goodKey, goodPort, ok := extractAddrTransportKey(connectedAddr) + if !ok { + return addrs + } + + result := make([]types.Multiaddr, 0, len(addrs)) + var filtered int + + for _, addr := range addrs { + key, port, ok := extractAddrTransportKey(addr.Multiaddr) + if !ok || key != goodKey { + result = append(result, addr) + continue + } + if port == goodPort { + result = append(result, addr) + } else { + filtered++ + } + } + + if filtered > 0 { + staleAddrsFilteredCounter.Add(float64(filtered)) + } + return result +} + +// needsProbing returns true when the addr set shows signs of stale addresses: +// - multi-port: any (IP, L4) group has more than one distinct port +// - multi-IP: any address family (v4 or v6) has more than one distinct IP +func needsProbing(addrs []types.Multiaddr) bool { + type ipL4 struct { + ip string + l4Code int + } + + ports := make(map[ipL4]map[int]struct{}) + v4IPs := make(map[string]struct{}) + v6IPs := make(map[string]struct{}) + + for _, addr := range addrs { + key, port, ok := extractAddrTransportKey(addr.Multiaddr) + if !ok { + continue + } + + k := ipL4(key) + if ports[k] == nil { + ports[k] = make(map[int]struct{}) + } + ports[k][port] = struct{}{} + + // track distinct IPs per address family + if _, err := addr.Multiaddr.ValueForProtocol(ma.P_IP4); err == nil { + v4IPs[key.ip] = struct{}{} + } else if _, err := addr.Multiaddr.ValueForProtocol(ma.P_IP6); err == nil { + v6IPs[key.ip] = struct{}{} + } + } + + // multi-port: any (IP, L4) has >1 port + for _, ps := range ports { + if len(ps) > 1 { + return true + } + } + + // multi-IP: same address family has many distinct IPs. 
+ // 2-3 IPs is normal (dual WAN, cloud instances with public + VPC), + // but 4+ within a single family suggests stale addrs from ISP/roaming changes not being expired by some poorly written third-party DHT peers. + if len(v4IPs) > 3 || len(v6IPs) > 3 { + return true + } + + return false +} + +// findStalePortAddrs returns multiaddrs from allAddrs that share the same +// (IP, layer-4 protocol) as connectedAddr but have a different port. +// Used for cleaning up stale entries from the addr book cache. +func findStalePortAddrs(allAddrs []ma.Multiaddr, connectedAddr ma.Multiaddr) []ma.Multiaddr { + if connectedAddr == nil || len(allAddrs) == 0 { + return nil + } + + goodKey, goodPort, ok := extractAddrTransportKey(connectedAddr) + if !ok { + return nil + } + + var stale []ma.Multiaddr + for _, addr := range allAddrs { + key, port, ok := extractAddrTransportKey(addr) + if !ok || key != goodKey { + continue + } + if port != goodPort { + stale = append(stale, addr) + } + } + return stale +} diff --git a/addr_filter_test.go b/addr_filter_test.go new file mode 100644 index 0000000..87de0aa --- /dev/null +++ b/addr_filter_test.go @@ -0,0 +1,346 @@ +package main + +import ( + "testing" + + "github.com/ipfs/boxo/routing/http/types" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func mustMA(t *testing.T, s string) ma.Multiaddr { + t.Helper() + addr, err := ma.NewMultiaddr(s) + require.NoError(t, err) + return addr +} + +func toTypesAddrs(t *testing.T, strs ...string) []types.Multiaddr { + t.Helper() + result := make([]types.Multiaddr, len(strs)) + for i, s := range strs { + result[i] = types.Multiaddr{Multiaddr: mustMA(t, s)} + } + return result +} + +func typesAddrStrings(addrs []types.Multiaddr) []string { + out := make([]string, len(addrs)) + for i, a := range addrs { + out[i] = a.Multiaddr.String() + } + return out +} + +func maStrings(addrs []ma.Multiaddr) []string { + out := make([]string, 
len(addrs)) + for i, a := range addrs { + out[i] = a.String() + } + return out +} + +func TestExtractAddrTransportKey(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + addr string + wantKey addrTransportKey + wantPort int + wantOK bool + }{ + { + name: "ip4 tcp", + addr: "/ip4/209.222.4.177/tcp/4001", + wantKey: addrTransportKey{ip: "209.222.4.177", l4Code: ma.P_TCP}, + wantPort: 4001, + wantOK: true, + }, + { + name: "ip4 udp quic-v1", + addr: "/ip4/209.222.4.177/udp/4001/quic-v1", + wantKey: addrTransportKey{ip: "209.222.4.177", l4Code: ma.P_UDP}, + wantPort: 4001, + wantOK: true, + }, + { + name: "ip4 udp quic-v1 webtransport", + addr: "/ip4/209.222.4.177/udp/4001/quic-v1/webtransport", + wantKey: addrTransportKey{ip: "209.222.4.177", l4Code: ma.P_UDP}, + wantPort: 4001, + wantOK: true, + }, + { + name: "ip6 tcp", + addr: "/ip6/2001:db8::1/tcp/4001", + wantKey: addrTransportKey{ip: "2001:db8::1", l4Code: ma.P_TCP}, + wantPort: 4001, + wantOK: true, + }, + { + name: "ip6 udp quic-v1", + addr: "/ip6/2604:1380:4601:f600::5/udp/4001/quic-v1", + wantKey: addrTransportKey{ip: "2604:1380:4601:f600::5", l4Code: ma.P_UDP}, + wantPort: 4001, + wantOK: true, + }, + { + name: "circuit relay addr is skipped", + addr: "/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj/p2p-circuit", + wantOK: false, + }, + { + name: "dns addr without IP is skipped", + addr: "/dns4/example.com/tcp/443", + wantOK: false, + }, + { + name: "http addr is skipped", + addr: "/ip4/209.222.4.177/tcp/443/tls/http", + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + addr := mustMA(t, tt.addr) + key, port, ok := extractAddrTransportKey(addr) + assert.Equal(t, tt.wantOK, ok, "ok mismatch") + if ok { + assert.Equal(t, tt.wantKey, key, "key mismatch") + assert.Equal(t, tt.wantPort, port, "port mismatch") + } + }) + } +} + +func TestFilterStalePortAddrs(t *testing.T) { + t.Parallel() + + t.Run("filters stale UDP 
ports on same IP", func(t *testing.T) { + addrs := toTypesAddrs(t, + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + "/ip4/209.222.4.177/udp/61078/quic-v1", + "/ip4/209.222.4.177/udp/4001/quic-v1/webtransport", + "/ip4/209.222.4.177/udp/1282/quic-v1/webtransport", + ) + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + result := filterStalePortAddrs(addrs, connectedAddr) + + got := typesAddrStrings(result) + assert.ElementsMatch(t, []string{ + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/4001/quic-v1/webtransport", + }, got) + }) + + t.Run("does not touch TCP when connected via UDP", func(t *testing.T) { + addrs := toTypesAddrs(t, + "/ip4/209.222.4.177/tcp/4001", + "/ip4/209.222.4.177/tcp/61078", + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + ) + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + result := filterStalePortAddrs(addrs, connectedAddr) + + got := typesAddrStrings(result) + assert.ElementsMatch(t, []string{ + "/ip4/209.222.4.177/tcp/4001", + "/ip4/209.222.4.177/tcp/61078", + "/ip4/209.222.4.177/udp/4001/quic-v1", + }, got) + }) + + t.Run("does not touch different IPs", func(t *testing.T) { + addrs := toTypesAddrs(t, + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + "/ip4/10.0.0.1/udp/5555/quic-v1", + "/ip6/2001:db8::1/udp/4001/quic-v1", + ) + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + result := filterStalePortAddrs(addrs, connectedAddr) + + got := typesAddrStrings(result) + assert.ElementsMatch(t, []string{ + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/10.0.0.1/udp/5555/quic-v1", + "/ip6/2001:db8::1/udp/4001/quic-v1", + }, got) + }) + + t.Run("keeps relay addrs unchanged", func(t *testing.T) { + addrs := toTypesAddrs(t, + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + 
"/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj/p2p-circuit", + ) + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + result := filterStalePortAddrs(addrs, connectedAddr) + + got := typesAddrStrings(result) + assert.ElementsMatch(t, []string{ + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj/p2p-circuit", + }, got) + }) + + t.Run("no-op when connectedAddr is nil", func(t *testing.T) { + addrs := toTypesAddrs(t, + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + ) + result := filterStalePortAddrs(addrs, nil) + assert.Equal(t, addrs, result) + }) + + t.Run("no-op when empty addrs", func(t *testing.T) { + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + result := filterStalePortAddrs(nil, connectedAddr) + assert.Nil(t, result) + }) +} + +func TestNeedsProbing(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + addrs []string + want bool + }{ + { + name: "multi-port on same IP and L4 triggers probing", + addrs: []string{ + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/1282/quic-v1", + }, + want: true, + }, + { + name: "2 v4 IPs does not trigger probing (normal multi-homed)", + addrs: []string{ + "/ip4/209.222.4.177/tcp/4001", + "/ip4/1.2.3.4/tcp/4001", + }, + want: false, + }, + { + name: "3 v4 IPs does not trigger probing", + addrs: []string{ + "/ip4/209.222.4.177/tcp/4001", + "/ip4/1.2.3.4/tcp/4001", + "/ip4/10.20.30.40/tcp/4001", + }, + want: false, + }, + { + name: "4 v4 IPs triggers probing", + addrs: []string{ + "/ip4/209.222.4.177/tcp/4001", + "/ip4/1.2.3.4/tcp/4001", + "/ip4/10.20.30.40/tcp/4001", + "/ip4/50.60.70.80/tcp/4001", + }, + want: true, + }, + { + name: "4 v6 IPs triggers probing", + addrs: []string{ + "/ip6/2001:db8::1/tcp/4001", + "/ip6/2001:db8::2/tcp/4001", + "/ip6/2001:db8::3/tcp/4001", + "/ip6/2001:db8::4/tcp/4001", + }, + want: true, + 
}, + { + name: "single port and IP does not trigger probing", + addrs: []string{ + "/ip4/209.222.4.177/udp/4001/quic-v1", + "/ip4/209.222.4.177/udp/4001/quic-v1/webtransport", + "/ip4/209.222.4.177/tcp/4001", + }, + want: false, + }, + { + name: "dual-stack same port does not trigger probing", + addrs: []string{ + "/ip4/209.222.4.177/tcp/4001", + "/ip6/2001:db8::1/tcp/4001", + }, + want: false, + }, + { + name: "http addrs are ignored for probing decision", + addrs: []string{ + "/ip4/209.222.4.177/tcp/443/tls/http", + "/ip4/209.222.4.177/tcp/4001", + }, + want: false, + }, + { + name: "empty addrs", + addrs: nil, + want: false, + }, + { + name: "relay-only addrs do not trigger probing", + addrs: []string{ + "/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj/p2p-circuit", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + addrs := toTypesAddrs(t, tt.addrs...) + got := needsProbing(addrs) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFindStalePortAddrs(t *testing.T) { + t.Parallel() + + t.Run("finds stale addrs on same IP and L4", func(t *testing.T) { + allAddrs := []ma.Multiaddr{ + mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1"), + mustMA(t, "/ip4/209.222.4.177/udp/1282/quic-v1"), + mustMA(t, "/ip4/209.222.4.177/udp/61078/quic-v1"), + mustMA(t, "/ip4/209.222.4.177/tcp/4001"), + } + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + stale := findStalePortAddrs(allAddrs, connectedAddr) + + got := maStrings(stale) + assert.ElementsMatch(t, []string{ + "/ip4/209.222.4.177/udp/1282/quic-v1", + "/ip4/209.222.4.177/udp/61078/quic-v1", + }, got) + }) + + t.Run("returns nil when no stale addrs", func(t *testing.T) { + allAddrs := []ma.Multiaddr{ + mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1"), + mustMA(t, "/ip4/209.222.4.177/tcp/4001"), + } + connectedAddr := mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1") + stale := findStalePortAddrs(allAddrs, connectedAddr) + 
assert.Nil(t, stale) + }) + + t.Run("returns nil when connectedAddr is nil", func(t *testing.T) { + allAddrs := []ma.Multiaddr{ + mustMA(t, "/ip4/209.222.4.177/udp/4001/quic-v1"), + } + stale := findStalePortAddrs(allAddrs, nil) + assert.Nil(t, stale) + }) +} diff --git a/addr_prober.go b/addr_prober.go new file mode 100644 index 0000000..6bad7fd --- /dev/null +++ b/addr_prober.go @@ -0,0 +1,233 @@ +package main + +import ( + "context" + "sync" + "time" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + probeAddrAttemptsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "probe_addr_attempts", + Namespace: name, + Subsystem: "addr_prober", + Help: "Number of per-addr probe attempts by result (alive/dead)", + }, []string{"result"}) + + probeAddrDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "probe_addr_duration_seconds", + Namespace: name, + Subsystem: "addr_prober", + Help: "Duration of individual addr probe operations in seconds", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2, 3, 5}, + }) +) + +// addrProber probes individual multiaddrs using a pool of ephemeral libp2p +// hosts. Each host has its own peerstore; by adding a single addr to an +// empty peerstore before calling host.Connect, we ensure only that addr +// is dialed (same technique as vole's identify command). A full libp2p +// handshake proves the peer is real, not just an open port. +// +// The pool (buffered channel) allows concurrent probes for the same peer: +// each goroutine takes a host, probes one addr, and returns the host. +// With pool size 10 and 20 probe targets, probing completes in ~2 rounds. +// Alive ports respond in <500ms; dead ports hit the timeout (default 5s). 
+// +// This is called from probeFilterIter.dispatchProbe, which runs in +// background goroutines so the streaming response is not blocked. +type addrProber struct { + hostPool chan host.Host // buffered channel used as a pool + timeout time.Duration +} + +func newAddrProber(poolSize int, timeout time.Duration) (*addrProber, error) { + pool := make(chan host.Host, poolSize) + for range poolSize { + h, err := libp2p.New(libp2p.NoListenAddrs) + if err != nil { + // close any hosts already created + close(pool) + for h := range pool { + h.Close() + } + return nil, err + } + pool <- h + } + return &addrProber{ + hostPool: pool, + timeout: timeout, + }, nil +} + +// probeAddr tests whether a single multiaddr is reachable for the given peer +// using a full libp2p handshake via an ephemeral host from the pool. +func (p *addrProber) probeAddr(ctx context.Context, pid peer.ID, addr ma.Multiaddr) bool { + // take a host from the pool + var h host.Host + select { + case h = <-p.hostPool: + case <-ctx.Done(): + return false + } + defer func() { p.hostPool <- h }() + + // clear any previous state for this peer + h.Peerstore().ClearAddrs(pid) + h.Peerstore().AddAddr(pid, addr, time.Minute) + + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + + start := time.Now() + err := h.Connect(ctx, peer.AddrInfo{ID: pid, Addrs: []ma.Multiaddr{addr}}) + probeAddrDurationHistogram.Observe(time.Since(start).Seconds()) + + if err != nil { + probeAddrAttemptsCounter.WithLabelValues("dead").Inc() + return false + } + + // close connection after successful probe + _ = h.Network().ClosePeer(pid) + probeAddrAttemptsCounter.WithLabelValues("alive").Inc() + return true +} + +// addrGroup represents a set of multiaddrs that share the same (IP, port, L4) key. +// We only probe one representative addr per group and apply the result to all. 
+type addrGroup struct { + representative ma.Multiaddr // shortest/simplest addr in the group + indices []int // indices into the original addrs slice +} + +// probeAddrs probes addrs for a peer, deduplicating by (IP, port, L4). +// +// Multiple multiaddrs can share the same underlying port, e.g. +// /ip4/x/udp/4001/quic-v1 and /ip4/x/udp/4001/quic-v1/webtransport +// both use UDP:4001. We group by (IP, port, L4), probe once per group +// using the shortest multiaddr as representative, and apply the result +// to all members of the group. +// +// Addrs that can't be probed (relay, HTTP, DNS) are kept unchanged. +// If ALL probes fail, the peer is likely offline and we return all addrs +// unchanged (fail-open) so downstream clients can still attempt connection. +func (p *addrProber) probeAddrs(ctx context.Context, pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr { + if len(addrs) == 0 { + return addrs + } + + // group addrs by (IP, port, L4) for deduplication + type groupKey struct { + addrTransportKey + port int + } + + groups := make(map[groupKey]*addrGroup) + var unprobable []int // indices of addrs we can't/shouldn't probe + + for i, addr := range addrs { + key, port, ok := extractAddrTransportKey(addr.Multiaddr) + if !ok { + unprobable = append(unprobable, i) + continue + } + + gk := groupKey{key, port} + g, exists := groups[gk] + if !exists { + g = &addrGroup{ + representative: addr.Multiaddr, + } + groups[gk] = g + } + g.indices = append(g.indices, i) + + // prefer shorter multiaddr as representative (simpler protocol stack) + if len(addr.Multiaddr.Bytes()) < len(g.representative.Bytes()) { + g.representative = addr.Multiaddr + } + } + + if len(groups) == 0 { + return addrs + } + + // probe each unique (IP, port, L4) concurrently + type probeResult struct { + gk groupKey + alive bool + } + + results := make(chan probeResult, len(groups)) + var wg sync.WaitGroup + + for gk, g := range groups { + wg.Go(func() { + alive := p.probeAddr(ctx, pid, 
g.representative) + results <- probeResult{gk: gk, alive: alive} + }) + } + + go func() { + wg.Wait() + close(results) + }() + + aliveGroups := make(map[groupKey]bool, len(groups)) + aliveCount := 0 + for r := range results { + aliveGroups[r.gk] = r.alive + if r.alive { + aliveCount++ + } + } + + // fail-open: if all probes failed, peer is likely offline; return all addrs unchanged + if aliveCount == 0 { + logger.Debugw("all probes failed, returning all addrs (fail-open)", + "peer", pid, "probed", len(groups)) + return addrs + } + + // build result: keep alive addrs + unprobable addrs, drop dead + result := make([]types.Multiaddr, 0, len(addrs)) + for _, i := range unprobable { + result = append(result, addrs[i]) + } + + var filtered int + for gk, g := range groups { + if aliveGroups[gk] { + for _, i := range g.indices { + result = append(result, addrs[i]) + } + } else { + filtered += len(g.indices) + } + } + + if filtered > 0 { + logger.Debugw("probed and filtered dead addrs", + "peer", pid, "alive", aliveCount, "dead", len(groups)-aliveCount, "filtered_addrs", filtered) + staleAddrsFilteredCounter.Add(float64(filtered)) + } + + return result +} + +func (p *addrProber) close() { + close(p.hostPool) + for h := range p.hostPool { + h.Close() + } +} diff --git a/addr_prober_test.go b/addr_prober_test.go new file mode 100644 index 0000000..73329fc --- /dev/null +++ b/addr_prober_test.go @@ -0,0 +1,182 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProbeAddr(t *testing.T) { + t.Parallel() + + // create a target libp2p host that listens on a local TCP port + target, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + defer target.Close() + + targetAddr := 
target.Addrs()[0] + targetID := target.ID() + + prober, err := newAddrProber(2, 5*time.Second) + require.NoError(t, err) + defer prober.close() + + t.Run("probe succeeds for reachable addr", func(t *testing.T) { + ctx := t.Context() + result := prober.probeAddr(ctx, targetID, targetAddr) + assert.True(t, result, "probe should succeed for reachable addr") + }) + + t.Run("probe fails for unreachable addr", func(t *testing.T) { + ctx := t.Context() + // use a port that nothing is listening on + deadAddr := mustMA(t, "/ip4/127.0.0.1/tcp/1") + result := prober.probeAddr(ctx, targetID, deadAddr) + assert.False(t, result, "probe should fail for unreachable addr") + }) +} + +func TestProbeAddrs(t *testing.T) { + t.Parallel() + + // create two target hosts: one reachable, one we'll stop + alive, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + defer alive.Close() + + dead, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + deadAddr := dead.Addrs()[0] + deadID := dead.ID() + dead.Close() // stop listening to make it unreachable + + prober, err := newAddrProber(3, 3*time.Second) + require.NoError(t, err) + defer prober.close() + + t.Run("filters dead addrs, keeps alive", func(t *testing.T) { + ctx := t.Context() + aliveAddr := alive.Addrs()[0] + + addrs := []types.Multiaddr{ + {Multiaddr: aliveAddr}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/1")}, // dead port + } + + result := prober.probeAddrs(ctx, alive.ID(), addrs) + resultStrs := typesAddrStrings(result) + assert.Contains(t, resultStrs, aliveAddr.String()) + assert.NotContains(t, resultStrs, "/ip4/127.0.0.1/tcp/1") + }) + + t.Run("fail-open when all probes fail", func(t *testing.T) { + ctx := t.Context() + addrs := []types.Multiaddr{ + {Multiaddr: deadAddr}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/1")}, + } + + result := prober.probeAddrs(ctx, deadID, addrs) + // should return all addrs unchanged + assert.Len(t, result, 
len(addrs)) + }) + + t.Run("keeps unprobable addrs (relay, http)", func(t *testing.T) { + ctx := t.Context() + aliveAddr := alive.Addrs()[0] + relayAddr := mustMA(t, "/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj/p2p-circuit") + httpAddr := mustMA(t, "/ip4/1.2.3.4/tcp/443/tls/http") + + addrs := []types.Multiaddr{ + {Multiaddr: aliveAddr}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/1")}, // dead + {Multiaddr: relayAddr}, + {Multiaddr: httpAddr}, + } + + result := prober.probeAddrs(ctx, alive.ID(), addrs) + resultStrs := typesAddrStrings(result) + assert.Contains(t, resultStrs, aliveAddr.String(), "alive addr should be kept") + assert.Contains(t, resultStrs, relayAddr.String(), "relay addr should be kept") + assert.Contains(t, resultStrs, httpAddr.String(), "http addr should be kept") + assert.NotContains(t, resultStrs, "/ip4/127.0.0.1/tcp/1", "dead addr should be filtered") + }) + + t.Run("deduplicates by (IP, port, L4)", func(t *testing.T) { + ctx := t.Context() + aliveAddr := alive.Addrs()[0] + + // two addrs sharing the same TCP port: one bare, one with /p2p/ suffix + addr1 := aliveAddr + addr2, _ := ma.NewMultiaddr(aliveAddr.String() + "/p2p/" + alive.ID().String()) + + addrs := []types.Multiaddr{ + {Multiaddr: addr1}, + {Multiaddr: addr2}, + } + + result := prober.probeAddrs(ctx, alive.ID(), addrs) + // both should be kept since they share the same alive (IP, port, L4) + assert.Len(t, result, 2) + }) +} + +func TestProbeAddrsConcurrency(t *testing.T) { + t.Parallel() + + // create target + target, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + defer target.Close() + + // small pool to test contention + prober, err := newAddrProber(2, 3*time.Second) + require.NoError(t, err) + defer prober.close() + + ctx := t.Context() + targetAddr := target.Addrs()[0] + + // create multiple distinct addrs to probe (each unique port) + addrs := []types.Multiaddr{ + {Multiaddr: targetAddr}, + 
{Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/1")}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/2")}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/3")}, + } + + result := prober.probeAddrs(ctx, target.ID(), addrs) + resultStrs := typesAddrStrings(result) + assert.Contains(t, resultStrs, targetAddr.String(), "alive addr should survive") + assert.NotContains(t, resultStrs, "/ip4/127.0.0.1/tcp/1") + assert.NotContains(t, resultStrs, "/ip4/127.0.0.1/tcp/2") + assert.NotContains(t, resultStrs, "/ip4/127.0.0.1/tcp/3") +} + +func TestProbeAddrsContextCancellation(t *testing.T) { + t.Parallel() + + prober, err := newAddrProber(2, 5*time.Second) + require.NoError(t, err) + defer prober.close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + pid := peer.ID("test-peer") + addrs := []types.Multiaddr{ + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/1")}, + {Multiaddr: mustMA(t, "/ip4/127.0.0.1/tcp/2")}, + } + + // should return all addrs (fail-open) since probes can't run + result := prober.probeAddrs(ctx, pid, addrs) + assert.Len(t, result, len(addrs)) +} diff --git a/cached_addr_book.go b/cached_addr_book.go index b68d2ca..fb5b68d 100644 --- a/cached_addr_book.go +++ b/cached_addr_book.go @@ -86,12 +86,15 @@ var ( ) type peerState struct { - lastConnTime time.Time // last time we successfully connected to this peer - lastFailedConnTime time.Time // last time we failed to find or connect to this peer - connectFailures uint // number of times we've failed to connect to this peer + lastConnTime time.Time // last time we successfully connected to this peer + lastFailedConnTime time.Time // last time we failed to find or connect to this peer + connectFailures uint // number of times we've failed to connect to this peer + connectedAddr ma.Multiaddr // public addr from last successful connection (for stale addr filtering) + lastProbeTime time.Time // last time we probed this peer's individual addrs } type cachedAddrBook struct { + mu 
sync.Mutex // protects peerCache read-modify-write sequences addrBook peerstore.AddrBook // memory address book peerCache *lru.Cache[peer.ID, peerState] // LRU cache with additional metadata about peer probingEnabled bool @@ -174,6 +177,7 @@ func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { case ev := <-sub.Out(): switch ev := ev.(type) { case event.EvtPeerIdentificationCompleted: + cab.mu.Lock() pState, exists := cab.peerCache.Peek(ev.Peer) if !exists { pState = peerState{} @@ -181,7 +185,9 @@ func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { pState.lastConnTime = time.Now() pState.lastFailedConnTime = time.Time{} // reset failed connection time pState.connectFailures = 0 // reset connect failures on successful connection + pState.connectedAddr = ev.Conn.RemoteMultiaddr() cab.peerCache.Add(ev.Peer, pState) + cab.mu.Unlock() peerStateSize.Set(float64(cab.peerCache.Len())) // update metric ttl := cab.getTTL(host.Network().Connectedness(ev.Peer)) @@ -199,6 +205,18 @@ func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { // We don't have a signed peer record, so we use the listen addresses cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl) } + + // Remove stale addrs (same IP+L4 protocol, different port) from cache. + // Some DHT peers never expire old observed addresses, so peers with + // dynamic ports (UPnP) accumulate many dead addresses over time. + // Without cleanup, clients waste time dialing dead ports, making + // self-hosted peers on consumer networks effectively unreachable. 
+ + if stale := findStalePortAddrs(cab.addrBook.Addrs(ev.Peer), pState.connectedAddr); len(stale) > 0 { + cab.addrBook.SetAddrs(ev.Peer, stale, 0) + logger.Debugw("removed stale addresses from cache", + "peer", ev.Peer, "removed", len(stale)) + } case event.EvtPeerConnectednessChanged: // If the peer is not connected or limited, we update the TTL if !hasValidConnectedness(ev.Connectedness) { @@ -300,6 +318,7 @@ func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr { // Update the peer cache with information about a failed connection // This should be called when a connection attempt to a peer fails func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) { + cab.mu.Lock() pState, exists := cab.peerCache.Peek(p) if !exists { pState = peerState{} @@ -309,6 +328,7 @@ func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) { // we opportunistically remove the dead peer from cache to save time on probing it further if exists && pState.connectFailures > 1 && now.Sub(pState.lastFailedConnTime) > MaxBackoffDuration { cab.peerCache.Remove(p) + cab.mu.Unlock() peerStateSize.Set(float64(cab.peerCache.Len())) // update metric // remove the peer from the addr book. Otherwise it will be probed again in the probe loop cab.addrBook.ClearAddrs(p) @@ -317,6 +337,7 @@ func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) { pState.lastFailedConnTime = now pState.connectFailures++ cab.peerCache.Add(p, pState) + cab.mu.Unlock() } // Returns true if we should probe a peer (either by dialing known addresses or by dispatching a FindPeer) @@ -342,6 +363,37 @@ func (cab *cachedAddrBook) ShouldProbePeer(p peer.ID) bool { return time.Since(pState.lastFailedConnTime) > backoffDuration } +// getConnectedAddr returns the public multiaddr used for the last successful +// connection to this peer, or nil if unknown. 
+func (cab *cachedAddrBook) getConnectedAddr(p peer.ID) ma.Multiaddr { + pState, exists := cab.peerCache.Peek(p) + if !exists { + return nil + } + return pState.connectedAddr +} + +// wasRecentlyProbed returns true if the peer was probed within ProbeInterval. +func (cab *cachedAddrBook) wasRecentlyProbed(p peer.ID) bool { + pState, exists := cab.peerCache.Peek(p) + if !exists { + return false + } + return time.Since(pState.lastProbeTime) < ProbeInterval +} + +// recordProbe records the current time as the last probe time for the peer. +func (cab *cachedAddrBook) recordProbe(p peer.ID) { + cab.mu.Lock() + pState, exists := cab.peerCache.Peek(p) + if !exists { + pState = peerState{} + } + pState.lastProbeTime = time.Now() + cab.peerCache.Add(p, pState) + cab.mu.Unlock() +} + func hasValidConnectedness(connectedness network.Connectedness) bool { return connectedness == network.Connected || connectedness == network.Limited } diff --git a/cached_addr_book_test.go b/cached_addr_book_test.go index 004f54a..1262b0d 100644 --- a/cached_addr_book_test.go +++ b/cached_addr_book_test.go @@ -183,7 +183,7 @@ func TestShouldProbePeer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if tt.peerState != (peerState{}) { + if tt.peerState.connectFailures > 0 || !tt.peerState.lastFailedConnTime.IsZero() { cab.peerCache.Add(testPeer, tt.peerState) } result := cab.ShouldProbePeer(testPeer) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index a61394b..68ff1ee 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -8,6 +8,7 @@ - [`SOMEGUY_CACHED_ADDR_BOOK`](#someguy_cached_addr_book) - [`SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL`](#someguy_cached_addr_book_recent_ttl) - [`SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING`](#someguy_cached_addr_book_active_probing) + - [`SOMEGUY_CACHED_ADDR_BOOK_STALE_PROBING`](#someguy_cached_addr_book_stale_probing) - [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints) - 
[`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints) - [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints) @@ -63,6 +64,16 @@ Whether or not the Cached Address Book should actively probe peers in cache to k Default: `true` +### `SOMEGUY_CACHED_ADDR_BOOK_STALE_PROBING` + +Some faulty third-party DHT peers never expire old observed addresses for other peers. This causes peers with dynamic ports (e.g. UPnP on consumer routers) or changing IPs (roaming, ISP changes) to accumulate dead addresses over time, making them effectively unreachable. + +When enabled, someguy detects first-encounter peers whose address sets look suspicious (multiple ports per IP, or more than 3 IPs per address family) and probes each unique address with an ephemeral libp2p handshake to filter out dead ones before returning results. + +Only applies if `SOMEGUY_CACHED_ADDR_BOOK` is enabled. + +Default: `true` + ### `SOMEGUY_PROVIDER_ENDPOINTS` Comma-separated list of [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) endpoints for provider lookups. diff --git a/main.go b/main.go index 84b9fec..e936203 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,12 @@ func main() { EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING"}, Usage: "actively probe peers in cache to keep their multiaddrs up to date", }, + &cli.BoolFlag{ + Name: "cached-addr-book-stale-probing", + Value: true, + EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_STALE_PROBING"}, + Usage: "detect and probe stale addresses caused by faulty third-party DHT peers that never expire old observed addrs (e.g. 
dead UPnP ports, changed IPs)", + }, &cli.DurationFlag{ Name: "cached-addr-book-recent-ttl", DefaultText: DefaultRecentlyConnectedAddrTTL.String(), @@ -176,6 +182,7 @@ func main() { dhtType: ctx.String("dht"), cachedAddrBook: ctx.Bool("cached-addr-book"), cachedAddrBookActiveProbing: ctx.Bool("cached-addr-book-active-probing"), + cachedAddrBookStaleProbing: ctx.Bool("cached-addr-book-stale-probing"), cachedAddrBookRecentTTL: ctx.Duration("cached-addr-book-recent-ttl"), contentEndpoints: ctx.StringSlice("provider-endpoints"), diff --git a/server.go b/server.go index 518974b..2251651 100644 --- a/server.go +++ b/server.go @@ -79,6 +79,7 @@ type config struct { dhtType string cachedAddrBook bool cachedAddrBookActiveProbing bool + cachedAddrBookStaleProbing bool cachedAddrBookRecentTTL time.Duration contentEndpoints []string @@ -194,17 +195,47 @@ func start(ctx context.Context, cfg *config) error { } // Combine HTTP routers with DHT and additional routers - crRouters := combineRouters(h, dhtRouting, cachedAddrBook, providerHTTPRouters, blockProviderRouters) - prRouters := combineRouters(h, dhtRouting, cachedAddrBook, peerHTTPRouters, nil) + var crRouters router = combineRouters(h, dhtRouting, cachedAddrBook, providerHTTPRouters, blockProviderRouters) + var prRouters router = combineRouters(h, dhtRouting, cachedAddrBook, peerHTTPRouters, nil) ipnsRouters := combineRouters(h, dhtRouting, cachedAddrBook, ipnsHTTPRouters, nil) + // Create addr prober for active per-addr probing of suspicious addr sets. + // This uses ephemeral libp2p hosts to probe individual addrs and filter + // dead ones (stale UPnP ports, changed IPs) before returning results. 
+ var prober *addrProber + if cachedAddrBook != nil && cfg.cachedAddrBookStaleProbing { + var proberErr error + prober, proberErr = newAddrProber(10, 5*time.Second) + if proberErr != nil { + logger.Warnf("failed to create addr prober, stale addr probing disabled: %v", proberErr) + } + } + + // Wrap provider and peer routers with stale addr filtering. + // Some DHT peers never expire old observed addresses, so peers with + // dynamic ports (e.g. UPnP) accumulate many stale addresses that no + // longer work. This causes clients to waste time dialing dead ports, + // effectively making self-hosted peers on consumer networks unreachable. + // This wrapper filters responses from ALL sources (delegated HTTP + DHT) + // using known-good addresses from previous successful connections. + // When prober is available, first-encounter peers with suspicious addr + // sets (multi-port, multi-IP) are actively probed per-addr. + if cachedAddrBook != nil { + crRouters = sanitizeRouter{router: crRouters, cab: cachedAddrBook, prober: prober} + prRouters = sanitizeRouter{router: prRouters, cab: cachedAddrBook, prober: prober} + } + // Create DHT router for GetClosestPeers endpoint var dhtRouters router if cachedAddrBook != nil && dhtRouting != nil { cachedRouter := NewCachedRouter(libp2pRouter{host: h, routing: dhtRouting}, cachedAddrBook) - dhtRouters = sanitizeRouter{cachedRouter} + dhtRouters = sanitizeRouter{router: cachedRouter, cab: cachedAddrBook, prober: prober} } else if dhtRouting != nil { - dhtRouters = sanitizeRouter{libp2pRouter{host: h, routing: dhtRouting}} + dhtRouters = sanitizeRouter{router: libp2pRouter{host: h, routing: dhtRouting}} + } + + if prober != nil { + defer prober.close() } _, port, err := net.SplitHostPort(cfg.listenAddress) @@ -340,9 +371,9 @@ func combineRouters(h host.Host, dht routing.Routing, cachedAddrBook *cachedAddr if cachedAddrBook != nil { cachedRouter := NewCachedRouter(libp2pRouter{host: h, routing: dht}, cachedAddrBook) - dhtRouter = 
sanitizeRouter{cachedRouter} + dhtRouter = sanitizeRouter{router: cachedRouter} } else if dht != nil { - dhtRouter = sanitizeRouter{libp2pRouter{host: h, routing: dht}} + dhtRouter = sanitizeRouter{router: libp2pRouter{host: h, routing: dht}} } if len(delegatedRouters) == 0 && len(additionalRouters) == 0 { diff --git a/server_routers.go b/server_routers.go index ce7806f..064b08c 100644 --- a/server_routers.go +++ b/server_routers.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/ipfs/boxo/ipns" @@ -511,8 +512,23 @@ func (it *peerChanIter) Close() error { var _ server.ContentRouter = sanitizeRouter{} +// sanitizeRouter wraps a router with address filtering applied to every +// response. Filtering happens in two stages: +// +// 1. Fast inline stage (filterAddrs via iter.Map): strips private addrs +// and, when a known-good connected addr exists, applies passive +// stale-port filtering. +// +// 2. Async probing stage (probeFilterIter): for first-encounter peers +// whose addr sets look suspicious (multi-port, multi-IP), dispatches +// per-addr probing in background goroutines so the NDJSON stream is +// not blocked. Probed results appear after non-probed records. +// +// Stage 2 is only active when both cab and prober are set. 
type sanitizeRouter struct { router + cab *cachedAddrBook // optional: enables passive stale addr filtering + prober *addrProber // optional: enables active per-addr probing } func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { @@ -521,7 +537,8 @@ func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit in return nil, err } - return iter.Map(it, func(v iter.Result[types.Record]) iter.Result[types.Record] { + // Stage 1 (fast, inline): strip private addrs + passive stale-port filtering + filtered := iter.Map(it, func(v iter.Result[types.Record]) iter.Result[types.Record] { if v.Err != nil || v.Val == nil { return v } @@ -530,11 +547,11 @@ func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit in case types.SchemaPeer: result, ok := v.Val.(*types.PeerRecord) if !ok { - logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String()) + logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeFor[iter.Result[types.Record]]().String()) return v } - result.Addrs = filterPrivateMultiaddr(result.Addrs) + result.Addrs = r.filterAddrs(*result.ID, result.Addrs) v.Val = result //lint:ignore SA1019 // ignore staticcheck @@ -542,7 +559,7 @@ func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit in //lint:ignore SA1019 // ignore staticcheck result, ok := v.Val.(*types.BitswapRecord) if !ok { - logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String()) + logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeFor[iter.Result[types.Record]]().String()) return v } @@ -551,7 +568,14 @@ func (r sanitizeRouter) FindProviders(ctx context.Context, key cid.Cid, limit in } return v - }), nil + }) + + // Stage 2 (async): wrap with probeFilterIter 
for per-addr probing + // of first-encounter peers with suspicious addr sets. + if r.prober != nil && r.cab != nil { + return newProbeFilterIter(filtered, r, ctx), nil + } + return filtered, nil } func (r sanitizeRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { @@ -560,14 +584,19 @@ func (r sanitizeRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) ( return nil, err } - return iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { + filtered := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { if v.Err != nil || v.Val == nil { return v } - v.Val.Addrs = filterPrivateMultiaddr(v.Val.Addrs) + v.Val.Addrs = r.filterAddrs(*v.Val.ID, v.Val.Addrs) return v - }), nil + }) + + if r.prober != nil && r.cab != nil { + return r.applyProbeFiltering(filtered, ctx), nil + } + return filtered, nil } func (r sanitizeRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { @@ -576,14 +605,19 @@ func (r sanitizeRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter. 
return nil, err } - return iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { + filtered := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { if v.Err != nil || v.Val == nil { return v } - v.Val.Addrs = filterPrivateMultiaddr(v.Val.Addrs) + v.Val.Addrs = r.filterAddrs(*v.Val.ID, v.Val.Addrs) return v - }), nil + }) + + if r.prober != nil && r.cab != nil { + return r.applyProbeFiltering(filtered, ctx), nil + } + return filtered, nil } //lint:ignore SA1019 // ignore staticcheck @@ -591,6 +625,184 @@ func (r sanitizeRouter) ProvideBitswap(ctx context.Context, req *server.BitswapW return 0, routing.ErrNotSupported } +// filterAddrs applies fast address filters: removes private addrs and, +// when cached addr book has a known-good connected addr, removes stale +// addresses on the same IP with a different port. Does not do active +// probing (that is handled asynchronously by probeFilterIter). +func (r sanitizeRouter) filterAddrs(pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr { + addrs = filterPrivateMultiaddr(addrs) + if r.cab != nil { + if connAddr := r.cab.getConnectedAddr(pid); connAddr != nil { + addrs = filterStalePortAddrs(addrs, connAddr) + } + } + return addrs +} + +// applyProbeFiltering wraps a *types.PeerRecord iterator with async probing. +// Since probeFilterIter operates on types.Record, this converts PeerRecord +// to Record and back, following the same pattern as applyPeerRecordCaching +// in server_cached_router.go. 
+func (r sanitizeRouter) applyProbeFiltering(it iter.ResultIter[*types.PeerRecord], ctx context.Context) iter.ResultIter[*types.PeerRecord] { + recordIter := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] { + if v.Err != nil { + return iter.Result[types.Record]{Err: v.Err} + } + return iter.Result[types.Record]{Val: v.Val} + }) + + probeIter := newProbeFilterIter(recordIter, r, ctx) + + return iter.Map(probeIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] { + if v.Err != nil { + return iter.Result[*types.PeerRecord]{Err: v.Err} + } + peerRec, ok := v.Val.(*types.PeerRecord) + if !ok { + return iter.Result[*types.PeerRecord]{Err: errors.New("unexpected record type in probe filter")} + } + return iter.Result[*types.PeerRecord]{Val: peerRec} + }) +} + +var _ iter.ResultIter[types.Record] = &probeFilterIter{} + +// probeFilterIter wraps a types.Record iterator and dispatches per-addr +// probing asynchronously for peers whose address sets look suspicious +// (multiple ports per IP, or multiple IPs per address family). +// +// It follows the same async pattern as cacheFallbackIter (server_cached_router.go): +// +// 1. Pull records from the source iterator one at a time. +// 2. If a PeerRecord needs probing (needsProbing == true, not recently probed), +// dispatch probing in a background goroutine and move to the next record +// without blocking the stream. +// 3. Records that don't need probing pass through immediately. +// 4. After the source is exhausted, drain pending probe results from the +// probeResults channel before signaling completion. +// +// This ensures the NDJSON stream starts flowing immediately for non-probed +// peers, and probed peers appear at the end once their handshakes complete. 
+type probeFilterIter struct { + sourceIter iter.ResultIter[types.Record] + current iter.Result[types.Record] + probeResults chan iter.Result[types.Record] // receives results from background dispatchProbe goroutines + router sanitizeRouter + ctx context.Context + cancel context.CancelFunc + ongoingProbes atomic.Int32 // tracks in-flight dispatchProbe goroutines +} + +func newProbeFilterIter(sourceIter iter.ResultIter[types.Record], r sanitizeRouter, ctx context.Context) *probeFilterIter { + ctx, cancel := context.WithCancel(ctx) + return &probeFilterIter{ + sourceIter: sourceIter, + router: r, + ctx: ctx, + cancel: cancel, + probeResults: make(chan iter.Result[types.Record], 100), + } +} + +func (it *probeFilterIter) Next() bool { + for { + // Phase 1: pull from source iterator, pass through or dispatch probing. + if it.sourceIter.Next() { + val := it.sourceIter.Val() + if val.Err != nil || val.Val == nil { + it.current = val + return true + } + + // only PeerRecords can be probed; pass through other schemas + if val.Val.GetSchema() != types.SchemaPeer { + it.current = val + return true + } + + record, ok := val.Val.(*types.PeerRecord) + if !ok || record.ID == nil { + it.current = val + return true + } + + // if the addr set looks suspicious and we haven't probed recently, + // dispatch probing in the background and skip to next record + if needsProbing(record.Addrs) && !it.router.cab.wasRecentlyProbed(*record.ID) { + it.router.cab.recordProbe(*record.ID) + it.ongoingProbes.Add(1) // must increment before goroutine launch + go it.dispatchProbe(record) + continue + } + + // addr set looks clean, pass through immediately + it.current = val + return true + } + + // Phase 2: source exhausted, wait for in-flight probe goroutines. + // Same drain pattern as cacheFallbackIter: poll ongoingProbes with + // a short timer to avoid deadlock if count reaches 0 between check + // and channel read. 
+		if it.ongoingProbes.Load() == 0 && len(it.probeResults) == 0 {
+			return false
+		}
+
+		timer := time.NewTimer(100 * time.Millisecond)
+		select {
+		case result, ok := <-it.probeResults:
+			timer.Stop()
+			if !ok {
+				return false
+			}
+			it.current = result
+			return true
+		case <-it.ctx.Done():
+			timer.Stop()
+			return false
+		case <-timer.C:
+			// timeout, loop back to recheck ongoingProbes
+		}
+	}
+}
+
+func (it *probeFilterIter) Val() iter.Result[types.Record] {
+	if it.current.Val != nil || it.current.Err != nil {
+		return it.current
+	}
+	return iter.Result[types.Record]{Err: errNoValueAvailable}
+}
+
+func (it *probeFilterIter) Close() error {
+	if it.cancel != nil {
+		it.cancel()
+	}
+	return it.sourceIter.Close()
+}
+
+// dispatchProbe runs in a background goroutine. It probes all addrs for
+// the peer (via addrProber.probeAddrs which handles dedup and fail-open),
+// updates the record's addr list, and sends it back through probeResults.
+func (it *probeFilterIter) dispatchProbe(record *types.PeerRecord) {
+	defer it.ongoingProbes.Add(-1)
+
+	probed := it.router.prober.probeAddrs(it.ctx, *record.ID, record.Addrs)
+	record.Addrs = probed
+
+	if it.ctx.Err() != nil {
+		return
+	}
+
+	select {
+	case it.probeResults <- iter.Result[types.Record]{Val: record}:
+	case <-it.ctx.Done():
+	default:
+		// channel full or nobody listening, drop the result.
+ // this is best-effort -- same as cacheFallbackIter.dispatchFindPeer + logger.Debugw("dropping probe result, channel full", "peer", record.ID) + } +} + func filterPrivateMultiaddr(a []types.Multiaddr) []types.Multiaddr { b := make([]types.Multiaddr, 0, len(a)) diff --git a/server_routers_test.go b/server_routers_test.go index 3523ab8..e4cf99a 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -316,7 +316,7 @@ func TestFindProviders(t *testing.T) { mr2Iter := newMockIter[types.Record](ctx) mr2.On("FindProviders", mock.Anything, c, 10).Return(mr2Iter, nil) - d = sanitizeRouter{parallelRouter{ + d = sanitizeRouter{router: parallelRouter{ routers: []router{ &composableRouter{ providers: mr1,