sealed abstract class ConsumerSettings[F[_], K, V] extends AnyRef
ConsumerSettings contain settings necessary to create a KafkaConsumer. At the very
least, this includes key and value deserializers.
The following consumer configuration defaults are used.
- auto.offset.reset is set to none to avoid the surprise of the otherwise default latest setting.
- enable.auto.commit is set to false since offset commits are managed manually.
Several convenience functions are provided so that you don't have to work with String values
and ConsumerConfig for configuration. It's still possible to specify ConsumerConfig values
with functions like withProperty.
ConsumerSettings instances are immutable and all modification functions return a new
ConsumerSettings instance.
Use ConsumerSettings#apply to create a new instance.
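As a minimal sketch of the description above, the following creates settings with implicitly resolved String deserializers and then derives a modified copy. The broker address and group id are placeholder values chosen for illustration, and cats-effect IO is assumed as the effect type.

```scala
import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}

object CreateSettingsExample {
  // Key and value deserializers for String are resolved implicitly.
  val base: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withGroupId("example-group")           // hypothetical group id

  // Every with* call returns a new instance; `base` itself is left unchanged.
  val fromEarliest: ConsumerSettings[IO, String, String] =
    base.withAutoOffsetReset(AutoOffsetReset.Earliest)
}
```

Because the instances are immutable, a shared base value like this can be reused safely as the starting point for several consumers.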
- Source: ConsumerSettings.scala
Abstract Value Members
- abstract def closeTimeout: FiniteDuration
The time to wait for the Java KafkaConsumer to shut down.
The default value is 20 seconds.
- abstract def commitRecovery: CommitRecovery
The CommitRecovery strategy for recovering from offset commit exceptions.
The default is CommitRecovery#Default.
- abstract def commitTimeout: FiniteDuration
The time to wait for offset commits to complete. If an offset commit doesn't complete within this time, a CommitTimeoutException will be raised instead.
The default value is 15 seconds.
- abstract def customBlockingContext: Option[ExecutionContext]
A custom ExecutionContext to use for blocking Kafka operations. If not provided, a default single-threaded ExecutionContext will be created when creating a KafkaConsumer instance.
- abstract def keyDeserializer: Resource[F, KeyDeserializer[F, K]]
The Deserializer to use for deserializing record keys.
- abstract def maxPrefetchBatches: Int
The maximum number of record batches to prefetch per topic-partition. This means that, while records are being processed, there can be up to maxPrefetchBatches * max.poll.records records per topic-partition that have already been fetched and are waiting to be processed. You can use withMaxPollRecords to control the max.poll.records setting.
This setting effectively controls backpressure, i.e. the maximum number of batches to prefetch per topic-partition before starting to slow down (not fetching more records) until processing has caught up.
Note that prefetching cannot be disabled and is generally preferred since it yields improved performance. The minimum value for this setting is 2.
- abstract def pollInterval: FiniteDuration
How often we should attempt to call poll on the Java KafkaConsumer.
The default value is 50 milliseconds.
- abstract def pollTimeout: FiniteDuration
How long we should allow the poll call to block for in the Java KafkaConsumer.
The default value is 50 milliseconds.
- abstract def properties: Map[String, String]
Properties which can be provided when creating a Java KafkaConsumer instance. Numerous functions in ConsumerSettings add properties here if the settings are used by the Java KafkaConsumer.
- abstract def rebalanceRevokeMode: RebalanceRevokeMode
One of two possible modes of operation for KafkaConsumer.partitionsMapStream. See RebalanceRevokeMode for a detailed explanation of the differences between them.
- abstract def recordMetadata: (ConsumerRecord[K, V]) => String
The function used to specify metadata for records. This metadata will be included in OffsetAndMetadata in the CommittableOffsets, and can then be committed with the offsets.
By default, there will be no metadata, as determined by OffsetFetchResponse.NO_METADATA.
- abstract def sessionTimeout: FiniteDuration
Returns the value of the following property, as a FiniteDuration for convenience.
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
- abstract def valueDeserializer: Resource[F, ValueDeserializer[F, V]]
The Deserializer to use for deserializing record values.
- abstract def withAllowAutoCreateTopics(allowAutoCreateTopics: Boolean): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified allow auto create topics. This is equivalent to setting the following property using the withProperty function, except you can specify it with a Boolean instead of a String.
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG
- abstract def withAutoCommitInterval(autoCommitInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto commit interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
- abstract def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto offset reset. This is equivalent to setting the following property using the withProperty function, except you can specify it with AutoOffsetReset instead of a String.
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
- abstract def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified bootstrap servers. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
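The equivalence between a convenience function and withProperty can be illustrated by comparing the resulting properties maps. This is a sketch only: the broker address and group id are placeholders, and it assumes the helpers store values in the same String form that one would pass to withProperty by hand (for example "500" for an Int of 500).

```scala
import cats.effect.IO
import fs2.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig

object PropertyEquivalenceExample {
  val base: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withGroupId("example-group") // hypothetical group id

  // Typed convenience functions.
  val viaHelpers: ConsumerSettings[IO, String, String] =
    base
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withMaxPollRecords(500)

  // The same configuration written with raw String keys and values.
  val viaProperties: ConsumerSettings[IO, String, String] =
    base
      .withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500")

  val sameProperties: Boolean =
    viaHelpers.properties == viaProperties.properties
}
```

The typed helpers are usually preferable because a mistyped key or a malformed value is caught by the compiler rather than at consumer startup.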
- abstract def withClientId(clientId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified client id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.CLIENT_ID_CONFIG
- abstract def withClientRack(clientRack: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified client rack. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.CLIENT_RACK_CONFIG
- abstract def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified closeTimeout.
- abstract def withCommitRecovery(commitRecovery: CommitRecovery): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified CommitRecovery as the commitRecovery to use.
- abstract def withCommitTimeout(commitTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified commitTimeout.
- abstract def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]
Includes the credentials properties from the provided KafkaCredentialStore.
- abstract def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified ExecutionContext to use for blocking operations.
Because the underlying Java consumer is not thread-safe, the ExecutionContext *must* be single-threaded. If in doubt, leave this unset so that a default single-threaded blocker will be provided.
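One way to satisfy the single-threaded requirement is to back the ExecutionContext with a single-thread executor from java.util.concurrent. The sketch below assumes the caller owns the executor and is responsible for shutting it down once the consumer is no longer in use; the object and value names are illustrative only.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext
import cats.effect.IO
import fs2.kafka.ConsumerSettings

object BlockingContextExample {
  // Exactly one thread, so blocking calls to the Java consumer never overlap.
  val executor: ExecutorService = Executors.newSingleThreadExecutor()

  val blockingContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executor)

  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withCustomBlockingContext(blockingContext)
}
```

In most applications it is simpler to leave this unset and rely on the default blocker; a custom context is mainly useful when thread naming or lifecycle must be controlled externally.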
- abstract def withDefaultApiTimeout(defaultApiTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified default api timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
- abstract def withDeserializers[K0, V0](keyDeserializer: Resource[F, KeyDeserializer[F, K0]], valueDeserializer: Resource[F, ValueDeserializer[F, V0]]): ConsumerSettings[F, K0, V0]
Creates a new ConsumerSettings instance that replaces the deserializers with those provided. Note that this will remove any custom recordMetadata configuration.
- abstract def withEnableAutoCommit(enableAutoCommit: Boolean): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified auto commit setting. This is equivalent to setting the following property using the withProperty function, except you can specify it with a Boolean instead of a String.
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
Note that by default, this setting is set to false.
- abstract def withGroupId(groupId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified group id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.GROUP_ID_CONFIG
- abstract def withGroupInstanceId(groupInstanceId: String): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified group instance id. This is equivalent to setting the following property using the withProperty function.
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
- abstract def withHeartbeatInterval(heartbeatInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified heartbeat interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
- abstract def withIsolationLevel(isolationLevel: IsolationLevel): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified isolation level. This is equivalent to setting the following property using the withProperty function, except you can specify it with an IsolationLevel instead of a String.
ConsumerConfig.ISOLATION_LEVEL_CONFIG
- abstract def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified max poll interval. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
- abstract def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified max poll records. This is equivalent to setting the following property using the withProperty function, except you can specify it with an Int instead of a String.
ConsumerConfig.MAX_POLL_RECORDS_CONFIG
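The max.poll.records value also bounds how many records can be buffered per topic-partition, since up to maxPrefetchBatches * max.poll.records records may be prefetched, and maxPrefetchBatches has a minimum of 2. The following plain-Scala sketch models that arithmetic only; the function names are hypothetical and this is not the library's own implementation.

```scala
object PrefetchBoundExample {
  // Documented minimum for maxPrefetchBatches.
  val MinPrefetchBatches: Int = 2

  // Models the documented clamping: values below the minimum become 2.
  def effectivePrefetchBatches(requested: Int): Int =
    math.max(MinPrefetchBatches, requested)

  // Upper bound on fetched-but-unprocessed records for one topic-partition.
  def maxBufferedRecordsPerPartition(requestedPrefetchBatches: Int, maxPollRecords: Int): Long =
    effectivePrefetchBatches(requestedPrefetchBatches).toLong * maxPollRecords
}
```

For example, with max.poll.records set to 500 and two prefetch batches, at most 1000 records per topic-partition are held in memory ahead of processing, which is a useful figure when sizing heap for consumers with many assigned partitions.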
- abstract def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified value for maxPrefetchBatches. Note that if a value lower than the minimum 2 is specified, maxPrefetchBatches will instead be set to 2 and not the specified value.
- abstract def withPollInterval(pollInterval: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified pollInterval.
- abstract def withPollTimeout(pollTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified pollTimeout.
- abstract def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V]
Includes the specified keys and values as properties. The keys should be part of the ConsumerConfig keys, and the values should be valid choices for the keys.
- abstract def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V]
Includes the specified keys and values as properties. The keys should be part of the ConsumerConfig keys, and the values should be valid choices for the keys.
- abstract def withProperty(key: String, value: String): ConsumerSettings[F, K, V]
Includes a property with the specified key and value. The key should be one of the keys in ConsumerConfig, and the value should be a valid choice for the key.
- abstract def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified rebalanceRevokeMode.
- abstract def withRecordMetadata(recordMetadata: (ConsumerRecord[K, V]) => String): ConsumerSettings[F, K, V]
Creates a new ConsumerSettings with the specified recordMetadata. Note that replacing the deserializers via withDeserializers will reset this to the default.
- abstract def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified request timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
- abstract def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V]
Returns a new ConsumerSettings instance with the specified session timeout. This is equivalent to setting the following property using the withProperty function, except you can specify it with a FiniteDuration instead of a String.
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
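The duration-based functions can be combined in a single chain, as in the sketch below. The specific durations are arbitrary illustration values, not recommendations. It assumes that withSessionTimeout stores its value under session.timeout.ms in milliseconds, which the sessionTimeout accessor then reads back, while commitTimeout and pollInterval are settings of this library rather than Kafka properties.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.kafka.ConsumerSettings

object TimeoutSettingsExample {
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withSessionTimeout(30.seconds) // stored as a Kafka consumer property
      .withCommitTimeout(10.seconds)  // library-level setting (default 15 seconds)
      .withPollInterval(100.millis)   // library-level setting (default 50 milliseconds)

  // Read the values back through the corresponding accessors.
  val session: FiniteDuration = settings.sessionTimeout
  val commit: FiniteDuration = settings.commitTimeout
  val poll: FiniteDuration = settings.pollInterval
}
```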
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()