clj-kafka-repl.kafka

Functions for consuming, producing and reading metadata from Kafka.

consume

(consume topic & {:keys [partition offset partition-offsets key-deserializer value-deserializer limit filter-fn], :or {partition nil, offset :end, partition-offsets nil, key-deserializer (default-key-deserializer), value-deserializer (default-value-deserializer), limit nil, filter-fn (constantly true)}})

Opens a consumer over the specified topic and returns a ::ch/tracked-channel, a wrapper around a core.async channel.

The channel stays open indefinitely unless either (a) it is explicitly closed with ch/close!, or (b) the specified message limit is reached.

Examples of pulling data from channels:

  • Pop the next message (if any) from the channel: (ch/poll! tc)
  • Stream channel to file: (ch/to-file tc "/workspace/temp/your-file")
  • Stream channel to stdout: (ch/to-stdout tc)
  • Close the channel when done: (ch/close! tc)
Options:
  • :partition (default: nil) - Limit consumption to a specific partition.
  • :offset (default: :end) - Offset to start consuming from. Valid values: :start, :end, a numeric offset, or a timestamp (as a date/time string).
  • :partition-offsets (default: nil) - Vector of [partition offset] pairs giving, per partition, the offset to start consuming from.
  • :key-deserializer (default: (default-key-deserializer)) - Deserializer for the message key. Overrides the value in config. Defaults to a string deserializer.
  • :value-deserializer (default: (default-value-deserializer)) - Deserializer for the message value. Overrides the value in config. Defaults to a string deserializer.
  • :limit (default: nil) - Maximum number of messages to consume. The channel is closed once the limit is reached; nil means no limit.
  • :filter-fn (default: (constantly true)) - Filter function applied to each incoming :kafka-message. If a string is given, only messages whose value contains that string are kept.
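The :filter-fn option accepts either a predicate or a string. As a rough sketch of the string case, the hypothetical helper ->filter-fn below (not part of clj-kafka-repl) turns a string into a predicate. It assumes each :kafka-message is a map carrying its deserialized value under :value; the real message shape is not documented here and may differ.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper, not part of clj-kafka-repl: normalises a :filter-fn value
;; into a predicate over messages.
;; Assumption: each message is a map with its deserialized value under :value.
(defn ->filter-fn
  [filter-fn]
  (if (string? filter-fn)
    ;; A string means "keep messages whose value contains this string".
    ;; `str` turns non-string values (e.g. edn maps) into their printed form first.
    (fn [message] (str/includes? (str (:value message)) filter-fn))
    ;; Anything else is assumed to already be a predicate.
    filter-fn))

;; Illustrative messages (invented data).
(def messages
  [{:partition 0 :offset 10 :value "order-created:42"}
   {:partition 0 :offset 11 :value "order-cancelled:42"}
   {:partition 1 :offset 7 :value "order-created:43"}])

;; Keeps the messages at offsets 10 and 7.
(filter (->filter-fn "created") messages)
```

Printing non-string values before matching is a choice of this sketch; how the library treats non-string values is not stated.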

get-earliest-offsets

(get-earliest-offsets topic & {:keys [partitions], :or {partitions nil}})

Gets a vector of [partition earliest-offset] pairs for the partitions of the given topic.

Options:
  • :partitions (default: nil) - Limit the results to the specified collection of partitions.

get-group-offset

(get-group-offset topic group partition)

Gets the offset of the given consumer group on the given topic/partition.

get-group-offsets

(get-group-offsets topic group)

Gets the offsets on all partitions of the given topic for the specified consumer group.

get-lag

(get-lag topic group & {:keys [verbose?], :or {verbose? true}})

Gets the topic lag for the given consumer group.

Options:
  • :verbose? (default: true) - If true, includes a by-partition breakdown.
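Consumer lag is conventionally the difference between a partition's latest (log-end) offset and the group's committed offset, summed over partitions. A minimal sketch of that arithmetic, using a hypothetical compute-lag helper that is not clj-kafka-repl's get-lag: it assumes group offsets come as [partition offset] pairs like get-latest-offsets, and the {:total ... :partitions ...} result shape is invented for illustration.

```clojure
;; Hypothetical sketch, not clj-kafka-repl's get-lag implementation.
;; Assumptions: latest-offsets has the shape returned by get-latest-offsets
;; ([partition offset] pairs) and group-offsets uses the same pair shape;
;; the actual return values of get-group-offsets and get-lag may differ.
(defn compute-lag
  [latest-offsets group-offsets verbose?]
  (let [committed    (into {} group-offsets)
        by-partition (into (sorted-map)
                           (for [[partition latest] latest-offsets
                                 :let  [offset (get committed partition)]
                                 ;; Partitions with no committed offset are skipped.
                                 :when (some? offset)]
                             [partition (- latest offset)]))
        total        (reduce + 0 (vals by-partition))]
    (if verbose?
      {:total total :partitions by-partition}
      {:total total})))

;; Illustrative offsets (invented data): lag is 10, 0 and 30 per partition.
(compute-lag [[0 250] [1 80] [2 90]]
             [[0 240] [1 80] [2 60]]
             true)
```

Skipping partitions without a committed offset is one possible convention; counting them from the earliest offset would be another.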

get-latest-offsets

(get-latest-offsets topic & {:keys [partitions], :or {partitions nil}})

Gets a vector of [partition latest-offset] pairs for the partitions of the given topic.

Options:
  • :partitions (default: nil) - Limit the results to the specified collection of partitions.
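Because get-earliest-offsets and get-latest-offsets return the same [partition offset] shape, they combine naturally. The hypothetical helper offset-spans below (not part of clj-kafka-repl) sketches how much data each partition currently spans, assuming the latest offset is the log-end offset, i.e. one past the last message.

```clojure
;; Hypothetical helper, not part of clj-kafka-repl.
;; Assumptions: both arguments use the documented [partition offset] pair shape,
;; and the latest offset is the log-end offset (one past the last message).
(defn offset-spans
  "Returns a sorted map of partition -> (latest - earliest). On compacted or
  transactional topics this is an upper bound on the retained message count."
  [earliest-offsets latest-offsets]
  (let [earliest (into {} earliest-offsets)]
    (into (sorted-map)
          (for [[partition latest] latest-offsets
                ;; A partition missing from earliest-offsets is treated as starting at 0.
                :let [start (get earliest partition 0)]]
            [partition (- latest start)]))))

;; Illustrative offsets (invented data) => {0 150, 1 0, 2 50}
(offset-spans [[0 100] [1 0] [2 40]]
              [[0 250] [1 0] [2 90]])
```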

get-message

(get-message topic offset & {:keys [value-deserializer partition], :or {value-deserializer nil, partition nil}})

Gets the message at the specified offset on the given topic (if any).

Options:
  • :partition (default: nil) - Limit consumption to a specific partition.
  • :value-deserializer (default: nil) - Deserializer to use to deserialize the message value.

get-topic-partitions

(get-topic-partitions topic)

Gets the vector of partitions available for the given topic.

get-topics

(get-topics & {:keys [search]})

Lists the names of all topics.

Options:
  • :search (default: nil) - String to filter topics on. Only topic names containing the string are returned.
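The :search option is a substring match rather than a prefix or pattern match. The hypothetical search-topics function below illustrates those documented semantics on a plain collection of names; it is not the library's code, and whether get-topics matches case-sensitively (as clojure.string/includes? does) is not documented.

```clojure
(require '[clojure.string :as str])

;; Illustration of the documented :search semantics, not the library's code:
;; a nil search returns every name, otherwise only names containing the string.
;; str/includes? is case-sensitive; whether get-topics is, is not documented.
(defn search-topics
  [topic-names search]
  (if (nil? search)
    (vec topic-names)
    (filterv #(str/includes? % search) topic-names)))

;; Illustrative topic names (invented data) => ["orders" "orders-dlq"]
(search-topics ["orders" "orders-dlq" "payments"] "orders")
```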

produce

(produce topic & {:keys [key-serializer value-serializer], :or {key-serializer (default-key-serializer), value-serializer (default-value-serializer)}})

Produces messages to the specified topic.

Options:
  • :key-serializer (default: (default-key-serializer)) - Serializer for the message key. Uses a string serializer if not specified.
  • :value-serializer (default: (default-value-serializer)) - Serializer for the message value. Uses an edn serializer if not specified.
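To make "edn serializer" concrete, the sketch below prints a value as edn text, encodes it as UTF-8 bytes, and reads it back with clojure.edn. The helpers edn-serialize and edn-deserialize are hypothetical and are not clj-kafka-repl's default-value-serializer, which may differ in detail.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical helpers, not clj-kafka-repl's default-value-serializer: they only
;; illustrate edn serialization, i.e. printing a value as edn text and encoding
;; it as UTF-8 bytes, then reading it back with clojure.edn.
(defn edn-serialize
  [value]
  ;; nil passes through, since Kafka uses a null value for tombstones.
  (when (some? value)
    (.getBytes ^String (pr-str value) "UTF-8")))

(defn edn-deserialize
  [^bytes data]
  (when (some? data)
    (edn/read-string (String. data "UTF-8"))))

;; Illustrative value (invented data); the round trip returns an equal map.
(def payload {:order-id 42 :status :created :items ["a" "b"]})
(edn-deserialize (edn-serialize payload))
```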

sample

(sample topic & opts)

Convenience wrapper around kafka/consume that samples a single message from the topic.

set-group-offsets!

(set-group-offsets! topic group partition-offsets & {:keys [consumer], :or {consumer nil}})

Sets the offsets of the specified consumer group on the specified topic, using the given sequence of partition->offset pairs. The offset in each pair can take one of several forms:

  • A non-negative integer - an absolute offset.
  • A negative integer - an offset relative to the group's current offset (i.e. subtracted from it, so -10 moves the group back ten offsets).
  • :start - seek to the start of the partition.
  • :end - seek to the end of the partition.
  • A date/time string - the offset that was current at the given time.
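The offset forms listed above can be resolved to absolute offsets once a partition's current, earliest and latest offsets are known. A minimal sketch, using a hypothetical resolve-offset function that is not clj-kafka-repl's set-group-offsets! implementation; date/time strings are left out because they need a timestamp lookup on the broker.

```clojure
;; Hypothetical sketch, not clj-kafka-repl's set-group-offsets! implementation.
;; It resolves each documented offset form to an absolute offset, given a
;; partition's current group offset and its earliest/latest offsets.
;; Date/time strings need a timestamp lookup on the broker and are not handled.
(defn resolve-offset
  [offset {:keys [current earliest latest]}]
  (cond
    (= offset :start) earliest
    (= offset :end)   latest
    ;; Negative: relative to the current offset. Clamping at `earliest` is a
    ;; choice made by this sketch; the library's behavior is not documented.
    (and (integer? offset) (neg? offset)) (max earliest (+ current offset))
    ;; Non-negative: already an absolute offset.
    (integer? offset) offset
    :else (throw (ex-info "Offset form not handled by this sketch" {:offset offset}))))

;; Illustrative per-partition state (invented data).
(def partition-state
  {0 {:current 500 :earliest 100 :latest 900}
   1 {:current 20  :earliest 0   :latest 75}})

;; Resolve a sequence of partition->offset pairs => [[0 450] [1 0]]
(mapv (fn [[partition offset]]
        [partition (resolve-offset offset (get partition-state partition))])
      [[0 -50] [1 :start]])
```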