FS2 Kafka

FS2 Kafka

  • API Docs
  • Documentation
  • GitHub

›Documentation

Documentation

  • Overview
  • Quick Example
  • Consumers
  • Producers
  • Transactions
  • Admin
  • Modules
  • Technical Details

Admin

There is partial support for the Kafka admin API through KafkaAdminClient. Internally, this relies on the Java Kafka AdminClient and supports the same settings.

The following imports are assumed throughout this page.

import cats.effect._
import cats.syntax.all._
import fs2.kafka._
import fs2.Stream

Settings

AdminClientSettings is provided to avoid having to deal with String key-value settings.

Default Settings

There are several settings specific to the library.

  • withCloseTimeout controls the timeout when waiting for admin client shutdown. Default is 20 seconds.

  • withCreateAdminClient changes how the underlying Java Kafka admin client is created. The default creates a Java AdminClient instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.

Client Creation

Once settings are defined, we can use create an admin client in a Stream.

def kafkaAdminClientStream[F[_]: Async](
  bootstrapServers: String
): Stream[F, KafkaAdminClient[F]] =
  KafkaAdminClient.stream[F](AdminClientSettings(bootstrapServers))

Alternatively, we can create an admin client in a Resource context.

def kafkaAdminClientResource[F[_]: Async](
  bootstrapServers: String
): Resource[F, KafkaAdminClient[F]] =
  KafkaAdminClient.resource[F](AdminClientSettings(bootstrapServers))

Topics

There are functions available for describing, creating, and deleting topics.

import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}

def topicOperations[F[_]: Async]: F[Unit] =
  kafkaAdminClientResource[F]("localhost:9092").use { client =>
    for {
      topicNames <- client.listTopics.names
      _          <- client.describeTopics(topicNames.toList)
      _          <- client.createTopic(new NewTopic("new-topic", 1, 1.toShort))
      _          <- client.createTopics(new NewTopic("newer-topic", 1, 1.toShort) :: Nil)
      _          <- client.createPartitions(Map("new-topic" -> NewPartitions.increaseTo(4)))
      _          <- client.deleteTopic("new-topic")
      _          <- client.deleteTopics("newer-topic" :: Nil)
    } yield ()
  }

Configurations

We can edit the configuration of different resources, like topics and nodes.

import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

def configOperations[F[_]: Async]: F[Unit] =
  kafkaAdminClientResource[F]("localhost:9092").use { client =>
    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic")

    for {
      _ <- client.describeConfigs(topic :: Nil)
      _ <- client.alterConfigs {
             Map(
               topic -> List(
                 new AlterConfigOp(
                   new ConfigEntry("cleanup.policy", "delete"),
                   AlterConfigOp.OpType.SET
                 )
               )
             )
           }
    } yield ()
  }

Cluster Metadata

It's possible to retrieve metadata about the cluster nodes.

import org.apache.kafka.common.Node

def clusterNodes[F[_]: Async]: F[Set[Node]] =
  kafkaAdminClientResource[F]("localhost:9092").use(_.describeCluster.nodes)

Consumer Groups

There are functions available for working with consumer groups.

def consumerGroupOperations[F[_]: Async: cats.Parallel]: F[Unit] =
  kafkaAdminClientResource[F]("localhost:9092").use { client =>
    for {
      consumerGroupIds <- client.listConsumerGroups.groupIds
      _                <- client.describeConsumerGroups(consumerGroupIds)
      _ <- consumerGroupIds.parTraverse { groupId =>
             client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata
           }
    } yield ()
  }

ACLs

There are ACL management functions to describe, create and delete ACL entries.

import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}

def aclOperations[F[_]: Async]: F[Unit] =
  kafkaAdminClientResource[F]("localhost:9092").use { client =>
    for {
      describedAcls <- client.describeAcls(AclBindingFilter.ANY)

      aclEntry = new AccessControlEntry(
                   "User:ANONYMOUS",
                   "*",
                   AclOperation.DESCRIBE,
                   AclPermissionType.ALLOW
                 )
      pattern = new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL)
      acl     = new AclBinding(pattern, aclEntry)
      _      <- client.createAcls(List(acl))

      _ <- client.deleteAcls(List(AclBindingFilter.ANY))
    } yield ()
  }
← TransactionsModules →
  • Settings
    • Default Settings
  • Client Creation
  • Topics
  • Configurations
  • Cluster Metadata
  • Consumer Groups
  • ACLs

Copyright © 2018-2025 OVO Energy Limited.
Icon by Franco Averta. CC BY 3.0.