clj-kafka-repl.kafka
Functions for consuming, producing and reading metadata from kafka.
consume
(consume topic & {:keys [partition offset partition-offsets key-deserializer value-deserializer limit filter-fn], :or {partition nil, offset :end, partition-offsets nil, key-deserializer (default-key-deserializer), value-deserializer (default-value-deserializer), limit nil, filter-fn (constantly true)}})Opens a consumer over the specified topic and returns a ::ch/tracked-channel which is a wrapper over a core.async channel.
The channel will stay open indefinitely unless either: a) the channel is explicitly closed using ch/close! or b) the specified message limit is reached.
Examples of pulling data from channels:
- Pop the next message (if any) from the channel: (ch/poll! tc)
- Stream channel to file: (ch/to-file tc “/workspace/temp/your-file”)
- Stream channel to stdout: (ch/to-stdout tc)
- And then close the channel with: (ch/close! tc)
| key | default | description | 
|---|---|---|
| :partition | nil | Limit consumption to a specific partition. | 
| :offset | :end | Start consuming from the specified offset. Valid values: :start,:end, numeric offset, timestamp (as date/time string) | 
| :partition-offsets | nil | Vector of partition+offset vector pairs that represent a by-partition representation of offsets to start consuming from. | 
| :key-deserializer | nil | Deserializer to use to deserialize the message key. Overrides the value in config. Defaults to a string deserializer. | 
| :value-deserializer | nil | Deserializer to use to deserialize the message value. Overrides the value in config. Defaults to a string deserializer. | 
| :limit | nil | The maximum number of messages to pull back either into the stream or the results vector (depending on stream mode). | 
| :filter-fn | nil | filterfunction to apply to the incoming :kafka-message(s). Can be a string, in which case a filter on the message value containing that string is implied. | 
get-earliest-offsets
(get-earliest-offsets topic & {:keys [partitions], :or {partitions nil}})Gets a vector of vectors representing the mapping of partition->earliest-offset for the partitions of the given topic.
| key | default | description | 
|---|---|---|
| :partitions | nil | Limit the results to the specified collection of partitions. | 
get-group-offset
(get-group-offset topic group partition)Gets the offset of the given consumer group on the given topic/partition.
get-group-offsets
(get-group-offsets topic group)Gets the offsets on all partitions of the given topic for the specified consumer group.
get-lag
(get-lag topic group & {:keys [verbose?], :or {verbose? true}})Gets the topic lag for the given consumer group.
| key | default | description | 
|---|---|---|
| :verbose? | false | If true, will include by-partition breakdown. | 
get-latest-offsets
(get-latest-offsets topic & {:keys [partitions], :or {partitions nil}})Gets a vector of vectors representing the mapping of partition->latest-offset for the partitions of the given topic.
| key | default | description | 
|---|---|---|
| :partitions | nil | Limit the results to the specified collection of partitions. | 
get-message
(get-message topic offset & {:keys [value-deserializer partition], :or {value-deserializer nil, partition nil}})Gets the message at the specified offset on the given topic (if any).
| key | default | description | 
|---|---|---|
| :partition | nil | Limit consumption to a specific partition. | 
| :value-deserializer | nil | Deserializer to use to deserialize the message value. | 
get-topic-partitions
(get-topic-partitions topic)Gets the vector of partitions available for the given topic.
get-topics
(get-topics & {:keys [search]})Lists the names of all topics.
| key | default | description | 
|---|---|---|
| :search | nil | String to filter topics on. Only topic names containing the string will be returned. | 
produce
(produce topic & {:keys [key-serializer value-serializer], :or {key-serializer (default-key-serializer), value-serializer (default-value-serializer)}})Produce messages to the specified topic.
| key | default | description | 
|---|---|---|
| :key-serializer | nil | Serializer to use to serialize the message key. Will use a string deserializer if not specified. | 
| :value-serializer | nil | Serializer to use to serialize the message value. Will use an edn serializer if not specified. | 
sample
(sample topic & opts)Convenience function around kafka/consume to just sample a message from the topic.
set-group-offsets!
(set-group-offsets! topic group partition-offsets & {:keys [consumer], :or {consumer nil}})Sets the offsets for the specified group on the specified topic to the offsets given in the passed sequence of partition->offset pairs. The offset in each pair can be one of several types:
- A natural integer - an absolute offset.
- A negative integer - an offset relative to the current offset (i.e. deduct from the current offset)
- :start - seek to start.
- :end - seek to end.
- date-time string - set offset to that which was current at the given time.