From c78264d4b386d8620bffe9ef56681fcd2b513690 Mon Sep 17 00:00:00 2001 From: "amit.gold" Date: Wed, 9 Apr 2025 14:44:41 +0300 Subject: [PATCH] Better disconnection handling --- src/ketu/async/source.clj | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 051db46..007ed0c 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -1,5 +1,6 @@ (ns ketu.async.source (:require [clojure.core.async :as async] + [clojure.core.async.impl.protocols :as async-protocols] [ketu.async.util :as util] [ketu.clients.consumer :as consumer] [ketu.shape.consumer :as shape] @@ -125,7 +126,10 @@ (loop [] (when-let [records (poll!)] - (run! put! records) + (if (seq records) + (run! put! records) + ; Throws exception if not connected + (.listTopics consumer (Duration/ofMillis 5000))) (recur))) (catch WakeupException e @@ -211,6 +215,9 @@ (defn done-chan [state] (:ketu.source/consumer-thread state)) +(defn closed? [consumer] + (-> consumer :ketu.source/out-chan async-protocols/closed?)) + (defn- wait-for-the-thread! [state] (let [consumer-thread (:ketu.source/consumer-thread state) consumer-thread-timeout-ms (-> state :ketu/source-opts :ketu.source/consumer-thread-timeout-ms)