com.oscaro/felice

3.2.0-1.7


Felice is a client library for Apache Kafka in Clojure.

dependencies

org.clojure/clojure
1.10.1
org.apache.kafka/kafka-clients
3.2.0
com.cognitect/transit-clj
0.8.319
metosin/jsonista
0.2.5
com.taoensso/nippy
3.1.3



(this space intentionally left almost blank)
 

A thin layer on top of the Java KafkaConsumer. See: https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

(ns felice.consumer
  (:require [clojure.walk :as walk]
            [felice.serialization :refer [deserializer]])
  (:import [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
           [org.apache.kafka.clients.consumer OffsetAndMetadata]
           [org.apache.kafka.common TopicPartition Metric]
           org.apache.kafka.common.errors.WakeupException
           java.time.Duration))
(def CONF-COERCERS
  ;; Consumer config keys whose numeric values are narrowed before being
  ;; handed to KafkaConsumer: every key maps to `int`, except the two SASL
  ;; login-refresh settings which map to `short`.
  (merge (zipmap [:auto.commit.interval.ms
                  :connections.max.idle.ms
                  :default.api.timeout.ms
                  :fetch.max.bytes
                  :fetch.max.wait.ms
                  :fetch.min.bytes
                  :heartbeat.interval.ms
                  :max.partition.fetch.bytes
                  :max.poll.interval.ms
                  :max.poll.records
                  :metrics.num.samples
                  :receive.buffer.bytes
                  :request.timeout.ms
                  :send.buffer.bytes
                  :session.timeout.ms]
                 (repeat int))
         {:sasl.login.refresh.buffer.seconds     short
          :sasl.login.refresh.min.period.seconds short}))
(defn- coerce-consumer-config
  "Returns `cfg` with the values of the keys listed in CONF-COERCERS narrowed
  to the Java type declared there (int or short). Keys may be keywords or
  strings; other keys and values are kept as-is.

  Only numbers are coerced: Kafka parses string values of numeric configs
  itself, so a value such as \"1000\" (e.g. read from the environment) is
  passed through unchanged instead of throwing in `int`/`short`."
  [cfg]
  (into {}
        (map (fn [[k v]]
               (let [coerce-fn (get CONF-COERCERS (keyword k))]
                 [k (if (and coerce-fn (number? v)) (coerce-fn v) v)])))
        cfg))

Commit functions

Commit the offsets returned by the last poll() for all the subscribed topics and partitions.

consumer must be a KafkaConsumer object

(defn commit-sync
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  ;; blocking, no-argument commitSync: commits the offsets of the last poll
  (-> consumer (.commitSync)))

Commit a specific record

consumer must be a KafkaConsumer object

record must be a map with :partition, :topic and :offset; the committed offset is offset + 1, i.e. the position of the next record to consume.

(defn commit-message-offset
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer {:keys [partition topic offset] :as record}]
  ;; Kafka expects the offset of the *next* record to read, hence the inc.
  (let [next-offset (long (inc offset))
        tp          (TopicPartition. topic partition)
        committed   (OffsetAndMetadata. next-offset)]
    (.commitSync consumer ^java.util.Map (hash-map tp committed))))
(defn ^:no-doc metric->map [^Metric metric]
  ;; flattens a Kafka Metric and its MetricName into a plain Clojure map
  (let [mname (.metricName metric)]
    {:name        (.name mname)
     :tags        (.tags mname)
     :group       (.group mname)
     :description (.description mname)
     :value       (.metricValue metric)}))

Returns a list of metric maps kept by the consumer.

(defn metrics
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  ;; .metrics is a java.util.Map {MetricName Metric}; only the Metric values
  ;; are converted
  (map metric->map (vals (.metrics consumer))))

converts a TopicPartition object to a clojure map containing :topic and :partition

(defn topic-partition->map
  {:added "3.2.0-1.7"}
  [^TopicPartition topic-partition]
  (let [topic-name   (.topic topic-partition)
        partition-id (.partition topic-partition)]
    {:topic     topic-name
     :partition partition-id}))

returns a set of topic-partition map currently assigned to this consumer.

(defn assignment
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  (into #{} (map topic-partition->map) (.assignment consumer)))

returns a map {topic [assigned-partitions]}

(defn assignment-by-topic
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  ;; Folds the assigned TopicPartitions into {topic [partition ...]}.
  ;; Reducing over the Java Set handles an empty assignment (-> {}), where
  ;; testing the raw Set for truthiness would call .topic on nil.
  (reduce (fn [by-topic ^TopicPartition tp]
            (update by-topic (.topic tp) (fnil conj []) (.partition tp)))
          {}
          (.assignment consumer)))

Returns the set of currently subscribed topics.

(defn subscription
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  (-> consumer (.subscription)))

Subscribe the consumer to one or more topics, keeping the previous subscriptions (they are automatically re-subscribed). Returns the consumer.

note: subscribe and assign are mutually exclusive

(defn subscribe
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer & topics]
  ;; KafkaConsumer#subscribe replaces the whole subscription on each call, so
  ;; the topics already subscribed are sent again along with the new ones.
  (let [all-topics (concat (subscription consumer) topics)]
    (doto consumer
      (.subscribe ^java.util.Collection all-topics))))

Unsubscribe from all currently subscribed topics. Returns the consumer.

(defn unsubscribe
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  (doto consumer
    (.unsubscribe)))
(defn ^:no-doc position [^KafkaConsumer consumer topic partition]
  ;; offset of the next record that will be fetched for topic/partition
  (let [tp (TopicPartition. topic partition)]
    (.position consumer tp)))
(defn ^:no-doc topics->assigned-topic-partitions [^KafkaConsumer consumer topics]
  ;; Returns the {topic [partitions]} currently assigned to `consumer`.
  ;; When `topics` is given it restricts the result: either a single topic or
  ;; any collection of topics (vector, list, set...), not only a vector.
  ;; NOTE(review): topics that are not assigned simply drop out, so the result
  ;; may be empty; KafkaConsumer#seekToBeginning / #seekToEnd treat an empty
  ;; collection as "all assigned partitions".
  (let [assignments (assignment-by-topic consumer)]
    (if topics
      (select-keys assignments (if (coll? topics) topics [topics]))
      assignments)))
(defn ^:no-doc ->topic-partitions
  ;; {topic [partitions]} -> lazy seq of TopicPartition objects.
  ;; With a consumer, anything that is not a map is resolved against the
  ;; consumer's current assignment first (see topics->assigned-topic-partitions).
  ([topic->partitions]
   (for [[topic partition-ids] topic->partitions
         p partition-ids]
     (TopicPartition. topic p)))
  ([^KafkaConsumer consumer topics-or-partitions]
   (let [by-topic (if (map? topics-or-partitions)
                    topics-or-partitions
                    (topics->assigned-topic-partitions consumer topics-or-partitions))]
     (->topic-partitions by-topic))))

Manually assign partitions to this consumer. topic-partitions should be a map {topic [partitions]}. Returns the consumer.

note: assign and subscribe are mutually exclusive

(defn assign
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer topic-partitions]
  (let [tps (->topic-partitions topic-partitions)]
    (doto consumer
      (.assign ^java.util.Collection tps))))

Seek to the first offset of either all the assigned partitions or the given topic | [topics] | {topic [partitions]}. Returns the consumer.

