Skip to content

Drop concurrent UDP requests per sender#228

Open
tomquist wants to merge 1 commit intodevelopfrom
fix-memory-leak-due-to-throttling
Open

Drop concurrent UDP requests per sender#228
tomquist wants to merge 1 commit intodevelopfrom
fix-memory-leak-due-to-throttling

Conversation

@tomquist
Copy link
Owner

@tomquist tomquist commented Feb 7, 2026

Motivation

  • Prevent overlapping UDP requests from the same client from queuing and leaking resources when throttling is enabled.
  • Ensure a single in-flight request per sender so slow powermeter handlers don't cause backlog or unexpected concurrent handling.

Description

  • Add per-sender in-flight tracking with self._in_flight_senders protected by self._in_flight_lock in shelly/shelly.py.
  • Introduce _handle_request_with_tracking to call the existing handler and always remove the sender from the in-flight set after handling completes.
  • Update udp_server to check and add the sender to the in-flight set before submitting work and to log and discard overlapping requests from the same addr[0].
  • Update shelly/shelly_udp_test.py to use a SlowPowermeter, set a client timeout with client.settimeout(0.3), collect responses after issuing concurrent requests, and assert only one response is received during a throttled overlap.
  • Move the Shelly UDP throttling fix note into the ## Next section of CHANGELOG.md.

Testing

  • Ran pytest -q shelly/shelly_udp_test.py and the test passed.

Codex Task

Summary by CodeRabbit

  • Bug Fixes
    • Prevented concurrent processing of overlapping UDP requests from the same sender, eliminating backlog and throttling-related issues for improved responsiveness.

@coderabbitai
Copy link

coderabbitai bot commented Feb 7, 2026

Walkthrough

This pull request implements in-flight request tracking for UDP handling to prevent overlapping processing of concurrent requests from the same client IP. It adds tracking mechanisms, a helper method for cleanup, modified request submission logic, and corresponding test updates.

Changes

Cohort / File(s) Summary
Documentation
CHANGELOG.md
Added "Next" section documenting the change to drop overlapping Shelly UDP requests per sender.
UDP Request Tracking
shelly/shelly.py
Added in_flight_lock and in_flight_senders for concurrent request tracking. Introduced _handle_request_with_tracking() helper method to manage cleanup. Modified udp_server() to check in-flight status before executor submission and discard duplicate requests.
Test Updates
shelly/shelly_udp_test.py
Introduced SlowPowermeter wrapper to simulate delayed processing. Added socket timeout (0.3s). Replaced blocking receive with non-blocking drain loop to collect responses. Updated assertion to expect single response instead of three, reflecting duplicate dropping behavior.

Sequence Diagram

sequenceDiagram
    participant Client as UDP Client
    participant Server as UDP Server
    participant Tracking as In-Flight Tracker
    participant Executor as Thread Executor
    participant Handler as Request Handler

    Client->>Server: Send UDP request from IP X
    Server->>Tracking: Check if IP X in in_flight_senders
    alt Request not in-flight
        Tracking-->>Server: Not tracked
        Server->>Tracking: Add IP X to in_flight_senders
        Server->>Executor: Submit request with tracking wrapper
        Executor->>Handler: Process request
        Handler-->>Executor: Complete
        Executor->>Tracking: Remove IP X from in_flight_senders
        Executor-->>Server: Response ready
    else Request already in-flight
        Tracking-->>Server: Already tracked (duplicate)
        Server-->>Client: Discard duplicate request
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'Drop concurrent UDP requests per sender' directly and clearly summarizes the main change: implementing per-sender request deduplication to prevent overlapping UDP requests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix-memory-leak-due-to-throttling

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@shelly/shelly.py`:
- Around line 136-146: The code adds addr[0] to the _in_flight_senders set
before calling self._executor.submit(self._handle_request_with_tracking, sock,
data, addr) which can raise and leak the sender; modify the block around
_executor.submit so that the submit call is wrapped in a try/except (catching
exceptions like RuntimeError) and on any exception remove addr[0] from
_in_flight_senders inside the same _in_flight_lock scope (or re-acquire it) to
ensure the sender is unblocked; reference the existing symbols _in_flight_lock,
_in_flight_senders, _executor.submit and _handle_request_with_tracking when
applying the change.
🧹 Nitpick comments (1)
shelly/shelly_udp_test.py (1)

57-65: Timing-sensitive test — consider widening the recv timeout margin.

The 0.3s socket timeout leaves little headroom above the 0.2s SlowPowermeter sleep. On a loaded CI runner, the first response could arrive after the timeout, causing len(responses) == 0 and a flaky failure. Bumping the timeout to 0.5s (and the duration assertion to < 1.0) would keep the test meaningful while reducing flakiness.

Comment on lines +136 to +146
with self._in_flight_lock:
if addr[0] in self._in_flight_senders:
logger.debug(
"Discarding request from %s; previous request still in progress",
addr[0],
)
continue
self._in_flight_senders.add(addr[0])
self._executor.submit(
self._handle_request_with_tracking, sock, data, addr
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Sender leaks in _in_flight_senders if submit raises.

If self._executor.submit(...) throws (e.g., RuntimeError after executor shutdown), addr[0] was already added at Line 143 but the tracking wrapper never runs, so discard in the finally block is never reached. That sender is permanently blocked.

Wrap the submit in a try/except that removes the sender on failure:

Proposed fix
                     self._in_flight_senders.add(addr[0])
-                self._executor.submit(
-                    self._handle_request_with_tracking, sock, data, addr
-                )
+                try:
+                    self._executor.submit(
+                        self._handle_request_with_tracking, sock, data, addr
+                    )
+                except RuntimeError:
+                    with self._in_flight_lock:
+                        self._in_flight_senders.discard(addr[0])
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with self._in_flight_lock:
if addr[0] in self._in_flight_senders:
logger.debug(
"Discarding request from %s; previous request still in progress",
addr[0],
)
continue
self._in_flight_senders.add(addr[0])
self._executor.submit(
self._handle_request_with_tracking, sock, data, addr
)
with self._in_flight_lock:
if addr[0] in self._in_flight_senders:
logger.debug(
"Discarding request from %s; previous request still in progress",
addr[0],
)
continue
self._in_flight_senders.add(addr[0])
try:
self._executor.submit(
self._handle_request_with_tracking, sock, data, addr
)
except RuntimeError:
with self._in_flight_lock:
self._in_flight_senders.discard(addr[0])
🤖 Prompt for AI Agents
In `@shelly/shelly.py` around lines 136 - 146, The code adds addr[0] to the
_in_flight_senders set before calling
self._executor.submit(self._handle_request_with_tracking, sock, data, addr)
which can raise and leak the sender; modify the block around _executor.submit so
that the submit call is wrapped in a try/except (catching exceptions like
RuntimeError) and on any exception remove addr[0] from _in_flight_senders inside
the same _in_flight_lock scope (or re-acquire it) to ensure the sender is
unblocked; reference the existing symbols _in_flight_lock, _in_flight_senders,
_executor.submit and _handle_request_with_tracking when applying the change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant