clj-kafka-repl.kafka
Functions for consuming, producing and reading metadata from Kafka.
consume
(consume topic & {:keys [partition offset partition-offsets key-deserializer value-deserializer limit filter-fn], :or {partition nil, offset :end, partition-offsets nil, key-deserializer (default-key-deserializer), value-deserializer (default-value-deserializer), limit nil, filter-fn (constantly true)}})
Opens a consumer over the specified topic and returns a ::ch/tracked-channel, a wrapper around a core.async channel.
The channel stays open indefinitely unless either a) it is explicitly closed with ch/close!, or b) the specified message limit is reached.
Examples of pulling data from channels:
- Pop the next message (if any) from the channel: (ch/poll! tc)
- Stream channel to file: (ch/to-file tc "/workspace/temp/your-file")
- Stream channel to stdout: (ch/to-stdout tc)
- And then close the channel with: (ch/close! tc)
key | default | description |
---|---|---|
:partition | nil | Limit consumption to a specific partition. |
:offset | :end | Start consuming from the specified offset. Valid values: :start, :end, a numeric offset, or a timestamp (as a date/time string). |
:partition-offsets | nil | Vector of [partition offset] pairs giving, per partition, the offset to start consuming from. |
:key-deserializer | (default-key-deserializer) | Deserializer for the message key. Overrides the value in config. Defaults to a string deserializer. |
:value-deserializer | (default-value-deserializer) | Deserializer for the message value. Overrides the value in config. Defaults to a string deserializer. |
:limit | nil | The maximum number of messages to pull back onto the channel; nil means no limit. |
:filter-fn | (constantly true) | Filter function applied to each incoming :kafka-message. Can also be a string, in which case a filter on the message value containing that string is implied. |
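The interplay of :filter-fn and :limit can be modelled over a plain sequence of messages. This is a minimal sketch, not the library's implementation: it assumes each message is a map with a :value key (the real :kafka-message shape may differ) and that :limit counts messages after filtering. The helpers ->predicate and select-messages are hypothetical.

```clojure
(ns example.consume-sketch
  (:require [clojure.string :as str]))

;; Assumption: a message is a map with a :value key.
(defn ->predicate
  "Turns a :filter-fn option into a predicate. A string becomes a
  substring match on the message value; a function is used as-is."
  [filter-fn]
  (if (string? filter-fn)
    (fn [msg] (str/includes? (str (:value msg)) filter-fn))
    filter-fn))

(defn select-messages
  "Keeps the messages that pass :filter-fn, then applies :limit
  (nil means unbounded). Assumes :limit counts post-filter messages."
  [messages {:keys [filter-fn limit] :or {filter-fn (constantly true)}}]
  (let [kept (filter (->predicate filter-fn) messages)]
    (if limit (take limit kept) kept)))

;; Illustrative messages, invented for the example.
(def sample-messages
  [{:offset 0 :value "order-created"}
   {:offset 1 :value "order-paid"}
   {:offset 2 :value "user-created"}])

(select-messages sample-messages {:filter-fn "created" :limit 1})
;; => ({:offset 0, :value "order-created"})
```

Applying the filter before the limit means a limit of 1 yields the first matching message rather than the first message overall.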
get-earliest-offsets
(get-earliest-offsets topic & {:keys [partitions], :or {partitions nil}})
Gets a vector of vectors representing the mapping of partition->earliest-offset for the partitions of the given topic.
key | default | description |
---|---|---|
:partitions | nil | Limit the results to the specified collection of partitions. |
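The vector-of-vectors result is easy to reshape at the REPL. The sketch below models the :partitions option on illustrative data; the offsets are invented and restrict-partitions is a hypothetical helper, not part of clj-kafka-repl.kafka.

```clojure
(ns example.offsets-sketch)

;; Illustrative shape of a get-earliest-offsets result:
;; [partition earliest-offset] pairs. The numbers are invented.
(def earliest-offsets [[0 120] [1 98] [2 0]])

(defn restrict-partitions
  "Keeps the pairs whose partition is in `partitions`.
  nil keeps everything, mirroring the :partitions default."
  [offsets partitions]
  (if (nil? partitions)
    offsets
    (let [wanted (set partitions)]
      (filterv (fn [[p _]] (contains? wanted p)) offsets))))

;; A map is often easier to query than a vector of pairs.
(into {} (restrict-partitions earliest-offsets [0 2]))
;; => {0 120, 2 0}
```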
get-group-offset
(get-group-offset topic group partition)
Gets the offset of the given consumer group on the given topic/partition.
get-group-offsets
(get-group-offsets topic group)
Gets the offsets on all partitions of the given topic for the specified consumer group.
get-lag
(get-lag topic group & {:keys [verbose?], :or {verbose? true}})
Gets the topic lag for the given consumer group.
key | default | description |
---|---|---|
:verbose? | true | If true, includes a by-partition breakdown. |
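Consumer lag on a partition is conventionally the latest (log-end) offset minus the group's committed offset, and the topic lag is the sum over partitions. A minimal sketch of that arithmetic, assuming both inputs are [partition offset] pairs; compute-lag and its return shape are hypothetical, and get-lag's actual output format may differ. A partition with no committed offset is treated as committed at 0 here, which is a simplification.

```clojure
(ns example.lag-sketch)

;; Illustrative inputs as [partition offset] pairs (invented numbers).
(def latest-offsets [[0 150] [1 98] [2 40]])
(def group-offsets  [[0 120] [1 98] [2 10]])

(defn compute-lag
  "Sums (latest - committed) over partitions. With verbose?, also
  returns the by-partition breakdown. A partition with no committed
  offset is treated as committed at 0 (a simplification)."
  [latest committed verbose?]
  (let [committed    (into {} committed)
        by-partition (into (sorted-map)
                           (for [[p end] latest]
                             [p (- end (get committed p 0))]))
        total        (reduce + 0 (vals by-partition))]
    (if verbose?
      {:total total :by-partition by-partition}
      total)))

(compute-lag latest-offsets group-offsets true)
;; => {:total 60, :by-partition {0 30, 1 0, 2 30}}
```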
get-latest-offsets
(get-latest-offsets topic & {:keys [partitions], :or {partitions nil}})
Gets a vector of vectors representing the mapping of partition->latest-offset for the partitions of the given topic.
key | default | description |
---|---|---|
:partitions | nil | Limit the results to the specified collection of partitions. |
get-message
(get-message topic offset & {:keys [value-deserializer partition], :or {value-deserializer nil, partition nil}})
Gets the message at the specified offset on the given topic (if any).
key | default | description |
---|---|---|
:partition | nil | Limit consumption to a specific partition. |
:value-deserializer | nil | Deserializer to use to deserialize the message value. |
get-topic-partitions
(get-topic-partitions topic)
Gets the vector of partitions available for the given topic.
get-topics
(get-topics & {:keys [search]})
Lists the names of all topics.
key | default | description |
---|---|---|
:search | nil | String to filter topics on. Only topic names containing the string will be returned. |
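The :search option amounts to a substring filter over topic names. A minimal sketch, assuming the match is case-sensitive (the description does not say); search-topics is a hypothetical helper operating on an already-fetched list of names.

```clojure
(ns example.topics-sketch
  (:require [clojure.string :as str]))

(defn search-topics
  "Keeps topic names containing `search` (case-sensitive here, an
  assumption); nil returns every name."
  [topic-names search]
  (if search
    (filterv #(str/includes? % search) topic-names)
    (vec topic-names)))

(search-topics ["orders" "orders-dlq" "payments"] "orders")
;; => ["orders" "orders-dlq"]
```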
produce
(produce topic & {:keys [key-serializer value-serializer], :or {key-serializer (default-key-serializer), value-serializer (default-value-serializer)}})
Produce messages to the specified topic.
key | default | description |
---|---|---|
:key-serializer | (default-key-serializer) | Serializer for the message key. Uses a string serializer if not specified. |
:value-serializer | (default-value-serializer) | Serializer for the message value. Uses an edn serializer if not specified. |
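For reference, edn serialization of a message value amounts to printing it with pr-str and encoding the text as bytes; reading it back uses clojure.edn. This sketch assumes UTF-8 and is not the library's serializer: a real one would implement Kafka's org.apache.kafka.common.serialization.Serializer interface. The names edn-serialize and edn-deserialize are hypothetical.

```clojure
(ns example.edn-sketch
  (:require [clojure.edn :as edn]))

(defn edn-serialize
  "Prints a Clojure value as edn text and encodes it as UTF-8 bytes."
  [value]
  (.getBytes (pr-str value) "UTF-8"))

(defn edn-deserialize
  "Decodes UTF-8 bytes and reads them back as edn."
  [^bytes data]
  (edn/read-string (String. data "UTF-8")))

(edn-deserialize (edn-serialize {:order-id 42 :items ["a" "b"]}))
;; => {:order-id 42, :items ["a" "b"]}
```

Values that pr-str cannot print as readable edn (functions, most Java objects) will not round-trip this way.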
sample
(sample topic & opts)
Convenience wrapper around kafka/consume that samples a single message from the topic.
set-group-offsets!
(set-group-offsets! topic group partition-offsets & {:keys [consumer], :or {consumer nil}})
Sets the offsets for the specified group on the specified topic to the offsets given in the passed sequence of partition->offset pairs. The offset in each pair can be one of several types:
- A non-negative integer - an absolute offset.
- A negative integer - an offset relative to the current offset (i.e. subtracted from the current offset).
- :start - seek to the start.
- :end - seek to the end.
- date-time string - set the offset to the one that was current at the given time.
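The offset types above can be read as a small dispatch. This sketch resolves each spec to an absolute offset from per-partition context; the context map, its keys, and the :offset-for-time lookup (a stub standing in for a broker timestamp query) are all assumptions, and the sketch does not clamp a relative offset that falls below the earliest available one. resolve-offset and resolve-offsets are hypothetical names.

```clojure
(ns example.offset-spec-sketch)

(defn resolve-offset
  "Resolves one offset spec to an absolute offset. `ctx` is a hypothetical
  map of the partition's :current, :earliest and :latest offsets plus an
  :offset-for-time fn standing in for a broker timestamp lookup."
  [spec {:keys [current earliest latest offset-for-time]}]
  (cond
    (= spec :start)                   earliest
    (= spec :end)                     latest
    (and (integer? spec) (neg? spec)) (+ current spec) ; relative: deduct from current
    (integer? spec)                   spec              ; absolute offset
    (string? spec)                    (offset-for-time spec)
    :else (throw (ex-info "Unsupported offset spec" {:spec spec}))))

(defn resolve-offsets
  "Resolves a sequence of [partition spec] pairs, looking up each
  partition's ctx in `ctx-by-partition`."
  [partition-offsets ctx-by-partition]
  (mapv (fn [[p spec]] [p (resolve-offset spec (get ctx-by-partition p))])
        partition-offsets))

;; Illustrative context; the time lookup is a stub that ignores its input.
(def ctx {:current 100 :earliest 10 :latest 250
          :offset-for-time (constantly 42)})

(resolve-offsets [[0 :start] [1 -5] [2 "2024-01-01T00:00:00Z"]]
                 {0 ctx 1 ctx 2 ctx})
;; => [[0 10] [1 95] [2 42]]
```

Checking integer? before neg? keeps keywords and strings from reaching neg?, which would throw on non-numbers.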