(defn seek-to-beginning
  {:added "3.2.0-1.7"}
  ([^KafkaConsumer consumer]
   (seek-to-beginning consumer nil))
  ([^KafkaConsumer consumer topics-or-partitions]
   (let [tps (->topic-partitions consumer topics-or-partitions)]
     (doto consumer
       (.seekToBeginning ^java.util.Collection tps)))))

Seek to the last offset of either all the assigned partitions or the given topic | [topics] | {topic [partitions]}. Returns the consumer.

(defn seek-to-end
  {:added "3.2.0-1.7"}
  ([^KafkaConsumer consumer]
   (seek-to-end consumer nil))
  ([^KafkaConsumer consumer topics-or-partitions]
   (let [tps (->topic-partitions consumer topics-or-partitions)]
     (doto consumer
       (.seekToEnd ^java.util.Collection tps)))))

Overrides the fetch offset that the consumer will use on the next poll. Returns the consumer.

(defn seek
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer topic partition offset]
  (doto consumer
    (.seek (TopicPartition. topic partition) offset)))

Suspend fetching from either all the assigned partitions or the given topic | [topics] | {topic [partitions]}. Returns the consumer.

(defn pause
  {:added "3.2.0-1.7"}
  ([^KafkaConsumer consumer]
   (pause consumer nil))
  ([^KafkaConsumer consumer topics-or-partitions]
   (let [tps (->topic-partitions consumer topics-or-partitions)]
     (doto consumer
       (.pause ^java.util.Collection tps)))))

Resume fetching from either all the assigned partitions or the given topic | [topics] | {topic [partitions]}. Returns the consumer.

(defn resume
  {:added "3.2.0-1.7"}
  ([^KafkaConsumer consumer]
   (resume consumer nil))
  ([^KafkaConsumer consumer topics-or-partitions]
   (let [tps (->topic-partitions consumer topics-or-partitions)]
     (doto consumer
       (.resume ^java.util.Collection tps)))))
(defn paused
  "Returns a lazy seq of {:topic :partition} maps, one per partition currently
  paused on `consumer`."
  [^KafkaConsumer consumer]
  (for [tp (.paused consumer)]
    (topic-partition->map tp)))

Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.

This method returns immediately if there are records available. Otherwise, it will wait up to timeout ms.

If the timeout expires, an empty record set will be returned.

(defn poll
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer timeout]
  ;; timeout is expressed in milliseconds
  (->> timeout
       (Duration/ofMillis)
       (.poll consumer)))

Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.

The thread which is blocking in an operation will throw WakeupException.

If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.

(defn wakeup
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer]
  (-> consumer (.wakeup)))

Transforms a ConsumerRecord into a Clojure map containing :key, :value, :offset, :topic, :partition, :timestamp, :timestamp-type and :headers.

(defn consumer-record->map
  {:added "3.2.0-1.7"}
  [^ConsumerRecord record]
  ;; :timestamp-type is the TimestampType enum name, :headers the Header[]
  ;; array produced by Headers#toArray
  (let [ts-type (.timestampType record)
        headers (.headers record)]
    {:topic          (.topic record)
     :partition      (.partition record)
     :offset         (.offset record)
     :key            (.key record)
     :value          (.value record)
     :timestamp      (.timestamp record)
     :timestamp-type (.name ts-type)
     :headers        (.toArray headers)}))

Takes the return value of a poll (see ConsumerRecords) and returns a lazy seq of records as Clojure maps.

(defn poll->all-records
  {:added "3.2.0-1.7"}
  [^ConsumerRecords records]
  ;; ConsumerRecords is Iterable, so `seq` walks it directly; `seq` also
  ;; turns a nil poll result (e.g. a poll aborted by `wakeup`) into an empty
  ;; result instead of a NullPointerException from `.iterator`.
  (map consumer-record->map (seq records)))

Takes the return value of a poll (see ConsumerRecords) and returns the first record as a Clojure map.

(defn poll->record
  {:added "3.2.0-1.7"}
  [^ConsumerRecords records]
  (-> records
      (poll->all-records)
      (first)))

Takes the return value of a poll (see ConsumerRecords) and returns a map {topic records-seq}.

(defn poll->records-by-topic
  {:added "3.2.0-1.7"}
  [^ConsumerRecords records]
  ;; A nil poll result (e.g. a poll aborted by `wakeup`) yields {} instead of
  ;; a NullPointerException from `.partitions`.
  (if (nil? records)
    {}
    ;; several partitions may belong to the same topic: visit each topic once
    (let [topics (distinct (map (fn [^TopicPartition tp] (.topic tp))
                                (.partitions records)))]
      (into {}
            (map (fn [^String topic]
                   [topic (map consumer-record->map (.records records topic))]))
            topics))))
(defn ^:no-doc poll->records-by-partition
  ;; NOTE(review): unimplemented stub. The body is empty, so this always
  ;; returns nil whatever `records` is. Presumably meant to group a poll
  ;; result by partition (as poll->records-by-topic does by topic); confirm
  ;; before relying on it.
  [^ConsumerRecords records])

Poll records and run process-fn on each of them (presumably for side effects)

(defn poll-and-process
  {:added "3.2.0-1.7"}
  [^KafkaConsumer consumer timeout process-fn commit-policy]
  ;; :record -> commit after each processed record
  ;; :poll   -> one commitSync once the whole batch is processed
  ;; anything else -> no commit
  (let [commit-each? (= :record commit-policy)]
    (run! (fn [record]
            (process-fn record)
            (when commit-each?
              (commit-message-offset consumer record)))
          (poll->all-records (poll consumer timeout)))
    (when (= :poll commit-policy)
      (commit-sync consumer))))

create a consumer

conf is a map {:keyword value} See: https://kafka.apache.org/documentation/#consumerconfigs for all possibilities

The key and value deserializers can be any of the keys defined in the felice.serialization namespace. With the 1-argument arity, :key.deserializer and :value.deserializer must be provided in conf.

You can optionally provide a list of topics to subscribe to.

(defn consumer
  {:added "3.2.0-1.7"}
  ([conf]
   (let [{key-deser-id   :key.deserializer
          value-deser-id :value.deserializer
          topics         :topics} conf
         key-deser   (deserializer key-deser-id)
         value-deser (deserializer value-deser-id)
         ;; the deserializer ids and :topics are felice options, not Kafka
         ;; properties: strip them, coerce numeric values, stringify keys
         kafka-props (-> conf
                         (dissoc :key.deserializer :value.deserializer :topics)
                         (coerce-consumer-config)
                         (walk/stringify-keys))
         kafka-consumer (KafkaConsumer. ^java.util.Map kafka-props key-deser value-deser)]
     (when topics
       (apply subscribe kafka-consumer topics))
     kafka-consumer))
  ([conf topics]
   (consumer (assoc conf :topics topics)))
  ([conf key-deserializer value-deserializer]
   (consumer (assoc conf
                    :key.deserializer   key-deserializer
                    :value.deserializer value-deserializer)))
  ([conf key-deserializer value-deserializer topics]
   (-> conf
       (assoc :topics topics)
       (consumer key-deserializer value-deserializer))))

Tries to close the consumer cleanly within the specified timeout in ms (defaults to 30 secs).

This method waits up to timeout for the consumer to complete pending commits and leave the group.

If auto-commit is enabled, this will commit the current offsets if possible within the timeout.

If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed.

(defn close!
  {:added "3.2.0-1.7"}
  ([^KafkaConsumer consumer]
   (.close consumer))
  ([^KafkaConsumer consumer timeout]
   ;; timeout in milliseconds
   (let [grace (Duration/ofMillis timeout)]
     (.close consumer grace))))
