diff --git a/README.md b/README.md index 8e0fe23..63762d4 100644 --- a/README.md +++ b/README.md @@ -74,10 +74,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|-----|------|------|-------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| Key | Type | Req? | Notes | +|-----|------|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | + | :ketu.source/consumer-commands-chan | channel | optional | Used for passing custom functions to be executed from within the poll loop. Items of this channel are expected to be of type `fn[x]`. One example for using this channel is to enable pausing/resuming of the underlying kafka consumer, since trying to do that outside the poll loop causes a `ConcurrentModificationException` to be thrown. [Example](#example-of-using-the-custom-commands-channel) | #### Producer-sink options | Key | Type | Req? | Notes | @@ -127,6 +128,79 @@ Similarly, to put a clojure data structure on the producer channel: (>!! producer-chan ["k2" "v2" "events"]) ``` +## Example of using the custom commands channel + +In this example we demonstare how to enable pause/resume of the consumer: + +```clojure +(ns custom-commands-channel-example + (:require [clojure.core.async :as async] + [ketu.async.source :as source] + [ketu.async.sink :as sink])) + +(let [commands-chan (async/chan 10) + consumer-chan (async/chan 10) + consumer-opts {:name "consumer-example" + :brokers "broker1:9092" + :topic "example" + :group-id "example" + :value-type :string + :shape :value + :ketu.source/consumer-commands-chan commands-chan} + source (source/source consumer-chan consumer-opts) + + producer-chan (async/chan 10) + sink-opts {:name "producer-example" + :brokers "broker1:9092" + :topic "example" + :value-type :string + :shape :value} + sink (sink/sink producer-chan sink-opts) + + ; periodically produce data to the topic + producing (future + (dotimes [i 20] + (async/>!! producer-chan (str i)) + (Thread/sleep 300)) + (async/>!! producer-chan "done") + (async/close! producer-chan)) + + ; read from the consumer channel and print to the screen + processing (future + (loop [] + (let [message (async/!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.pause consumer (.assignment consumer)) + (deliver paused true))) + + @paused + (println "consumer is paused") + (Thread/sleep 2000) + + ; Send the commands channel a function that will resume the consumer + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.resume consumer (.paused consumer)) + (deliver resumed true))) + + @resumed + (println "consumer is resumed") + + ; Wait for all futures to finish + @producing + @processing) + (finally + (source/stop! source)))) +``` + ## Development & Contribution We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests, diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 558b51e..f02def7 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -102,6 +102,7 @@ close-out-chan? (:ketu.source/close-out-chan? opts) ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) + commands-chan (:ketu.source/consumer-commands-chan opts) should-poll? (volatile! true) abort-pending-put (async/chan) @@ -112,6 +113,10 @@ ->data (->data-fn opts) put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) + maybe-execute-custom-command (if (some? commands-chan) + (fn [] (when-let [command (async/poll! commands-chan)] + (command {:ketu.source/consumer consumer}))) + (fn [])) consumer-thread (async/thread (try @@ -121,6 +126,7 @@ (subscribe! consumer) (while @should-poll? + (maybe-execute-custom-command) (let [records (poll!)] (run! put! records))) diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index d33221a..1039826 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -2,7 +2,8 @@ (:require [clojure.set] [clojure.spec.alpha :as s] [clojure.string] - [expound.alpha :as expound]) + [expound.alpha :as expound] + [clojure.core.async.impl.protocols]) (:import (java.util.regex Pattern) (org.apache.kafka.clients.producer Callback) (org.apache.kafka.common.serialization Deserializer Serializer))) @@ -27,6 +28,7 @@ (s/def :ketu.source/close-out-chan? boolean?) (s/def :ketu.source/close-consumer? boolean?) (s/def :ketu.source/create-rebalance-listener-obj fn?) +(s/def :ketu.source/consumer-commands-chan #(extends? clojure.core.async.impl.protocols/ReadPort (type %))) (s/def :ketu.source.assign/topic :ketu/topic) (s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?)) @@ -76,7 +78,8 @@ :ketu.source/consumer-close-timeout-ms :ketu.source/consumer-thread-timeout-ms :ketu.source/close-out-chan? - :ketu.source/close-consumer?])) + :ketu.source/close-consumer? + :ketu.source/consumer-commands-chan])) (s/def :ketu.apache.producer/config map?) (s/def :ketu.sink/sender-threads-num pos-int?) diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index 6ae4e41..782481c 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -211,3 +211,38 @@ (source/stop! source-rebalance) (source/stop! source) (.close ^AdminClient admin-client))))) + +(deftest commands-channel + (let [commands-chan (async/chan 10) + consumer-chan (async/chan 10) + result-chan (async/chan 10) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :ketu.source/consumer-commands-chan commands-chan} + source (source/source consumer-chan clicks-consumer-opts)] + (try + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] (async/>!! result-chan [:1 consumer]))) + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] + (try + (.pause consumer (.assignment consumer)) + (async/>!! result-chan [:2 :success]) + (catch Exception _ + (async/>!! result-chan [:2 :failed]))))) + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] + (try + (.resume consumer (.paused consumer)) + (async/>!! result-chan [:3 :success]) + (catch Exception _ + (async/>!! result-chan [:3 :failed]))))) + + (let [expected [[:1 (:ketu.source/consumer source)] + [:2 :success] + [:3 :success]] + actual (doall (mapv (fn [_] (u/try-take! result-chan)) (range 3)))] + (is (= expected actual))) + (finally + (Thread/sleep 2000) + (source/stop! source)))))