From 5d9b4160ee9326212e467d5f26debbede2b82c56 Mon Sep 17 00:00:00 2001 From: earocorn Date: Wed, 26 Nov 2025 14:19:16 -0600 Subject: [PATCH 1/2] Fix issue with AsynchronousSocketChannel on Windows --- .../impl/service/hivemq/WebSocketProxy.java | 66 +++++++++++++++++-- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java index 62f71be36..71dc933b1 100644 --- a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java +++ b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java @@ -21,6 +21,11 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketListener; @@ -45,6 +50,9 @@ public class WebSocketProxy implements WebSocketListener ByteBuffer socketReadBuffer; CompletionHandler socketReadHandler; CompletionHandler socketWriteHandler; + + private final Queue writeQueue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean writeInProgress = new AtomicBoolean(false); WebSocketProxy(InetSocketAddress mqttHost, Logger logger) @@ -143,13 +151,63 @@ public void onWebSocketClose(int statusCode, String reason) @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) - { - if (mqttSocket != null && mqttSocket.isOpen()) - mqttSocket.write(ByteBuffer.wrap(payload)); + public void onWebSocketBinary(byte[] payload, int offset, int len) { + if (mqttSocket == null || !mqttSocket.isOpen()) + return; + + ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(payload, offset, offset + len)); + + writeQueue.add(buf); + tryWrite(); + } + + private void tryWrite() { + if (!writeInProgress.compareAndSet(false, true)) + return; + + ByteBuffer first = writeQueue.poll(); + if (first == null) { + writeInProgress.set(false); + return; + } + + // Wrapper so the handler can mutate the current buffer + class State { + ByteBuffer current = first; + } + + State state = new State(); + + mqttSocket.write(state.current, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + // Write the remaining bytes + if (state.current.hasRemaining()) { + mqttSocket.write(state.current, null, this); + return; + } + + // Get next buffer if finished + ByteBuffer next = writeQueue.poll(); + if (next == null) { + writeInProgress.set(false); + return; + } + + state.current = next; + mqttSocket.write(state.current, null, this); + } + + @Override + public void failed(Throwable exc, Void attachment) { + writeInProgress.set(false); + log.error("Error writing to MQTT TCP socket", exc); + } + }); } + @Override public void onWebSocketText(String message) { From 6a4ac1c1a3f224952b013f9c441989b4c3e04875 Mon Sep 17 00:00:00 2001 From: Alex Almanza Date: Thu, 26 Feb 2026 14:12:22 -0600 Subject: [PATCH 2/2] Add check to use queue ONLY for Windows systems --- .../impl/service/hivemq/WebSocketProxy.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java index 71dc933b1..bd3185f59 100644 --- a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java +++ b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java @@ -53,12 +53,14 @@ public class WebSocketProxy implements WebSocketListener private final Queue writeQueue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean writeInProgress = new AtomicBoolean(false); - + + private final boolean isWindows; WebSocketProxy(InetSocketAddress mqttHost, Logger logger) { this.mqttHost = mqttHost; this.log = logger; + this.isWindows = System.getProperty("os.name").toLowerCase().contains("win"); } @@ -155,10 +157,17 @@ public void onWebSocketBinary(byte[] payload, int offset, int len) { if (mqttSocket == null || !mqttSocket.isOpen()) return; - ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(payload, offset, offset + len)); + if (isWindows) + { + // Need to use a write queue since Windows' AsynchronousSocketChannel is inefficient + ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(payload, offset, offset + len)); + writeQueue.add(buf); + tryWrite(); + return; + } - writeQueue.add(buf); - tryWrite(); + // Default to writing directly to socket channel + mqttSocket.write(ByteBuffer.wrap(payload)); } private void tryWrite() {