(defn poll-record*
  "Assigns `consumer` to the single `topic`/`partition`, seeks to `offset`,
  polls once (100 ms) and returns the first polled record as a map, or nil."
  [consumer topic partition offset]
  (assign consumer {topic [partition]})
  (seek consumer topic partition offset)
  (poll->record (poll consumer 100)))

Instantiates a consumer according to consumer-conf, then fetches the record at the given topic, partition and offset.

(defn poll-record
  {:added "3.2.0-1.7"}
  [consumer-conf topic partition offset]
  ;; short-lived consumer: no auto-commit, at most one record per poll
  (let [consumer (consumer (assoc consumer-conf
                                  :enable.auto.commit false
                                  :max.poll.records   1))]
    ;; poll-record* needs the consumer as first argument; try/finally closes
    ;; the consumer even when assigning, seeking or polling throws
    (try
      (poll-record* consumer topic partition offset)
      (finally
        (close! consumer)))))
(defn poll-loop*
  "Runs poll-and-process on `consumer` in a loop on a future thread, calling
  `process-record-fn` on every polled record, and returns a stop function.

  opts:
  - :poll-timeout     poll duration in ms (default 2000)
  - :on-error-fn      called with any Throwable (other than WakeupException,
                      which is ignored) before that Throwable ends the loop
  - :commit-policy    passed to poll-and-process (:never, :poll or :record)
  - :close-timeout-ms timeout used to close the consumer when the loop ends
                      (default 5000; an explicit nil means Long/MAX_VALUE)

  The returned function:
  - called with no argument, asks the loop to stop after its current
    iteration and blocks until it ends, returning :stopped or the Throwable
    that ended it;
  - called with a timeout in ms, only waits for the loop to end (it does not
    stop it), returning :polling if it is still running."
  {:added "3.2.0-1.7"}
  [consumer
   process-record-fn
   {:keys [poll-timeout on-error-fn commit-policy close-timeout-ms]
    :or {poll-timeout 2000 close-timeout-ms 5000}}]
  (let [continue?  (atom true)
        completion (future
                     (try
                       (while @continue?
                         (try
                           (poll-and-process consumer poll-timeout process-record-fn commit-policy)
                           ;; a wakeup only aborts the current poll
                           (catch WakeupException _)
                           (catch Throwable t
                             (when on-error-fn (on-error-fn t))
                             (throw t))))
                       :stopped
                       (catch Throwable t t)
                       (finally
                         (close! consumer (or close-timeout-ms Long/MAX_VALUE)))))]
    (fn
      ([]
       (reset! continue? false)
       (deref completion))
      ([timeout]
       (deref completion timeout :polling)))))

Start a consumer loop, calling a callback for each record, and returning a function to stop the loop.

Parameters

         consumer: consumer config (see consumer)
process-record-fn: function to call with each record polled
          options: {:poll-timeout 2000 ; duration of a polling without events (ms)
                    :on-error-fn  (fn [ex] ...); called on exception
                    :commit-policy :never ; #{:never :poll :record}
                    :close-timeout-ms 5000 ; timeout (ms) used to close the consumer when the loop ends}

commit policy

  • :never : does nothing (use it if you enabled client auto commit)
  • :poll : commit last read offset after processing all the items of a poll
  • :record : commit the offset of every processed record

    if you want to commit messages yourself, set commit policy to :never and use commit-message-offset or commit-sync

Returns

          stop-fn: callback function to stop the loop
(defn poll-loop
  {:added "3.2.0-1.7"}
  ([consumer-conf process-record-fn]
   (poll-loop consumer-conf process-record-fn {}))
  ([consumer-conf process-record-fn opts]
   (-> (consumer consumer-conf)
       (poll-loop* process-record-fn opts))))
(defn poll-loops*
  "Returns a lazy seq of `threads` stop functions, each one from a poll-loop*
  running on its own consumer subscribed to `topics`."
  [consumer-conf process-record-fn topics opts threads]
  (map (fn [_]
         (poll-loop* (consumer consumer-conf topics) process-record-fn opts))
       (range threads)))

Start consumer loops, calling a callback for each record, and returning a function to stop the loops.

Parameters

         consumer: consumer config (see consumer)
process-record-fn: function to call with each record polled
           topics: topics you want to subscribe to
          options: {:poll-timeout 2000 ; duration of a polling without events (ms)
                    :on-error-fn  (fn [ex] ...); called on exception
                    :commit-policy :never ; #{:never :poll :record}
                    :threads-by-topic 1 ; number of spawned consumers for each topic
                    :threads 1 ; number of spawned consumers}

commit policy

  • :never : does nothing (use it if you enabled client auto commit)
  • :poll : commit last read offset after processing all the items of a poll
  • :record : commit the offset of every processed record

    if you want to commit messages yourself, set commit policy to :never and use commit-message-offset or commit-sync

Multi-threading

You can set either the :threads-by-topic or the :threads option (if both are set, :threads-by-topic wins):

  • :threads : spawn N threads in total (each thread listening to all registered topics)
  • :threads-by-topic : spawn N threads for each registered topic
  • you can also provide a map {topic threads} instead of a list of topics

Returns

          stop-fn: callback function to stop the loop
(defn poll-loops
  {:added "3.2.0-1.7"}
  ([consumer-conf process-record-fn] (poll-loops consumer-conf process-record-fn {}))
  ([consumer-conf process-record-fn opts]
   (if-let [topics (:topics consumer-conf)]
     (poll-loops consumer-conf process-record-fn topics opts)
     (throw (ex-info "topics configuration is missing"
                     {:consumer-configuration consumer-conf
                      :info "you must specify a (list of )topic(s) either in the consumer config or using the 4 params arity of 'poll-loops'"}))))
  ([consumer-conf process-record-fn topics {:keys [threads threads-by-topic] :as opts}]
   (let [;; :threads -> N consumers sharing every topic. It only applies when
         ;; :threads-by-topic is unset and topics is a plain collection: a
         ;; {topic threads} map is always expanded per topic, because
         ;; subscribing a consumer to the map itself would pass map entries
         ;; as topic names.
         shared-loops? (and threads (nil? threads-by-topic) (not (map? topics)))
         loops (doall
                (if shared-loops?
                  (poll-loops* consumer-conf process-record-fn topics opts threads)
                  (mapcat (fn [topic]
                            (let [[topic-name topic-threads]
                                  (if (map-entry? topic)
                                    topic
                                    [topic (or threads-by-topic 1)])]
                              (poll-loops* consumer-conf process-record-fn [topic-name] opts
                                           (or topic-threads threads-by-topic 1))))
                          topics)))]
     ;; stop-fn: stops (or, given a timeout, waits for) every loop and returns
     ;; their results in order
     (fn
       ([]        (doall (map (fn [stop!] (stop!)) loops)))
       ([timeout] (doall (map (fn [stop!] (stop! timeout)) loops)))))))
 
(ns felice.async
  (:require [felice.consumer :as consumer])
  (:import org.apache.kafka.common.errors.WakeupException))
