sealed abstract class ConsumerSettings[F[_], K, V] extends AnyRef
ConsumerSettings contain settings necessary to create a KafkaConsumer. At the very
least, this includes key and value deserializers.
The following consumer configuration defaults are used.
auto.offset.reset
is set tonone
to avoid the surprise of the otherwise defaultlatest
setting.enable.auto.commit
is set tofalse
since offset commits are managed manually.
Several convenience functions are provided so that you don't have to work with String
values
and ConsumerConfig
for configuration. It's still possible to specify ConsumerConfig
values
with functions like withProperty.
ConsumerSettings instances are immutable and all modification functions return a new
ConsumerSettings instance.
Use ConsumerSettings#apply
to create a new instance.
- Source
- ConsumerSettings.scala
- Alphabetic
- By Inheritance
- ConsumerSettings
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Abstract Value Members
- abstract def closeTimeout: FiniteDuration
The time to wait for the Java
KafkaConsumer
to shutdown.The time to wait for the Java
KafkaConsumer
to shutdown.The default value is 20 seconds.
- abstract def commitRecovery: CommitRecovery
The CommitRecovery strategy for recovering from offset commit exceptions.
The CommitRecovery strategy for recovering from offset commit exceptions.
The default is CommitRecovery#Default.
- abstract def commitTimeout: FiniteDuration
The time to wait for offset commits to complete.
The time to wait for offset commits to complete. If an offset commit doesn't complete within this time, a CommitTimeoutException will be raised instead.
The default value is 15 seconds.
- abstract def customBlockingContext: Option[ExecutionContext]
A custom
ExecutionContext
to use for blocking Kafka operations.A custom
ExecutionContext
to use for blocking Kafka operations. If not provided, a default single-threadedExecutionContext
will be created when creating aKafkaConsumer
instance. - abstract def keyDeserializer: Resource[F, KeyDeserializer[F, K]]
The
Deserializer
to use for deserializing record keys. - abstract def maxPrefetchBatches: Int
The maximum number of record batches to prefetch per topic-partition.
The maximum number of record batches to prefetch per topic-partition. This means that, while records are being processed, there can be up to
maxPrefetchBatches * max.poll.records
records per topic-partition that have already been fetched, and are waiting to be processed. You can use withMaxPollRecords to control themax.poll.records
setting.This setting effectively controls backpressure, i.e. the maximum number of batches to prefetch per topic-partition before starting to slow down (not fetching more records) until processing has caught-up.
Note that prefetching cannot be disabled and is generally preferred since it yields improved performance. The minimum value for this setting is
2
. - abstract def pollInterval: FiniteDuration
How often we should attempt to call
poll
on the JavaKafkaConsumer
.How often we should attempt to call
poll
on the JavaKafkaConsumer
.The default value is 50 milliseconds.
- abstract def pollTimeout: FiniteDuration
How long we should allow the
poll
call to block for in the JavaKafkaConsumer
.How long we should allow the
poll
call to block for in the JavaKafkaConsumer
.The default value is 50 milliseconds.
- abstract def properties: Map[String, String]
Properties which can be provided when creating a Java
KafkaConsumer
instance.Properties which can be provided when creating a Java
KafkaConsumer
instance. Numerous functions in ConsumerSettings add properties here if the settings are used by the JavaKafkaConsumer
. - abstract def recordMetadata: (ConsumerRecord[K, V]) => String
The function used to specify metadata for records.
The function used to specify metadata for records. This metadata will be included in
OffsetAndMetadata
in the CommittableOffsets, and can then be committed with the offsets.By default, there will be no metadata, as determined by
OffsetFetchResponse.NO_METADATA
. - abstract def valueDeserializer: Resource[F, ValueDeserializer[F, V]]
The
Deserializer
to use for deserializing record values. - abstract def withAllowAutoCreateTopics(allowAutoCreateTopics: Boolean): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified allow auto create topics.
Returns a new ConsumerSettings instance with the specified allow auto create topics. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
Boolean
instead of aString
.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG
- abstract def withAutoCommitInterval(autoCommitInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto commit interval.
Returns a new ConsumerSettings instance with the specified auto commit interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
- abstract def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto offset reset.
Returns a new ConsumerSettings instance with the specified auto offset reset. This is equivalent to setting the following property using the withProperty function, except you can specify it with AutoOffsetReset instead of a
String
.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
- abstract def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified bootstrap servers.
Returns a new ConsumerSettings instance with the specified bootstrap servers. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
- abstract def withClientId(clientId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified client id.
Returns a new ConsumerSettings instance with the specified client id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.CLIENT_ID_CONFIG
- abstract def withClientRack(clientRack: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified client rack.
Returns a new ConsumerSettings instance with the specified client rack. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.CLIENT_RACK_CONFIG
- abstract def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified closeTimeout.
- abstract def withCommitRecovery(commitRecovery: CommitRecovery): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified CommitRecovery as the commitRecovery to use.
- abstract def withCommitTimeout(commitTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified commitTimeout.
- abstract def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]
Includes the credentials properties from the provided KafkaCredentialStore
- abstract def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified ExecutionContext to use for blocking operations.
Returns a new ConsumerSettings instance with the specified ExecutionContext to use for blocking operations.
Because the underlying Java consumer is not thread-safe, the ExecutionContext *must* be single-threaded. If in doubt, leave this unset so that a default single-threaded blocker will be provided.
- abstract def withDefaultApiTimeout(defaultApiTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified default api timeout.
Returns a new ConsumerSettings instance with the specified default api timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
- abstract def withDeserializers[K0, V0](keyDeserializer: Resource[F, KeyDeserializer[F, K0]], valueDeserializer: Resource[F, ValueDeserializer[F, V0]]): ConsumerSettings[F, K0, V0]
Creates a new
ConsumerSettings
instance that replaces the serializers with those provided.Creates a new
ConsumerSettings
instance that replaces the serializers with those provided. Note that this will remove any customrecordMetadata
configuration. - abstract def withEnableAutoCommit(enableAutoCommit: Boolean): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto commit setting.
Returns a new ConsumerSettings instance with the specified auto commit setting. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
Boolean
instead of aString
.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
Note that by default, this setting is set to
false
. - abstract def withGroupId(groupId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified group id.
Returns a new ConsumerSettings instance with the specified group id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.GROUP_ID_CONFIG
- abstract def withGroupInstanceId(groupInstanceId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified group instance id.
Returns a new ConsumerSettings instance with the specified group instance id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
- abstract def withHeartbeatInterval(heartbeatInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified heartbeat interval.
Returns a new ConsumerSettings instance with the specified heartbeat interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
- abstract def withIsolationLevel(isolationLevel: IsolationLevel): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified isolation level.
Returns a new ConsumerSettings instance with the specified isolation level. This is equivalent to setting the following property using the withProperty function, except you can specify it with an IsolationLevel instead of a
String
.ConsumerConfig.ISOLATION_LEVEL_CONFIG
- abstract def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified max poll interval.
Returns a new ConsumerSettings instance with the specified max poll interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
- abstract def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified max poll records.
Returns a new ConsumerSettings instance with the specified max poll records. This is equivalent to setting the following property using the withProperty function, except you can specify it with an
Int
instead of aString
.ConsumerConfig.MAX_POLL_RECORDS_CONFIG
- abstract def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified value for maxPrefetchBatches.
Creates a new ConsumerSettings with the specified value for maxPrefetchBatches. Note that if a value lower than the minimum
2
is specified, maxPrefetchBatches will instead be set to2
and not the specified value. - abstract def withPollInterval(pollInterval: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified pollInterval.
- abstract def withPollTimeout(pollTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified pollTimeout.
- abstract def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V]
Includes the specified keys and values as properties.
Includes the specified keys and values as properties. The keys should be part of the
ConsumerConfig
keys, and the values should be valid choices for the keys. - abstract def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V]
Includes the specified keys and values as properties.
Includes the specified keys and values as properties. The keys should be part of the
ConsumerConfig
keys, and the values should be valid choices for the keys. - abstract def withProperty(key: String, value: String): ConsumerSettings[F, K, V]
Includes a property with the specified
key
andvalue
.Includes a property with the specified
key
andvalue
. The key should be one of the keys inConsumerConfig
, and the value should be a valid choice for the key. - abstract def withRecordMetadata(recordMetadata: (ConsumerRecord[K, V]) => String): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified recordMetadata.
Creates a new ConsumerSettings with the specified recordMetadata. Note that replacing the serializers via
withSerializers
will reset this to the default. - abstract def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified request timeout.
Returns a new ConsumerSettings instance with the specified request timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
- abstract def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified session timeout.
Returns a new ConsumerSettings instance with the specified session timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a
FiniteDuration
instead of aString
.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()