trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V]
Abstract Value Members
- abstract def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]
Stream where the elements themselves are Streams which continually request records for a single partition. These Streams have to be processed in parallel, using parJoin or parJoinUnbounded. Note that when using parJoin(n) and n is smaller than the number of currently assigned partitions, there will be assigned partitions which won't be processed. For that reason, prefer parJoinUnbounded, in which case the actual limit will be the number of assigned partitions.
If you do not want to process all partitions in parallel, you can use records instead, where records for all partitions are in a single Stream. A usage sketch follows the note below.
- Definition Classes
- KafkaConsume
- Note
You have to first use subscribe or assign to subscribe the consumer before using this Stream. If you forgot to subscribe, a NotSubscribedException will be raised in the Stream.
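For illustration, a minimal, self-contained sketch of processing every assigned partition in parallel with parJoinUnbounded; the bootstrap servers, group id, topic name, and per-record action are placeholders:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object PartitionedStreamExample extends IOApp.Simple {

  // Placeholder settings; adjust servers, group id and deserializers as needed.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  val run: IO[Unit] =
    KafkaConsumer.resource(settings).use { consumer =>
      consumer.subscribeTo("example-topic") >>
        consumer.partitionedStream
          .map { partitionStream =>
            // One inner stream per assigned partition.
            partitionStream.evalMap { committable =>
              // Per-record processing goes here (offsets are not committed in this sketch).
              IO.println(committable.record.value)
            }
          }
          .parJoinUnbounded // the effective limit is the number of assigned partitions
          .compile
          .drain
    }
}
```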
- abstract def partitionsMapStream: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]]
Stream where each element contains a Map with all newly assigned partitions. Keys of this Map are TopicPartitions, and values are record streams for the particular TopicPartition. These streams will be closed only when a partition is revoked.
With the default assignor, all previous partitions are revoked at once, and a new set of partitions is assigned to a consumer on each rebalance. In this case, each returned Map contains the full partition assignment for the consumer, and all streams from the previous assignment are closed. This means that partitionsMapStream reflects the default assignment process in a streaming manner.
This may not be the case when a custom assignor is configured in the consumer. When using the CooperativeStickyAssignor, for instance, partitions may be revoked individually. In this case, each element in the stream (each Map) will contain only streams for newly assigned partitions. Previously returned streams for partitions that are retained will remain active; only streams for revoked partitions will be closed.
This is the most generic Stream method. If you don't need such control, consider using the partitionedStream or stream methods, which are both based on partitionsMapStream. A usage sketch follows the note below.
- Definition Classes
- KafkaConsume
- Note
You have to first use subscribe or assign to subscribe the consumer before using this Stream. If you forgot to subscribe, a NotSubscribedException will be raised in the Stream.
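A sketch of handling newly assigned partitions individually; handlePartition, the logging, and the assumption that the consumer is already subscribed are illustrative:

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._
import org.apache.kafka.common.TopicPartition

object PartitionsMapExample {

  // Hypothetical per-partition handler; the returned stream runs until the
  // partition is revoked (its record stream is then closed by the library).
  def handlePartition(
    partition: TopicPartition,
    records: Stream[IO, CommittableConsumerRecord[IO, String, String]]
  ): Stream[IO, Unit] =
    records.evalMap { committable =>
      IO.println(s"$partition -> ${committable.record.value}")
    }

  // Assumes the consumer has already been subscribed or assigned.
  def consumePartitionsMap(consumer: KafkaConsumer[IO, String, String]): IO[Unit] =
    consumer.partitionsMapStream
      .flatMap { assigned =>
        // Each element of the outer stream carries the newly assigned partitions.
        Stream.emits(assigned.toList).map { case (tp, records) =>
          handlePartition(tp, records)
        }
      }
      .parJoinUnbounded
      .compile
      .drain
}
```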
- abstract def stopConsuming: F[Unit]
Stops consuming new messages from Kafka. This method can be used to implement a graceful shutdown, as in the sketch below.
This method has a few effects:
- After this call, no more data will be fetched from Kafka through the poll method.
- All currently running streams will continue to run until all in-flight messages have been processed. This means that streams will be completed once all fetched messages have been processed.
If any of the records methods are called after stopConsuming, they will return empty streams. Calling stopConsuming more than once has no further effect.
- Definition Classes
- KafkaConsume
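A shutdown sketch assuming the consumer is already subscribed; the Deferred used as the shutdown signal and the per-record action are illustrative:

```scala
import cats.effect.{Deferred, IO}
import fs2.kafka._

object GracefulShutdownExample {

  // Run the consumer until `shutdown` is completed, then stop fetching new
  // records while already-fetched messages are still processed to completion.
  def runUntilShutdown(
    consumer: KafkaConsumer[IO, String, String],
    shutdown: Deferred[IO, Unit]
  ): IO[Unit] = {
    val processing: IO[Unit] =
      consumer.records
        .evalMap(committable => IO.println(committable.record.value))
        .compile
        .drain

    val stopOnSignal: IO[Unit] =
      shutdown.get >> consumer.stopConsuming

    // `processing` terminates once all in-flight messages have been processed
    // after stopConsuming takes effect.
    IO.both(processing, stopOnSignal).void
  }
}
```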
- abstract def stream: Stream[F, CommittableConsumerRecord[F, K, V]]
Alias for partitionedStream.parJoinUnbounded.
- Definition Classes
- KafkaConsume
- Note
You have to first use subscribe or assign to subscribe the consumer before using this Stream. If you forgot to subscribe, a NotSubscribedException will be raised in the Stream.
- See also
partitionedRecords for more information
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def consumeChunk(processor: (Chunk[ConsumerRecord[K, V]]) => F[CommitNow])(implicit F: Concurrent[F]): F[Nothing]
Consume from all assigned partitions concurrently, processing the records in Chunks. For each Chunk, the provided processor is called; after it has finished, the offsets for all messages in the chunk are committed.
This method is intended for cases that require at-least-once delivery, where messages have to be processed before offsets are committed. When relying on methods like partitionedStream, records, and similar, you have to correctly implement not only your processing logic but also the mechanism for committing offsets, which can be tricky to do in a correct and efficient way.
Working with Chunks of records has several benefits:
- As a user, you don't have to care about committing offsets correctly. You can focus on implementing your business logic.
- It's very straightforward to batch several messages from a Chunk together, e.g. for efficient writes to persistent storage.
- You can liberally use logic that involves concurrency, filtering, and re-ordering of messages without having to worry about incorrect offset commits.
The processor is a function that takes a Chunk[ConsumerRecord[K, V]] and returns an F[CommitNow]. CommitNow is isomorphic to Unit, but helps in transporting the intention that processing of a Chunk is done, offsets should be committed, and no important processing should be done afterwards.
The returned value has the type F[Nothing], because it's a never-ending process that doesn't terminate, and therefore doesn't return a result. A usage sketch follows the notes below.
- Note
This method does not make any use of Kafka's auto-commit feature; it implements "manual" commits in a way that suits most common use cases.
You have to first use subscribe or assign to subscribe the consumer before using this Stream. If you forgot to subscribe, a NotSubscribedException will be raised in the Stream.
- See also
CommitNow
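A minimal, self-contained sketch of consumeChunk; the bootstrap servers, group id, topic name, and chunk-processing logic are placeholders, and the import path for CommitNow is assumed here:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

object ConsumeChunkExample extends IOApp.Simple {

  // Placeholder settings; adjust servers, group id and deserializers as needed.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  val run: IO[Unit] =
    KafkaConsumer.resource(settings).use { consumer =>
      consumer.subscribeTo("example-topic") >>
        consumer.consumeChunk { chunk =>
          // Process the whole chunk (e.g. one batched write), then return
          // CommitNow so the offsets of these records are committed.
          IO.println(s"processing ${chunk.size} records").as(CommitNow)
        }
    }
}
```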
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def partitionedRecords: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]
Alias for partitionedStream.
- Definition Classes
- KafkaConsume
- final def records: Stream[F, CommittableConsumerRecord[F, K, V]]
Consume from all assigned partitions, producing a stream of CommittableConsumerRecords. Alias for stream. A usage sketch follows below.
- Definition Classes
- KafkaConsume
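For contrast with consumeChunk above, a sketch of manual offset handling on top of records, using fs2-kafka's commitBatchWithin pipe; the batch size and interval are arbitrary, and the consumer is assumed to be subscribed already:

```scala
import scala.concurrent.duration._

import cats.effect.IO
import fs2.kafka._

object RecordsExample {

  // Process each record, keep its offset, and commit offsets in batches of up
  // to 500 or every 15 seconds, whichever comes first.
  def processAndCommit(consumer: KafkaConsumer[IO, String, String]): IO[Unit] =
    consumer.records
      .evalMap { committable =>
        IO.println(committable.record.value).as(committable.offset)
      }
      .through(commitBatchWithin(500, 15.seconds))
      .compile
      .drain
}
```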
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()