(def async-enabled?
  ;; true when clojure.core.async can be loaded. A successful `require` also
  ;; creates the `async` alias used by the rest of this namespace.
  ;; NOTE(review): later forms reference `async/...` directly, so this
  ;; namespace presumably still fails to compile without core.async; the
  ;; flag is checked by the :pre conditions of `consumer`. Confirm intent.
  (try
    (require '[clojure.core.async :as async])
    true
    (catch Throwable _ false)))
(defn- consumer-control
  "Executes one control message `[type data]` against the KafkaConsumer.

  - :commit-message synchronously commits the offset of record `data`.
  Unknown types are not executed; a description string is returned instead."
  [consumer [type data]]
  (case type
    :commit-message
    ;; commit-message-offset (this namespace) enqueues the control message
    ;; and then calls `wakeup` to abort a pending poll. If that wakeup lands
    ;; while we are already committing, commitSync throws WakeupException and
    ;; Kafka clears the pending wakeup, so retrying commits the offset instead
    ;; of silently dropping it.
    (loop []
      (let [committed? (try
                         (consumer/commit-message-offset consumer data)
                         true
                         (catch WakeupException _ false))]
        (when-not committed?
          (recur))))
    (str "unknown control type " type)))
(defn commit-message-offset
  "Asks the polling thread of an async consumer map to commit `message`:
  enqueues a [:commit-message message] control on its :control-chan, then
  wakes its KafkaConsumer up so a pending poll is aborted."
  [consumer message]
  (let [{kafka-consumer :consumer
         control-chan   :control-chan} consumer]
    (async/>!! control-chan [:commit-message message])
    (consumer/wakeup kafka-consumer)))
(defn poll-chan
  "Polls `consumer` on a future thread and puts every polled record (as a
  map) onto `records-chan`, until records-chan is closed. Between polls,
  control messages read from `control-chan` are executed (see
  consumer-control) until none arrives within 100 ms or control-chan is
  closed.

  Returns the future: it yields :ok, or the Throwable that ended the loop.
  The consumer is closed in every case."
  [consumer poll-timeout records-chan control-chan]
  (future
    (try
      (while (not (clojure.core.async.impl.protocols/closed? records-chan))
        ;; a poll aborted by `wakeup` (used by commit-message-offset to get
        ;; control messages processed) yields nil: skip the hand-off rather
        ;; than passing nil to poll->all-records, which would end the loop
        (when-let [polled (try (consumer/poll consumer poll-timeout)
                               (catch WakeupException _))]
          (async/onto-chan records-chan
                           (consumer/poll->all-records polled)
                           false))
        ;; process consumer controls
        (loop []
          (let [res (async/alt!!
                      control-chan ([control]
                                    (if control
                                      (consumer-control consumer control)
                                      :closed))
                      (async/timeout 100) :timeout)]
            (when-not (#{:timeout :closed} res)
              (recur)))))
      :ok
      (catch Throwable t t)
      (finally
        (.close consumer)))))
(defn poll-chans
  "Polls `consumer` on a future thread and dispatches the records of each
  topic onto `(topic->chan topic)`. Between polls, control messages read from
  `control-chan` are executed (see consumer-control) until none arrives
  within 100 ms or control-chan is closed.

  Returns a stop function: it asks the loop to end after its current
  iteration and blocks until the future completes, returning :ok or the
  Throwable that ended the loop. The consumer is closed in every case."
  [consumer poll-timeout topic->chan control-chan]
  (let [continue? (atom true)
        completion
        (future
          (try
            (while @continue?
              ;; a poll aborted by `wakeup` (used by commit-message-offset to
              ;; get control messages processed) yields nil: skip dispatching
              ;; rather than handing nil to poll->records-by-topic
              (when-let [polled (try (consumer/poll consumer poll-timeout)
                                     (catch WakeupException _))]
                (doseq [[topic records] (consumer/poll->records-by-topic polled)]
                  (async/onto-chan (topic->chan topic) records false)))
              ;; process consumer controls
              (loop []
                (let [res (async/alt!!
                            control-chan ([control]
                                          (if control
                                            (consumer-control consumer control)
                                            :closed))
                            (async/timeout 100) :timeout)]
                  (when-not (#{:timeout :closed} res)
                    (recur)))))
            :ok
            (catch Throwable t t)
            (finally
              (.close consumer))))]
    (fn close! []
      (reset! continue? false)
      (deref completion))))
(defn consumer
  ;; Builds an async consumer map
  ;; {:consumer :control-chan :close! ...} around a felice KafkaConsumer,
  ;; polling every 2000 ms on a background future. Requires core.async
  ;; (see async-enabled?).
  ;;
  ;; Arity 4: `topic->chan` maps topic -> channel; the consumer subscribes to
  ;; its keys and each topic's records are put on its own channel
  ;; (poll-chans). Returned map also holds :chans.
  ([conf key-deserializer value-deserializer topic->chan]
   {:pre [async-enabled?]}
   (let [consumer (consumer/consumer conf key-deserializer value-deserializer)
         control-chan (async/chan 100)]
     (apply consumer/subscribe consumer (keys topic->chan))
     (let [close-polling! (poll-chans consumer 2000 topic->chan control-chan)]
       {:consumer     consumer
        :chans        topic->chan
        :control-chan control-chan
        ;; stop the polling loop first (blocks until it ends), then close the
        ;; control channel and every topic channel
        :close! (fn []
                  (close-polling!)
                  (async/close! control-chan)
                  (doseq [chan (vals topic->chan)]
                    (async/close! chan)))})))
  ;; Variadic arity: all records of `topics` go to the single `records-chan`
  ;; (poll-chan). Returned map also holds :records-chan.
  ([conf key-deserializer value-deserializer records-chan & topics]
   {:pre [async-enabled?]}
   (let [consumer (consumer/consumer conf key-deserializer value-deserializer)
         control-chan (async/chan 100)]
     (apply consumer/subscribe consumer topics)
     (let [polling (poll-chan consumer 2000 records-chan control-chan)]
       {:consumer consumer
        :records-chan records-chan
        :control-chan control-chan
        ;; closing records-chan is what makes the poll-chan loop exit; the
        ;; deref then waits for the polling future to finish
        :close!  (fn []
                   (async/close! records-chan)
                   (async/close! control-chan)
                   (deref polling))}))))
(defn close!
  "Stops an async consumer map built by `consumer` by invoking the :close!
  function it holds, and returns that function's result."
  [{stop! :close! :as _consumer}]
  (stop!))
 
(ns felice.admin
  (:require [clojure.walk :refer [stringify-keys]]
            [clojure.spec.alpha :as s]
            [clojure.string :as str])
  (:import org.apache.kafka.clients.admin.Admin
           org.apache.kafka.clients.admin.NewTopic
           org.apache.kafka.common.config.TopicConfig
           org.apache.kafka.common.Node
           org.apache.kafka.common.TopicPartition
           org.apache.kafka.common.TopicPartitionInfo
           org.apache.kafka.clients.consumer.OffsetAndMetadata))
(defn- ->node
  "Converts a Kafka Node into {:host :port :id}, plus :rack when the node
  has one."
  [^Node node]
  (let [base {:host (.host node)
              :port (.port node)
              :id   (.id node)}]
    (if (.hasRack node)
      (assoc base :rack (.rack node))
      base)))
(defn- ->topic-partition
  "Converts a TopicPartitionInfo into {:partition :isr :leader :replicas},
  nodes being converted with ->node."
  [^TopicPartitionInfo partition]
  {:partition (.partition partition)
   :isr       (map ->node (.isr partition))
   ;; a partition may have no leader (e.g. while offline): keep nil instead
   ;; of throwing a NullPointerException from ->node
   :leader    (some-> (.leader partition) ->node)
   :replicas  (map ->node (.replicas partition))})
(defn- ->offset-metadata
  "Converts an OffsetAndMetadata into {:offset :metadata}."
  [^OffsetAndMetadata offset-metadata]
  (let [committed-offset (.offset offset-metadata)
        meta-string      (.metadata offset-metadata)]
    {:offset   committed-offset
     :metadata meta-string}))

Instantiate an Admin client from properties.

(defn admin-client
  {:added "3.2.0-1.7"}
  (^Admin [props]
   ;; Consumer-only felice keys are removed *after* stringifying, so they are
   ;; dropped whether they were given as keywords or strings. Dissoc'ing
   ;; keywords from the stringified map would keep them.
   (let [props* (-> (stringify-keys props)
                    (dissoc "key.deserializer" "value.deserializer" "topics"))]
     (Admin/create ^java.util.Map props*))))

Close the Admin client and release all associated resources.

The close operation has a grace period during which current operations will be allowed to complete, specified by the given duration. New operations will not be accepted during the grace period.

Once the grace period is over, all operations that have not yet been completed will be aborted with a TimeoutException.

(defn admin-close
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   (.close ac))
  ([^Admin ac timeout-ms]
   ;; grace period (ms) during which in-flight operations may complete
   (.close ac (java.time.Duration/ofMillis timeout-ms))))

Get the metrics kept by the adminClient

(defn admin-metrics
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   ;; .metrics is a java.util.Map {MetricName Metric}: convert each Metric
   (some->> (.metrics ac)
            (map (fn [entry]
                   (let [metric (.getValue entry)
                         mname  (.metricName metric)]
                     {:name        (.name mname)
                      :group       (.group mname)
                      :description (.description mname)
                      :tags        (.tags mname)
                      :value       (.metricValue metric)}))))))

