trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V]

Source
KafkaConsumeChunk.scala

Abstract Value Members

  1. abstract def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

    A Stream whose elements are themselves Streams, each of which continually requests records for a single partition. These inner Streams must be processed in parallel, using parJoin or parJoinUnbounded. Note that with parJoin(n), if n is smaller than the number of currently assigned partitions, some assigned partitions will not be processed. For that reason, prefer parJoinUnbounded; the actual limit is then the number of assigned partitions.

    If you do not want to process all partitions in parallel, then you can use records instead, where records for all partitions are in a single Stream.
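
    As a minimal sketch of the parallel processing described above, the following joins all partition streams with parJoinUnbounded. The broker address, group id, and topic name are hypothetical, and the sketch assumes that KafkaConsumer exposes this member and the subscribeTo method. Offsets are not committed here; that is left to the caller.

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object PartitionedStreamSketch extends IOApp.Simple {

  // Hypothetical connection details; adjust to your environment.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  val run: IO[Unit] =
    KafkaConsumer.resource(settings).use { consumer =>
      // Subscribe first; otherwise the Stream raises NotSubscribedException.
      consumer.subscribeTo("example-topic") >>
        consumer.partitionedStream
          .map { partition =>
            // One inner Stream per assigned partition.
            partition.evalMap { c =>
              IO.println(s"${c.record.topic}-${c.record.partition}: ${c.record.value}")
            }
          }
          // No fixed concurrency limit, so every assigned partition is processed.
          .parJoinUnbounded
          .compile
          .drain
    }
}
```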

    Definition Classes
    KafkaConsume
    Note

    You must first subscribe the consumer (subscribe) or assign it partitions (assign) before using this Stream. If you forget to do so, a NotSubscribedException is raised in the Stream.

  2. abstract def partitionsMapStream: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]]

    Stream where each element contains a Map with all newly assigned partitions. Keys of this Map are TopicPartitions, and values are record streams for the particular TopicPartition. These streams will be closed only when a partition is revoked.

    With the default assignor, all previous partitions are revoked at once and a new set of partitions is assigned to the consumer on each rebalance. In this case, each returned Map contains the full partition assignment for the consumer, and all streams from the previous assignment are closed. In other words, partitionsMapStream reflects the default assignment process in a streaming manner.

    This may not be the case when a custom assignor is configured in the consumer. When using the CooperativeStickyAssignor, for instance, partitions may be revoked individually. In this case, each element in the stream (each Map) contains only the streams for newly assigned partitions. Previously returned streams for retained partitions remain active; only streams for revoked partitions are closed.

    This is the most generic Stream method. If you don't need this level of control, consider using the partitionedStream or stream methods instead; both are built on partitionsMapStream.
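
    The per-assignment Maps described above might be consumed as in the following sketch. The helper name logPerPartition is hypothetical, and the consumer is assumed to be subscribed or assigned already.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._

// Sketch only: `consumer` is assumed to be already subscribed or assigned.
def logPerPartition(consumer: KafkaConsumer[IO, String, String]): Stream[IO, Unit] =
  consumer.partitionsMapStream
    .flatMap { assigned =>
      // `assigned` holds the newly assigned partitions of one rebalance.
      Stream.emits(assigned.toList).map { case (topicPartition, records) =>
        Stream.eval(IO.println(s"Assigned: $topicPartition")) ++
          // This inner Stream ends when the partition is revoked.
          records.evalMap(c => IO.println(s"$topicPartition @ ${c.record.offset}"))
      }
    }
    // Run all partition streams concurrently.
    .parJoinUnbounded
```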

    Definition Classes
    KafkaConsume
    Note

    You must first subscribe the consumer (subscribe) or assign it partitions (assign) before using this Stream. If you forget to do so, a NotSubscribedException is raised in the Stream.

    See also

    records

    partitionedRecords

  3. abstract def stopConsuming: F[Unit]

    Stops consuming new messages from Kafka. This method can be used to implement a graceful shutdown.

    This method has a few effects:

    • After this call, no more data is fetched from Kafka through the poll method.
    • All currently running streams continue to run until all in-flight messages have been processed. That is, the streams complete once all fetched messages have been processed.

    If any of the records methods are called after stopConsuming, they return empty streams.

    Calling stopConsuming more than once has no further effect.
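
    One way to build a graceful shutdown on these effects is sketched below: on cancellation, polling is stopped and the running stream is given time to finish its in-flight records. The connection details, topic name, batch size, and timeouts are hypothetical, and the sketch assumes that KafkaConsumer exposes this member.

```scala
import scala.concurrent.duration._

import cats.effect.{IO, IOApp}
import fs2.kafka._

object GracefulShutdownSketch extends IOApp.Simple {

  // Hypothetical connection details; adjust to your environment.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  // Process each record, then commit offsets in batches.
  def consume(consumer: KafkaConsumer[IO, String, String]): IO[Unit] =
    consumer.subscribeTo("example-topic") >>
      consumer.stream
        .evalMap(c => IO.println(c.record.value).as(c.offset))
        .through(commitBatchWithin(100, 15.seconds))
        .compile
        .drain

  val run: IO[Unit] =
    KafkaConsumer.resource(settings).use { consumer =>
      IO.uncancelable { poll =>
        for {
          fiber <- consume(consumer).start
          _ <- poll(fiber.join).onCancel {
            // Stop polling, then wait (bounded) for in-flight records to drain.
            consumer.stopConsuming >>
              fiber.join.void.timeoutTo(20.seconds, IO.println("Graceful shutdown timed out"))
          }
        } yield ()
      }
    }
}
```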

    Definition Classes
    KafkaConsume
  4. abstract def stream: Stream[F, CommittableConsumerRecord[F, K, V]]

    Alias for partitionedStream.parJoinUnbounded.

    Definition Classes
    KafkaConsume
    Note

    You must first subscribe the consumer (subscribe) or assign it partitions (assign) before using this Stream. If you forget to do so, a NotSubscribedException is raised in the Stream.

    See also

    partitionedRecords for more information

Concrete Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native()
  6. final def consumeChunk(processor: (Chunk[ConsumerRecord[K, V]]) => F[CommitNow])(implicit F: Concurrent[F]): F[Nothing]

    Consume from all assigned partitions concurrently, processing the records in Chunks. For each Chunk, the provided processor is called; once it has finished, the offsets for all messages in the Chunk are committed.

    This method is intended for cases that require at-least-once delivery, where messages have to be processed before offsets are committed. When relying on methods like partitionedStream or records, you have to implement not only your processing logic but also a correct mechanism for committing offsets, which can be tricky to do correctly and efficiently.

    Working with Chunks of records has several benefits:

    • As a user, you don't have to care about committing offsets correctly. You can focus on implementing your business logic.
    • It's very straightforward to batch several messages from a Chunk together, e.g. for efficient writes to persistent storage.
    • You can liberally use logic that involves concurrency, filtering, and re-ordering of messages without having to worry about incorrect offset commits.


    The processor is a function that takes a Chunk[ConsumerRecord[K, V]] and returns an F[CommitNow]. CommitNow is isomorphic to Unit, but conveys the intent that processing of the Chunk is done, that offsets should be committed, and that no important processing should happen afterwards.

    The returned value has the type F[Nothing], because it's a never-ending process that doesn't terminate, and therefore doesn't return a result.
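
    A minimal sketch of a processor and its use follows. It assumes that CommitNow is importable from the KafkaConsumeChunk companion object and that KafkaConsumer implements this trait; the connection details, topic name, and process function are hypothetical.

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import fs2.Chunk
import fs2.kafka._
// Assumed location of CommitNow (companion object of this trait).
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

object ConsumeChunkSketch extends IOApp.Simple {

  // Hypothetical connection details; adjust to your environment.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  // Hypothetical processor: handle every record of the chunk,
  // then signal that the offsets may be committed.
  def process(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] =
    records.traverse_(r => IO.println(s"${r.key} -> ${r.value}")).as(CommitNow)

  // consumeChunk never completes, so `run` only ends on error or cancellation.
  val run: IO[Unit] =
    KafkaConsumer.resource(settings).use { consumer =>
      consumer.subscribeTo("example-topic") >> consumer.consumeChunk(process)
    }
}
```

    Because offsets are committed only after process returns, a failure inside it leaves the Chunk uncommitted, which matches the at-least-once intent described above.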

    Note

    This method does not use Kafka's auto-commit feature; it implements "manual" commits in a way that suits most common use cases.

    You must first subscribe the consumer (subscribe) or assign it partitions (assign) before calling this method. If you forget to do so, a NotSubscribedException is raised.

    See also

    partitionedStream

    CommitNow

  7. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  8. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  9. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable])
  10. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  11. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  12. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  13. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  14. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  15. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  16. final def partitionedRecords: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

    Alias for partitionedStream.

    Definition Classes
    KafkaConsume
  17. final def records: Stream[F, CommittableConsumerRecord[F, K, V]]

    Consume from all assigned partitions, producing a stream of CommittableConsumerRecords. Alias for stream.

    Definition Classes
    KafkaConsume
  18. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  19. def toString(): String
    Definition Classes
    AnyRef → Any
  20. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  21. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  22. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
