sealed abstract class KafkaConsumer[F[_], K, V]
  extends KafkaConsume[F, K, V]
  with KafkaConsumeChunk[F, K, V]
  with KafkaAssignment[F]
  with KafkaOffsetsV2[F]
  with KafkaSubscription[F]
  with KafkaTopicsV2[F]
  with KafkaCommit[F]
  with KafkaMetrics[F]
  with KafkaConsumerLifecycle[F]

KafkaConsumer represents a consumer of Kafka records, with the ability to subscribe to topics, start a single top-level stream, and optionally control it via the provided fiber instance.

The following top-level streams are provided.

  • stream provides a single stream of records, where the order of records is guaranteed per topic-partition.
  • partitionedStream provides a stream with elements as streams that continually request records for a single partition. Order is guaranteed per topic-partition, but all assigned partitions will have to be processed in parallel.
  • partitionsMapStream provides a stream where each element contains the current assignment: a Map whose keys are TopicPartitions and whose values are streams of records for that TopicPartition.

In all of these streams, records are wrapped in CommittableConsumerRecords, which provide CommittableOffsets with the ability to commit record offsets to Kafka. For performance reasons, offsets are usually committed in batches using CommittableOffsetBatch. Provided Pipes, like commitBatchWithin, are available for batch-committing offsets. If you are not committing offsets to Kafka, you can simply discard the CommittableOffset and only make use of the record.

While it is technically possible to start more than one stream from a single KafkaConsumer, it is generally not recommended: there is no guarantee which stream will receive which records, and the two streams may overlap and see duplicate records. If the first stream completes, possibly with an error, there is no guarantee that it has processed all of the records it received, and a second stream from the same KafkaConsumer might not be able to pick up where the first one left off. Therefore, only create a single top-level stream per KafkaConsumer, and if you want to start a new stream when the first one finishes, let the KafkaConsumer shut down and create a new one.
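
A minimal sketch of the recommended setup, using the single top-level stream together with commitBatchWithin. The broker address, group id, topic name, and the IO.println processing step are placeholders for illustration:

  import cats.effect.{IO, IOApp}
  import fs2.kafka._

  import scala.concurrent.duration._

  object SingleStreamExample extends IOApp.Simple {

    val settings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092") // placeholder broker address
        .withGroupId("example-group")           // placeholder group id
        .withAutoOffsetReset(AutoOffsetReset.Earliest)

    val run: IO[Unit] =
      KafkaConsumer
        .stream(settings)
        .subscribeTo("example-topic")           // placeholder topic
        .records                                // the single top-level stream
        .mapAsync(16) { committable =>
          // process the record, keeping the offset for the commit pipe
          IO.println(committable.record.value).as(committable.offset)
        }
        .through(commitBatchWithin(100, 5.seconds)) // commit offsets in batches
        .compile
        .drain
  }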

Source
KafkaConsumer.scala

Abstract Value Members

  1. abstract def assign(topic: String): F[Unit]

    Manually assigns all partitions for the specified topic to the consumer.

    Definition Classes
    KafkaAssignment
  2. abstract def assign(partitions: NonEmptySet[TopicPartition]): F[Unit]

    Manually assigns the specified list of topic partitions to the consumer. This function does not allow for incremental assignment and will replace the previous assignment (if there is one).

    Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign and group assignment with subscribe.

    If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new assignment replaces the old one.

    To unassign all partitions, use KafkaConsumer#unsubscribe.

    Definition Classes
    KafkaAssignment
    See also

    org.apache.kafka.clients.consumer.KafkaConsumer#assign
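
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String] and a hypothetical topic. It uses the convenience overload assign(topic, partitions) documented under Concrete Value Members to take partitions 0 and 1, bypassing group management and replacing any previous assignment:

      import cats.data.NonEmptySet
      import cats.effect.IO

      // Manually take partitions 0 and 1 of the topic; no rebalancing
      // will be triggered for manually assigned partitions.
      val assignTwoPartitions: IO[Unit] =
        consumer.assign("example-topic", NonEmptySet.of(0, 1))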

  3. abstract def assignment: F[SortedSet[TopicPartition]]

    Returns the set of partitions currently assigned to this consumer.

    Definition Classes
    KafkaAssignment
  4. abstract def assignmentStream: Stream[F, SortedSet[TopicPartition]]

    Stream where the elements are the set of TopicPartitions currently assigned to this consumer. The stream emits whenever a rebalance changes partition assignments.

    Definition Classes
    KafkaAssignment
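
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String]. It logs the full set of assigned partitions after every rebalance:

      import cats.effect.IO

      // Emits once per rebalance with the new assignment.
      val logAssignments: IO[Unit] =
        consumer.assignmentStream
          .evalMap(assigned => IO.println(s"assigned: ${assigned.mkString(", ")}"))
          .compile
          .drain
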
  5. abstract def awaitTermination: F[Unit]

    Wait for consumer to shut down. Note that awaitTermination is guaranteed to complete after consumer shutdown, even when the consumer is cancelled with terminate.

    This method will not initiate shutdown. To initiate shutdown and wait for it to complete, you can use terminate >> awaitTermination.

    Definition Classes
    KafkaConsumerLifecycle
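
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String], following the pattern described above:

      import cats.effect.IO

      // Initiate shutdown, then wait for it to complete.
      val shutdownAndWait: IO[Unit] =
        consumer.terminate >> consumer.awaitTermination
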
  6. abstract def beginningOffsets(partitions: Set[TopicPartition], timeout: FiniteDuration): F[Map[TopicPartition, Long]]

    Returns the first offset for the specified partitions.

    Definition Classes
    KafkaTopics
  7. abstract def beginningOffsets(partitions: Set[TopicPartition]): F[Map[TopicPartition, Long]]

    Returns the first offset for the specified partitions.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaTopics
  8. abstract def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit]

    Commit the specified offsets for the specified list of topics and partitions to Kafka.

    This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1. If automatic group management with subscribe is used, then the committed offsets must belong to the currently auto-assigned partitions.

    Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as the invocations. Additionally note that offsets committed through this API are guaranteed to complete before a subsequent call to commitSync (and variants) returns.

    Note that the recommended way to commit offsets in fs2-kafka is to use commit on CommittableConsumerRecord, CommittableOffset, or CommittableOffsetBatch; commitAsync and commitSync are usually needed only in custom scenarios.

    offsets

    A map of offsets by partition with associated metadata.

    Definition Classes
    KafkaCommit
    See also

    org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync

  9. abstract def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit]

    Commit the specified offsets for the specified list of topics and partitions.

    This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1. If automatic group management with subscribe is used, then the committed offsets must belong to the currently auto-assigned partitions.

    Despite its name, this method is not blocking; it is, however, based on the blocking org.apache.kafka.clients.consumer.KafkaConsumer#commitSync method.

    Note that the recommended way to commit offsets in fs2-kafka is to use commit on CommittableConsumerRecord, CommittableOffset, or CommittableOffsetBatch; commitAsync and commitSync are usually needed only in custom scenarios.

    offsets

    A map of offsets by partition with associated metadata.

    Definition Classes
    KafkaCommit
    See also

    org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
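
    Example

    A minimal sketch of a custom scenario, assuming an already-acquired consumer: KafkaConsumer[IO, String, String], a hypothetical topic, and a hypothetical lastProcessedOffset:

      import cats.effect.IO
      import org.apache.kafka.clients.consumer.OffsetAndMetadata
      import org.apache.kafka.common.TopicPartition

      // The committed offset is the *next* offset to consume, hence the + 1.
      def commitPosition(lastProcessedOffset: Long): IO[Unit] =
        consumer.commitSync(
          Map(
            new TopicPartition("example-topic", 0) ->
              new OffsetAndMetadata(lastProcessedOffset + 1)
          )
        )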

  10. abstract def committed(partitions: Set[TopicPartition], timeout: FiniteDuration): F[Map[TopicPartition, OffsetAndMetadata]]

    Returns the last committed offsets for the given partitions.

    Definition Classes
    KafkaOffsetsV2
  11. abstract def committed(partitions: Set[TopicPartition]): F[Map[TopicPartition, OffsetAndMetadata]]

    Returns the last committed offsets for the given partitions.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaOffsetsV2
  12. abstract def endOffsets(partitions: Set[TopicPartition], timeout: FiniteDuration): F[Map[TopicPartition, Long]]

    Returns the last offset for the specified partitions.

    Definition Classes
    KafkaTopics
  13. abstract def endOffsets(partitions: Set[TopicPartition]): F[Map[TopicPartition, Long]]

    Returns the last offset for the specified partitions.

    Timeout is determined by request.timeout.ms, which is set using ConsumerSettings#withRequestTimeout.

    Definition Classes
    KafkaTopics
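
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String]. It estimates consumer lag per assigned partition as the difference between the end offset and the current position:

      import cats.effect.IO
      import cats.syntax.all._
      import org.apache.kafka.common.TopicPartition

      val estimatedLag: IO[Map[TopicPartition, Long]] =
        for {
          assigned  <- consumer.assignment
          ends      <- consumer.endOffsets(assigned)
          positions <- assigned.toList.traverse(tp => consumer.position(tp).tupleLeft(tp))
        } yield positions.map { case (tp, pos) => tp -> (ends(tp) - pos) }.toMap
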
  14. abstract def listTopics(timeout: FiniteDuration): F[Map[String, List[PartitionInfo]]]

    Get metadata about partitions for all topics that the user is authorized to view. This method will issue a remote call to the server.

    Definition Classes
    KafkaTopicsV2
  15. abstract def listTopics: F[Map[String, List[PartitionInfo]]]

    Get metadata about partitions for all topics that the user is authorized to view. This method will issue a remote call to the server.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaTopicsV2
  16. abstract def metrics: F[Map[MetricName, Metric]]

    Returns consumer metrics.

    Definition Classes
    KafkaMetrics
    See also

    org.apache.kafka.clients.consumer.KafkaConsumer#metrics

  17. abstract def offsetsForTimes(timestampsToSearch: Map[TopicPartition, Long], timeout: FiniteDuration): F[Map[TopicPartition, Option[OffsetAndTimestamp]]]

    Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

    The consumer does not have to be assigned the partitions. If no messages exist yet for a partition, it will not exist in the returned map.

    Definition Classes
    KafkaTopicsV2
  18. abstract def offsetsForTimes(timestampsToSearch: Map[TopicPartition, Long]): F[Map[TopicPartition, Option[OffsetAndTimestamp]]]

    Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

    The consumer does not have to be assigned the partitions. If no messages exist yet for a partition, it will not exist in the returned map.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaTopicsV2
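
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String] and a hypothetical topic. It finds, per partition, the earliest offset whose timestamp is at or after one hour ago:

      import java.time.Instant

      import cats.effect.IO
      import org.apache.kafka.clients.consumer.OffsetAndTimestamp
      import org.apache.kafka.common.TopicPartition

      val oneHourAgo: Long = Instant.now().minusSeconds(3600).toEpochMilli

      val offsets: IO[Map[TopicPartition, Option[OffsetAndTimestamp]]] =
        consumer.offsetsForTimes(
          Map(
            new TopicPartition("example-topic", 0) -> oneHourAgo,
            new TopicPartition("example-topic", 1) -> oneHourAgo
          )
        )
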
  19. abstract def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

    Stream where the elements themselves are Streams which continually request records for a single partition. These Streams will have to be processed in parallel, using parJoin or parJoinUnbounded. Note that if you use parJoin(n) with n smaller than the number of currently assigned partitions, some assigned partitions will not be processed. For that reason, prefer parJoinUnbounded, where the effective limit is the number of assigned partitions.

    If you do not want to process all partitions in parallel, then you can use records instead, where records for all partitions are in a single Stream.

    Definition Classes
    KafkaConsume
    Note

    you have to first use subscribe or assign the consumer before using this Stream. If you forget to do so, a NotSubscribedException will be raised in the Stream.
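
    Example

    A minimal sketch, assuming an already-acquired, subscribed consumer: KafkaConsumer[IO, String, String]; processing is hypothetical and offset committing is omitted for brevity:

      import cats.effect.IO

      // Each assigned partition gets its own inner stream; parJoinUnbounded
      // runs them all in parallel while order is preserved per partition.
      val processAllPartitions: IO[Unit] =
        consumer.partitionedStream
          .map { partitionStream =>
            partitionStream.evalMap { committable =>
              IO.println(committable.record.value) // hypothetical processing
            }
          }
          .parJoinUnbounded
          .compile
          .drain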

  20. abstract def partitionsFor(topic: String, timeout: FiniteDuration): F[List[PartitionInfo]]

    Returns the partitions for the specified topic.

    Definition Classes
    KafkaTopics
  21. abstract def partitionsFor(topic: String): F[List[PartitionInfo]]

    Returns the partitions for the specified topic.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaTopics
  22. abstract def partitionsMapStream: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]]

    Stream where each element contains a Map with all newly assigned partitions. Keys of this Map are TopicPartitions, and values are record streams for the particular TopicPartition. These streams will be closed only when a partition is revoked.

    With the default assignor, all previous partitions are revoked at once, and a new set of partitions is assigned to the consumer on each rebalance. In this case, each returned Map contains the full partition assignment for the consumer, and all streams from the previous assignment are closed. In other words, partitionsMapStream reflects the default assignment process in a streaming manner.

    This may not be the case when a custom assignor is configured in the consumer. When using the CooperativeStickyAssignor, for instance, partitions may be revoked individually. In this case, each element in the stream (each Map) will contain only streams for newly assigned partitions. Previously returned streams for partitions that are retained will remain active. Only streams for revoked partitions will be closed.

    This is the most generic Stream method. If you don't need such fine-grained control, consider using the partitionedStream or stream methods. Both are based on partitionsMapStream.

    Definition Classes
    KafkaConsume
    Note

    you have to first use subscribe or assign the consumer before using this Stream. If you forget to do so, a NotSubscribedException will be raised in the Stream.

    See also

    records

    partitionedRecords
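
    Example

    A minimal sketch, assuming an already-acquired, subscribed consumer: KafkaConsumer[IO, String, String]; processing is hypothetical:

      import cats.effect.IO
      import fs2.Stream

      // For every (re)assignment, start one stream per newly assigned
      // partition; streams for revoked partitions are closed upstream.
      val processAssignments: IO[Unit] =
        consumer.partitionsMapStream
          .flatMap { assignment =>
            Stream
              .emits(assignment.toSeq)
              .map { case (topicPartition, partitionStream) =>
                partitionStream.evalMap { committable =>
                  IO.println(s"$topicPartition: ${committable.record.value}")
                }
              }
          }
          .parJoinUnbounded
          .compile
          .drain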

  23. abstract def position(partition: TopicPartition, timeout: FiniteDuration): F[Long]

    Returns the offset of the next record that will be fetched.

    Definition Classes
    KafkaOffsets
  24. abstract def position(partition: TopicPartition): F[Long]

    Returns the offset of the next record that will be fetched.

    Timeout is determined by default.api.timeout.ms, which is set using ConsumerSettings#withDefaultApiTimeout.

    Definition Classes
    KafkaOffsets
  25. abstract def seek(partition: TopicPartition, offset: Long): F[Unit]

    Overrides the fetch offsets that the consumer will use when reading the next record. If this API is invoked for the same partition more than once, the latest offset will be used. Note that you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.

    Definition Classes
    KafkaOffsets
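
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String] with the partition assigned; the topic and offset are hypothetical:

      import cats.effect.IO
      import org.apache.kafka.common.TopicPartition

      // Replay partition 0 from offset 42; takes effect on the next poll.
      val replayFromOffset: IO[Unit] =
        consumer.seek(new TopicPartition("example-topic", 0), 42L)
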
  26. abstract def seekToBeginning[G[_]](partitions: G[TopicPartition])(implicit arg0: Foldable[G]): F[Unit]

    Seeks to the first offset for each of the specified partitions. If no partitions are provided, seeks to the first offset for all currently assigned partitions.

    Note that this seek evaluates lazily, and only on the next call to poll or position.

    Definition Classes
    KafkaOffsets
  27. abstract def seekToEnd[G[_]](partitions: G[TopicPartition])(implicit arg0: Foldable[G]): F[Unit]

    Seeks to the last offset for each of the specified partitions. If no partitions are provided, seeks to the last offset for all currently assigned partitions.

    Note that this seek evaluates lazily, and only on the next call to poll or position.

    Definition Classes
    KafkaOffsets
  28. abstract def stopConsuming: F[Unit]

    Stops consuming new messages from Kafka. This method could be used to implement a graceful shutdown.

    This method has a few effects:

    • After this call no more data will be fetched from Kafka through the poll method.
    • All currently running streams will continue to run until all in-flight messages have been processed; that is, the streams complete once all already-fetched messages have been processed.

    If any of the records methods are called after stopConsuming, they will return empty streams.

    Calling stopConsuming more than once has no further effect.

    Definition Classes
    KafkaConsume
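
    Example

    A minimal sketch of a graceful shutdown, assuming an already-acquired, subscribed consumer: KafkaConsumer[IO, String, String] and a hypothetical shutdownSignal; processing is hypothetical:

      import cats.effect.IO

      // Run the record stream in the background; on the signal, stop
      // fetching and let the stream drain its in-flight records.
      def runUntilShutdown(shutdownSignal: IO[Unit]): IO[Unit] =
        consumer.records
          .evalMap(committable => IO.println(committable.record.value))
          .compile
          .drain
          .background
          .use { streamDone =>
            shutdownSignal >> consumer.stopConsuming >> streamDone.void
          }
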
  29. abstract def stream: Stream[F, CommittableConsumerRecord[F, K, V]]

    Alias for partitionedStream.parJoinUnbounded.

    Definition Classes
    KafkaConsume
    Note

    you have to first use subscribe or assign the consumer before using this Stream. If you forget to do so, a NotSubscribedException will be raised in the Stream.

    See also

    partitionedRecords for more information

  30. abstract def subscribe(regex: Regex): F[Unit]

    Subscribes the consumer to the topics matching the specified Regex. Note that you have to use one of the subscribe functions before you can use any of the provided Streams, or a NotSubscribedException will be raised in the Streams.

    regex

    the regex to which matching topics should be subscribed

    Definition Classes
    KafkaSubscription
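
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String]; the naming pattern is hypothetical:

      import cats.effect.IO

      // Subscribe to every topic matching the pattern.
      val subscribeByPattern: IO[Unit] =
        consumer.subscribe("user-events-.*".r)
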
  31. abstract def subscribe[G[_]](topics: G[String])(implicit arg0: Reducible[G]): F[Unit]

    Subscribes the consumer to the specified topics. Note that you have to use one of the subscribe functions to subscribe to one or more topics before using any of the provided Streams, or a NotSubscribedException will be raised in the Streams.

    topics

    the topics to which the consumer should subscribe

    Definition Classes
    KafkaSubscription
  32. abstract def terminate: F[Unit]

    Whenever terminate is invoked, an attempt will be made to stop the underlying consumer. The terminate operation will not wait for the consumer to shutdown. If you also want to wait for the shutdown to complete, you can use terminate >> awaitTermination.

    Definition Classes
    KafkaConsumerLifecycle
  33. abstract def unsubscribe: F[Unit]

    Unsubscribes the consumer from all topics and partitions assigned by subscribe or assign.

    Definition Classes
    KafkaSubscription

Concrete Value Members

  1. def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit]

    Manually assigns the specified list of partitions for the specified topic to the consumer. This function does not allow for incremental assignment and will replace the previous assignment (if there is one).

    Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign and group assignment with subscribe.

    If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new assignment replaces the old one.

    To unassign all partitions, use KafkaConsumer#unsubscribe.

    Definition Classes
    KafkaAssignment
    See also

    org.apache.kafka.clients.consumer.KafkaConsumer#assign

  2. final def consumeChunk(processor: (Chunk[ConsumerRecord[K, V]]) => F[CommitNow])(implicit F: Concurrent[F]): F[Nothing]

    Consume from all assigned partitions concurrently, processing the records in Chunks. For each Chunk, the provided processor is called; after it has finished, the offsets for all messages in the chunk are committed.

    This method is intended for cases that require at-least-once delivery, where messages have to be processed before offsets are committed. When relying on methods like partitionedStream, records, and similar, you have to correctly implement not only your processing logic but also the mechanism for committing offsets, which can be tricky to do correctly and efficiently.

    Working with Chunks of records has several benefits:

    • As a user, you don't have to care about committing offsets correctly. You can focus on implementing your business logic
    • It's very straightforward to batch several messages from a Chunk together, e.g. for efficient writes to a persistent storage
    • You can liberally use logic that involves concurrency, filtering, and re-ordering of messages without having to worry about incorrect offset commits


    The processor is a function that takes a Chunk[ConsumerRecord[K, V]] and returns a F[CommitNow]. CommitNow is isomorphic to Unit, but helps in transporting the intention that processing of a Chunk is done, offsets should be committed, and no important processing should be done afterwards.

    The returned value has the type F[Nothing], because it's a never-ending process that doesn't terminate, and therefore doesn't return a result.

    Definition Classes
    KafkaConsumeChunk
    Note

    This method does not make any use of Kafka's auto-commit feature; it implements "manual" commits in a way that suits most common use cases.

    You have to first use subscribe or assign the consumer before using this method. If you forget to do so, a NotSubscribedException will be raised in the stream.

    See also

    partitionedStream

    CommitNow
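
    Example

    A minimal sketch, assuming an already-acquired, subscribed consumer: KafkaConsumer[IO, String, String] and a hypothetical batch-write effect; the CommitNow import path follows fs2-kafka 3.x:

      import cats.effect.IO
      import fs2.Chunk
      import fs2.kafka.ConsumerRecord
      import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

      // Stand-in for a real batch write to persistent storage.
      def persist(records: Chunk[ConsumerRecord[String, String]]): IO[Unit] =
        IO.println(s"persisting ${records.size} records")

      // Offsets for each chunk are committed after persist finishes.
      val consume: IO[Nothing] =
        consumer.consumeChunk(chunk => persist(chunk).as(CommitNow))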

  3. final def partitionedRecords: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

    Alias for partitionedStream.

    Definition Classes
    KafkaConsume
  4. final def records: Stream[F, CommittableConsumerRecord[F, K, V]]

    Consume from all assigned partitions, producing a stream of CommittableConsumerRecords. Alias for stream.

    Definition Classes
    KafkaConsume
  5. def seekToBeginning: F[Unit]

    Seeks to the first offset for each currently assigned partition. This is equivalent to using seekToBeginning with an empty set of partitions.

    Note that this seek evaluates lazily, and only on the next call to poll or position.

    Definition Classes
    KafkaOffsets
  6. def seekToEnd: F[Unit]

    Seeks to the last offset for each currently assigned partition. This is equivalent to using seekToEnd with an empty set of partitions.

    Note that this seek evaluates lazily, and only on the next call to poll or position.

    Definition Classes
    KafkaOffsets
  7. def subscribeTo(firstTopic: String, remainingTopics: String*): F[Unit]

    Subscribes the consumer to the specified topics. Note that you have to use one of the subscribe functions to subscribe to one or more topics before using any of the provided Streams, or a NotSubscribedException will be raised in the Streams.

    Definition Classes
    KafkaSubscription
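
    Example

    A minimal sketch, assuming an already-acquired consumer: KafkaConsumer[IO, String, String]; the topic names are hypothetical:

      import cats.effect.IO

      // Subscribe to two topics by name.
      val subscribeToTopics: IO[Unit] =
        consumer.subscribeTo("orders", "payments")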

Inherited from KafkaConsumerLifecycle[F]

Inherited from KafkaMetrics[F]

Inherited from KafkaCommit[F]

Inherited from KafkaTopicsV2[F]

Inherited from KafkaTopics[F]

Inherited from KafkaSubscription[F]

Inherited from KafkaOffsetsV2[F]

Inherited from KafkaOffsets[F]

Inherited from KafkaAssignment[F]

Inherited from KafkaConsumeChunk[F, K, V]

Inherited from KafkaConsume[F, K, V]

Inherited from AnyRef

Inherited from Any