Get information about the nodes in the cluster, using the default options.

(defn describe-cluster
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   ;; every field of the DescribeClusterResult is a future: block on each
   (let [result (.describeCluster ac)]
     {:cluster-id           (deref (.clusterId result))
      :authorized-operation (deref (.authorizedOperations result))
      :controller-node      (->node (deref (.controller result)))
      :nodes                (map ->node (deref (.nodes result)))})))

List topics for the current Admin connection

(defn list-topics
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   ;; blocks on the future of topic names
   (some-> (.listTopics ac)
           (.names)
           (deref))))

From a configuration map, try to resolve static class fields and populate a configuration map.

(defn- safely-resolve-field
  "Returns the value of the public static field named `f` of `class`, or nil
  when it cannot be read (unknown field, non-static field, bad arguments)."
  [class f]
  (try
    (-> class
        (.getField f)
        (.get nil))
    (catch Exception _ nil)))
(defn- static-field->props
  "Translates a config map into a sorted map of Kafka properties by resolving
  static fields of `class` (e.g. TopicConfig): each key and value name is
  upper-cased with dots replaced by underscores, so
  :cleanup.policy.config -> CLEANUP_POLICY_CONFIG -> \"cleanup.policy\".

  - entries whose key does not resolve to a field are dropped;
  - a keyword/symbol/string value resolving to a field is replaced by the
    field's value, otherwise its name is used;
  - other values (numbers, booleans...) are stringified, since `name` cannot
    be applied to them;
  - nil values are dropped."
  [m class]
  (letfn [(->field-name [e]
            (-> (name e)
                (str/replace "." "_")
                (str/upper-case)))
          (->value [v]
            (if (or (keyword? v) (symbol? v) (string? v))
              (or (safely-resolve-field class (->field-name v))
                  (name v))
              (str v)))]
    (reduce
     (fn [acc [k v]]
       (let [k* (safely-resolve-field class (->field-name k))]
         (if (and k* (some? v))
           (assoc acc k* (->value v))
           acc)))
     (sorted-map)
     m)))
(defn- mk-topic-instance
  "Builds a NewTopic; non-empty `props` are translated via
  static-field->props against TopicConfig and set as the topic configs."
  ^NewTopic
  [^String topic-name partition-count replication-factor props]
  (let [new-topic (NewTopic. topic-name (int partition-count) (short replication-factor))]
    (if (empty? props)
      new-topic
      (doto new-topic
        (.configs (static-field->props props TopicConfig))))))
(defn- submit-topic-creation-request
  "Submits the NewTopic instances and returns a lazy seq with one
  {:topic :status [:message]} map per topic; a failed creation is reported
  with :status :kafka.topic/error instead of being thrown."
  [^Admin ac topic-instances]
  (letfn [(outcome [[topic fut]]
            (try
              (.get fut)
              {:topic  topic
               :status :kafka.topic/created}
              (catch java.util.concurrent.ExecutionException e
                {:topic   topic
                 :message (.getMessage e)
                 :status  :kafka.topic/error})))]
    (some->> (.createTopics ac topic-instances)
             (.values)
             (map outcome))))

Create a new topic

(defn create-topic
  {:added "3.2.0-1.7"}
  ([^Admin ac ^String topic-name partition-count replication-factor]
   (create-topic ac topic-name partition-count replication-factor {}))
  ([^Admin ac ^String topic-name partition-count replication-factor ^java.util.Map props]
   ;; single-topic request: return its only outcome map
   (->> [(mk-topic-instance topic-name partition-count replication-factor props)]
        (submit-topic-creation-request ac)
        (first))))
;; Specs for the topic maps accepted by create-topics (::kafka-topics is a
;; collection of ::kafka-topic). All four keys are required (:req-un),
;; including :props. :kafka.topic/name is also used by delete-topics.
(s/def :kafka.topic/name string?)
(s/def :kafka.topic/partition-count int?)
(s/def :kafka.topic/replication-factor int?)
(s/def :kafka.topic/props map?)
(s/def ::kafka-topic (s/keys :req-un [:kafka.topic/name
                                      :kafka.topic/partition-count
                                      :kafka.topic/replication-factor
                                      :kafka.topic/props]))
(s/def ::kafka-topics (s/coll-of ::kafka-topic))

Create new topics from list of objects

This operation is not transactional, so it may succeed for some topics and fail for others.

(defn create-topics
  {:added "3.2.0-1.7"}
  ([^Admin ac topics]
   (if (s/valid? ::kafka-topics topics)
     (let [topics* (mapv (fn [{:keys [name partition-count replication-factor props]}]
                           (mk-topic-instance name partition-count replication-factor props))
                         topics)]
       (submit-topic-creation-request ac topics*))
     ;; explain against the same collection spec that was validated, so the
     ;; ex-data actually describes why `topics` was rejected
     (throw (ex-info "Bad Topics spec" (s/explain-data ::kafka-topics topics))))))

Delete a collection of topics.

This operation is not transactional, so it may succeed for some topics and fail for others.

(defn delete-topics
  {:added "3.2.0-1.7"}
  ([^Admin ac topics]
   (let [topics-spec (s/coll-of :kafka.topic/name)]
     (when-not (s/valid? topics-spec topics)
       (throw (ex-info "Bad Topics spec" (s/explain-data topics-spec topics))))
     ;; one {:topic :status [:message]} map per topic; failures are reported,
     ;; not thrown
     (map (fn [[topic fut]]
            (try
              (.get fut)
              {:topic  topic
               :status :kafka.topic/deleted}
              (catch java.util.concurrent.ExecutionException e
                {:topic   topic
                 :message (.getMessage e)
                 :status  :kafka.topic/error})))
          (.values (.deleteTopics ac topics))))))

Delete a topic

(defn delete-topic
  {:added "3.2.0-1.7"}
  ([^Admin ac topic-name]
   (->> #{topic-name}
        (delete-topics ac)
        (first))))

Describe some topics in the cluster.

If no topic list is provided, describe all the topics in the cluster.

(defn describe-topics
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   (describe-topics ac (list-topics ac)))
  ([^Admin ac topic-list]
   ;; sorted map {topic-name {:uuid :partitions}}
   (let [descriptions (.get (.allTopicNames (.describeTopics ac topic-list)))]
     (into (sorted-map)
           (map (fn [[topic-name desc]]
                  [topic-name {:uuid       (str (.topicId desc))
                               :partitions (map ->topic-partition (.partitions desc))}]))
           descriptions))))

Describe a topic.

(defn describe-topic
  {:added "3.2.0-1.7"}
  ([^Admin ac topic]
   ;; Delegates to describe-topics (plural); calling describe-topic itself
   ;; would recurse forever. The result is that sorted map's single entry,
   ;; i.e. a [topic-name description] pair.
   (first (describe-topics ac #{topic}))))

List the consumer groups for the current Admin connection

(defn list-consumer-groups
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   (some->> (.listConsumerGroups ac)
            (.all)
            (.get)
            (map (fn [o]
                   (let [state (.orElse (.state o) nil)]
                     {:group-id (.groupId o)
                      :is-simple-consumer-group (.isSimpleConsumerGroup o)
                      ;; `keyword` returns nil for anything but a string or
                      ;; symbol, so the ConsumerGroupState enum is converted to
                      ;; its (lower-cased) string form first, e.g. :stable;
                      ;; an absent state stays :unknown
                      :state (keyword (if state
                                        (str/lower-case (str state))
                                        "unknown"))}))))))

List consumer group offsets. If no group id is specified, compute them for all the group ids known to the current cluster connection.

(defn list-consumer-groups-offsets
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   ;; groups without committed offsets (nil results) are skipped
   (->> (list-consumer-groups ac)
        (map :group-id)
        (keep #(list-consumer-groups-offsets ac %))
        (doall)))
  ([^Admin ac group-id]
   ;; {:group-id g :topics {topic [{:partition-name :partition :metadata}]}}
   ;; or nil when the group has no committed offsets
   (let [entry->row (fn [[tp om]]
                      {:topic-name     (.topic tp)
                       :partition-name (str tp)
                       :partition      tp
                       :metadata       (->offset-metadata om)})
         by-topic   (some->> (.listConsumerGroupOffsets ac group-id)
                             (.partitionsToOffsetAndMetadata)
                             (.get)
                             (map entry->row)
                             (group-by :topic-name)
                             (not-empty))]
     (when by-topic
       {:group-id group-id
        :topics   (into {}
                        (for [[topic rows] by-topic]
                          [topic (map #(dissoc % :topic-name) rows)]))}))))

Sum consumer group offset over all partitions

(defn sum-consumer-groups-offsets
  {:added "3.2.0-1.7"}
  ([^Admin ac]
   ;; merge the per-group {group-id [...]} maps into one map
   (->> (list-consumer-groups ac)
        (map :group-id)
        (keep #(sum-consumer-groups-offsets ac %))
        (into {})))
  ([^Admin ac group-id]
   ;; {group-id ({:topic t :sum committed-offsets-sum} ...)} or nil
   (let [{:keys [topics] :as consumer-group*} (list-consumer-groups-offsets ac group-id)]
     (when (seq consumer-group*)
       {group-id (for [[topic offsets] topics]
                   {:topic topic
                    :sum   (reduce + (map #(get-in % [:metadata :offset]) offsets))})}))))

Alters offsets for the specified group of a specific topic.

Once the alteration has completed, returns the current offsets of the topic.

Yields nil if the consumer group has no committed offsets for the topic.

(defn set-consumer-group-topic-offset
  {:added "3.2.0-1.7"}
  ([^Admin ac ^String group-id ^String topic offset]
   (let [target-offset   (OffsetAndMetadata. (long offset))
         committed       (.get (.partitionsToOffsetAndMetadata
                                (.listConsumerGroupOffsets ac group-id)))
         ;; only the partitions of `topic` the group already has offsets for
         updated-offsets (into {}
                               (keep (fn [[^TopicPartition tp]]
                                       (when (= topic (.topic tp))
                                         [tp target-offset])))
                               committed)]
     (when (seq updated-offsets)
       ;; Block until the alteration completes. `.get` waits without spinning
       ;; a CPU core and rethrows a failed alteration (wrapped in
       ;; java.util.concurrent.ExecutionException) instead of reporting stale
       ;; offsets as if it had succeeded.
       (.get (.all (.alterConsumerGroupOffsets ac group-id updated-offsets)))
       {:group-id group-id
        :topic    topic
        :offsets  (some->> (list-consumer-groups-offsets ac group-id)
                           :topics
                           (filter (fn [[t]] (= t topic)))
                           first
                           second)}))))

Delete consumer groups from the cluster with the default options.

(defn delete-consumer-groups
  {:added "3.2.0-1.7"}
  ([^Admin ac groups]
   ;; One lazily-realized result map per group. Waiting on each group's
   ;; future happens when that element is realized, and any exception is
   ;; turned into an :error status rather than propagated.
   (let [per-group-futures (.deletedGroups (.deleteConsumerGroups ac groups))]
     (for [[group-id fut] per-group-futures]
       (try
         (.get fut)
         {:group-id group-id
          :status   :kafka.consumer-group/deleted}
         (catch Exception e
           {:group-id group-id
            :status   :kafka.consumer-group/error
            :message  (.getMessage e)}))))))

Delete one consumer group from the cluster with the default options.

(defn delete-consumer-group
  {:added "3.2.0-1.7"}
  ([^Admin ac group-id]
   ;; Single-group convenience over delete-consumer-groups; yields that
   ;; group's result map.
   (let [[result] (delete-consumer-groups ac [group-id])]
     result)))
;; Spec for a single value that must be a TopicPartition instance.
(s/def ::kafka-partition #(instance? TopicPartition %))
;; Spec for a collection whose every element satisfies ::kafka-partition;
;; used to validate the `partitions` argument of delete-consumer-group-offsets.
(s/def ::kafka-partitions (s/coll-of ::kafka-partition))

Delete committed offsets for a set of partitions in a consumer group.

NOTE: This will succeed at the partition level only if the group is not actively subscribed to the corresponding topic.

(defn delete-consumer-group-offsets
  {:added "3.2.0-1.7"}
  ([^Admin ac ^String group-id]
   ;; Without an explicit partition list, target every partition the group
   ;; currently has committed offsets for (nil when there are none).
   (let [partitions* (some->> (list-consumer-groups-offsets ac group-id)
                              :topics
                              (mapcat second)
                              (keep :partition))]
     (when partitions*
       (delete-consumer-group-offsets ac group-id partitions*))))
  ([^Admin ac ^String group-id  partitions]
   (when-not (s/valid? ::kafka-partitions partitions)
     (throw (ex-info "Bad Partition spec" (s/explain-data ::kafka-partitions partitions))))
   (let [result (.deleteConsumerGroupOffsets ac group-id (set partitions))
         ;; Per-partition outcome; failures are reported, not rethrown.
         outcome (fn [^TopicPartition tp]
                   (let [label (.toString tp)]
                     (try
                       (.get (.partitionResult result tp))
                       {:partition-offset label
                        :status :kafka.consumer-group-offset/deleted}
                       (catch Exception e
                         {:partition-offset label
                          :status :kafka.consumer-group-offset/error
                          :message (.getMessage e)}))))]
     (map outcome partitions))))
 
(ns felice.producer
  (:require [clojure.walk :as walk]
            [felice.serialization :refer [serializer]])
  (:import java.util.concurrent.TimeUnit
           [org.apache.kafka.clients.producer KafkaProducer ProducerRecord Callback]))
;; Producer config keys whose values are coerced before being handed to
;; KafkaProducer: most go through `int`, the two SASL refresh settings
;; through `short`.
(def CONF-COERCERS
  (merge
   (zipmap [:retries
            :batch.size
            :connections.max.idle.ms
            :delivery.timeout.ms
            :max.request.size
            :receive.buffer.bytes
            :request.timeout.ms
            :send.buffer.bytes
            :max.in.flight.requests.per.connection
            :metrics.num.samples
            :sasl.login.connect.timeout.ms
            :sasl.login.read.timeout.ms
            :sasl.oauthbearer.clock.skew.seconds
            :transaction.timeout.ms]
           (repeat int))
   (zipmap [:sasl.login.refresh.buffer.seconds
            :sasl.login.refresh.min.period.seconds]
           (repeat short))))
(defn- coerce-producer-config
  "Applies the CONF-COERCERS function registered for each key (looked up by
  its keyword form) to non-nil, non-false values; other entries pass through."
  [cfg]
  (into {}
        (map (fn [[k v :as entry]]
               (if-let [coerce (when v (get CONF-COERCERS (keyword k)))]
                 [k (coerce v)]
                 entry)))
        cfg))

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.

(defn flush!
  {:added "3.2.0-1.7"}
  [^KafkaProducer producer]
  ;; Delegates straight to KafkaProducer#flush (a void method, so this yields nil).
  (. producer flush))

Get the full set of internal metrics maintained by the producer.

(defn metrics
  {:added "3.2.0-1.7"}
  [^KafkaProducer producer]
  ;; Returns the producer's metrics map exactly as Kafka exposes it.
  (. producer (metrics)))

Get the partition metadata for the given topic.

(defn partitions-for
  {:added "3.2.0-1.7"}
  [^KafkaProducer producer topic]
  ;; Thin delegation to KafkaProducer#partitionsFor.
  (. producer partitionsFor topic))

Transforms a clojure map to a ProducerRecord object

(defn ^:no-doc producer-record
  {:added "3.2.0-1.7"}
  [{:keys [topic key value partition timestamp headers]}]
  ;; ProducerRecord's only 6-arg constructor is
  ;; (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers).
  ;; Clojure integer literals are Longs, so passing `{:partition 0}` as-is
  ;; fails the Integer parameter. Coerce partition/timestamp explicitly to
  ;; the boxed types the constructor expects; nil is passed through unchanged.
  (ProducerRecord. topic
                   (when (some? partition) (Integer/valueOf (int partition)))
                   (when (some? timestamp) (Long/valueOf (long timestamp)))
                   key
                   value
                   headers))

Converts a record map into a ProducerRecord and sends it, optionally invoking a callback once the send has been acknowledged.

(defn ^:no-doc send-record!
  {:added "3.2.0-1.7"}
  ;; `record` is a record map (see ->record), not a ProducerRecord: it is
  ;; converted with producer-record before being handed to KafkaProducer#send,
  ;; whose Future is returned.
  ([^KafkaProducer producer record]
   (.send producer (producer-record record)))
  ([^KafkaProducer producer record callback]
   (.send producer
          (producer-record record)
          (reify Callback
            (onCompletion [_ metadata exception]
              ;; `callback` receives a single argument: the exception when the
              ;; send failed, otherwise the record metadata.
              (callback (or exception metadata)))))))

Creates a record map from a topic, a value, and an optional key.

(defn ->record
  {:added "3.2.0-1.7"}
  ;; The 3-arg arity always carries :key, even when it is nil.
  ([topic value]
   (array-map :topic topic :value value))
  ([topic key value]
   (array-map :topic topic :key key :value value)))

Asynchronously sends a record.

(defn send!
  {:added "3.2.0-1.7"}
  ;; The topic/value and topic/key/value arities build a record map and
  ;; funnel into the record-map arity.
  ([^KafkaProducer producer topic value]
   (send! producer (->record topic value)))
  ([^KafkaProducer producer topic key value]
   (send! producer (->record topic key value)))
  ([^KafkaProducer producer record-map]
   (send-record! producer record-map)))

Asynchronously sends a record, triggering the given callback once the send has been acknowledged. Note that callbacks generally execute in the producer's I/O thread, so they should be reasonably fast; otherwise they will delay the sending of messages from other threads.

If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.

(defn send-with-callback!
  {:added "3.2.0-1.7"}
  ;; The topic-based arities build a record map and delegate to the
  ;; record-map arity, which hands record and callback to send-record!.
  ([^KafkaProducer producer topic value cb]
   (send-with-callback! producer (->record topic value) cb))
  ([^KafkaProducer producer topic key value cb]
   (send-with-callback! producer (->record topic key value) cb))
  ([^KafkaProducer producer record-map cb]
   (send-record! producer record-map cb)))

Synchronously sends a record, blocking until it has been acknowledged.

(defn send!!
  {:added "3.2.0-1.7"}
  ([^KafkaProducer producer topic value]
   (send!! producer (->record topic value)))
  ([^KafkaProducer producer topic key value]
   (send!! producer (->record topic key value)))
  ([^KafkaProducer producer record-map]
   ;; deref on a java.util.concurrent.Future delegates to its blocking .get.
   @(send! producer record-map)))

Create a producer

conf is a map of {:keyword value}. See https://kafka.apache.org/documentation/#producerconfigs for all possibilities.

The key and value serializers can be any of the keys defined in the felice.serialization namespace, or serializer instances. With the 1-argument arity, :key.serializer and :value.serializer must be provided in conf.

(defn producer
  {:added "3.2.0-1.7"}
  ([conf]
   ;; Serializer settings are resolved separately via `serializer`; the
   ;; remaining settings are coerced and turned into string keys for Kafka.
   (let [props (-> conf
                   (dissoc :key.serializer :value.serializer)
                   coerce-producer-config
                   walk/stringify-keys)
         key-ser (get conf :key.serializer)
         val-ser (get conf :value.serializer)]
     (KafkaProducer. props (serializer key-ser) (serializer val-ser))))
  ([conf key-serializer value-serializer]
   (-> conf
       (assoc :key.serializer key-serializer
              :value.serializer value-serializer)
       producer)))

This method waits up to timeout ms for the producer to complete the sending of all incomplete requests.

If the producer is unable to complete all requests before the timeout expires, this method will fail any unsent and unacknowledged records immediately.

Calling close! with no timeout is equivalent to close(Duration.ofMillis(Long.MAX_VALUE)), i.e. it waits indefinitely.

(defn close!
  {:added "3.2.0-1.7"}
  ([^KafkaProducer producer]
   (.close producer))
  ([^KafkaProducer producer timeout]
   ;; `timeout` is in milliseconds. close(Duration) is the overload current
   ;; kafka-clients support; close(long, TimeUnit) is deprecated and absent
   ;; from newer producer APIs.
   (.close producer (java.time.Duration/ofMillis (long timeout)))))
 
(ns ^:no-doc felice.serialization
  (:require [cognitect.transit :as transit]
            [jsonista.core :as json]
            [taoensso.nippy :as nippy])
  (:import [org.apache.kafka.common.serialization
            Serializer       Deserializer
            LongSerializer   LongDeserializer
            StringSerializer StringDeserializer]
           [java.io ByteArrayInputStream ByteArrayOutputStream]))

transit

(defn transit-serializer [type]
  ;; Kafka Serializer writing the payload as transit in the given format
  ;; (e.g. :json or :msgpack) into a fresh byte array.
  (reify
    Serializer
    (close [_])
    (configure [_ config is-key?])
    (serialize [_ topic payload]
      (.toByteArray
       (doto (ByteArrayOutputStream.)
         (-> (transit/writer type)
             (transit/write payload)))))))
(defn transit-deserializer [type]
  ;; Kafka Deserializer reading a transit payload in the given format.
  (reify
    Deserializer
    (close [this])
    (configure [this config is-key?])
    (deserialize [this topic payload]
      ;; The Deserializer contract allows a nil payload. Return nil (as Kafka's
      ;; StringDeserializer/LongDeserializer do) instead of throwing a
      ;; NullPointerException from ByteArrayInputStream.
      (when payload
        (let [in (ByteArrayInputStream. ^bytes payload)
              reader (transit/reader in type)]
          (transit/read reader))))))
;; Shared jsonista ObjectMapper: keys are written with `name` and read back
;; as keywords; dates are formatted with the pattern below.
(def json-mapper
  (json/object-mapper
   {:date-format   "yyyy-MM-dd'T'HH:mm:ss.SSSX"
    :decode-key-fn keyword
    :encode-key-fn name}))

json

(defn json-serializer []
  ;; Kafka Serializer encoding the payload as JSON via json-mapper.
  (reify
    Serializer
    (close [this])
    (configure [this config is-key?])
    (serialize [this topic payload]
      ;; Encode explicitly as UTF-8 (the JSON interchange encoding). The
      ;; previous no-arg .getBytes used the JVM's platform default charset,
      ;; which corrupts non-ASCII text on hosts whose default is not UTF-8.
      (.getBytes ^String (json/write-value-as-string payload json-mapper)
                 java.nio.charset.StandardCharsets/UTF_8))))
(defn json-deserializer []
  ;; Kafka Deserializer decoding a UTF-8 JSON payload via json-mapper.
  ;; Malformed JSON raises ex-info "malformed json" carrying the raw content
  ;; and topic.
  (reify
    Deserializer
    (close [this])
    (configure [this config is-key?])
    (deserialize [this topic payload]
      ;; A nil payload is allowed by the Deserializer contract: yield nil.
      (when payload
        ;; Decode as UTF-8 rather than the platform default charset, matching
        ;; how JSON is exchanged.
        (let [as-string (String. ^bytes payload java.nio.charset.StandardCharsets/UTF_8)]
          (try (json/read-value as-string json-mapper)
               (catch Exception e
                 ;; :cause stays in ex-data for existing callers; the
                 ;; exception is also chained so stack traces show the root.
                 (throw (ex-info "malformed json"
                                 {:cause e
                                  :content as-string
                                  :topic topic}
                                 e)))))))))
(defn json-safe-deserializer []
  ;; Like json-deserializer, but never throws on malformed JSON. It yields
  ;; {:raw-value <string> ::error {:deserializing <exception>}} instead.
  (reify
    Deserializer
    (close [this])
    (configure [this config is-key?])
    (deserialize [this topic payload]
      ;; A nil payload is allowed by the Deserializer contract: yield nil.
      (when payload
        ;; Decode as UTF-8 rather than the platform default charset.
        (let [as-string (String. ^bytes payload java.nio.charset.StandardCharsets/UTF_8)]
          (try (json/read-value as-string json-mapper)
               (catch Exception e
                 {:raw-value as-string
                  ::error {:deserializing e}})))))))

nippy

(defn nippy-serializer ^Serializer [type]
  ;; Kafka Serializer freezing payloads with nippy. :fast uses fast-freeze;
  ;; :lz4 uses freeze with the LZ4 compressor and no metadata. Any other
  ;; type throws IllegalArgumentException when serialize is called.
  (reify
    Serializer
    (close [_])
    (configure [_ config is-key?])
    (serialize [_ topic payload]
      (case type
        :fast (nippy/fast-freeze payload)
        :lz4  (nippy/freeze payload {:incl-metadata? false
                                     :compressor     nippy/lz4-compressor})))))
(defn nippy-deserializer ^Deserializer [type]
  ;; Kafka Deserializer thawing nippy payloads produced by nippy-serializer
  ;; with the same `type` (:fast or :lz4). Any failure is rethrown as
  ;; ex-info "corrupted nippy byte array".
  (reify
    Deserializer
    (close [this])
    (configure [this config is-key?])
    (deserialize [this topic payload]
      ;; A nil payload is allowed by the Deserializer contract: yield nil
      ;; instead of reporting it as a corrupted byte array.
      (when payload
        (try
          (condp = type
            :fast (nippy/fast-thaw payload)
            :lz4  (nippy/thaw payload {:incl-metadata? false
                                       :compressor nippy/lz4-compressor}))
          (catch Exception e
            ;; :cause stays in ex-data for existing callers; the exception is
            ;; also chained so stack traces show the root failure.
            (throw (ex-info "corrupted nippy byte array"
                            {:cause e
                             :topic topic}
                            e))))))))

references

;; Alias -> zero-argument factory returning a fresh Kafka Serializer.
(def serializers
  (let [transit (fn [fmt] (partial transit-serializer fmt))
        nippy   (fn [mode] (partial nippy-serializer mode))]
    {:long       #(LongSerializer.)
     :string     #(StringSerializer.)
     :json       json-serializer
     :t+json     (transit :json)
     :t+mpack    (transit :msgpack)
     ;; TODO: Implement password encryption mechanism
     ;; in the constructor for Nippy
     :nippy+fast (nippy :fast)
     :nippy+lz4  (nippy :lz4)}))
;; Alias -> zero-argument factory returning a fresh Kafka Deserializer.
(def deserializers
  (let [transit (fn [fmt] (partial transit-deserializer fmt))
        nippy   (fn [mode] (partial nippy-deserializer mode))]
    {:long       #(LongDeserializer.)
     :string     #(StringDeserializer.)
     :json       json-deserializer
     :json-safe  json-safe-deserializer
     :t+json     (transit :json)
     :t+mpack    (transit :msgpack)
     :nippy+fast (nippy :fast)
     :nippy+lz4  (nippy :lz4)}))

implementation

(defn ^Serializer serializer [s]
  ;; Keyword aliases are looked up in `serializers` and instantiated. Any
  ;; non-keyword value (e.g. a Serializer instance, or nil) is returned as-is.
  (if-not (keyword? s)
    s
    (let [make-serializer (get serializers s)]
      (when-not make-serializer
        (throw (ex-info "failed to initialize kafka serializer"
                        {:cause   (str "unknown serializer alias " s)
                         :allowed (keys serializers)})))
      (make-serializer))))
(defn ^Deserializer deserializer [d]
  ;; Keyword aliases are looked up in `deserializers` and instantiated. Any
  ;; non-keyword value (e.g. a Deserializer instance, or nil) is returned as-is.
  (if-not (keyword? d)
    d
    (let [make-deserializer (get deserializers d)]
      (when-not make-deserializer
        (throw (ex-info "failed to initialize kafka deserializer"
                        {:cause   (str "unknown deserializer alias " d)
                         :allowed (keys deserializers)})))
      (make-deserializer))))