FS2 Kafka

Modules

The following sections describe the additional modules.

Vulcan

The fs2-kafka-vulcan module provides Avro serialization support using Vulcan.

Add it to your project in build.sbt, along with the Confluent resolver:

libraryDependencies += "com.github.fd4s" %% "fs2-kafka-vulcan" % fs2KafkaVersion
resolvers += "confluent" at "https://packages.confluent.io/maven/"

We start by defining the type we want to serialize or deserialize, and then create a Codec for it.

import cats.syntax.all._

import vulcan.Codec

final case class Person(name: String, age: Option[Int])

implicit val personCodec: Codec[Person] =
  Codec.record(
    name = "Person",
    namespace = "com.example"
  ) { field =>
    (
      field("name", _.name),
      field("age", _.age)
    ).mapN(Person(_, _))
  }
// personCodec: Codec[Person] = WithTypeName(
//   codec = Validated(
//     codec = Codec({
//   "type" : "record",
//   "name" : "Person",
//   "namespace" : "com.example",
//   "fields" : [ {
//     "name" : "name",
//     "type" : "string"
//   }, {
//     "name" : "age",
//     "type" : [ "null", "int" ]
//   } ]
// }),
//     validSchema = {"type":"record","name":"Person","namespace":"com.example","fields":[{"name":"name","type":"string"},{"name":"age","type":["null","int"]}]}
//   ),
//   typeName = "com.example.Person"
// )
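Before involving Kafka or a schema registry, it can help to check that the codec round-trips a value on its own. The following is a minimal sketch, assuming Person and personCodec from above are in scope; the value alice is a hypothetical example, and the writer schema is taken from the codec itself rather than from a registry.

```scala
import vulcan.{AvroError, Codec}

// Hypothetical sample value, used only for illustration.
val alice = Person("Alice", Some(30))

// Encode to Avro binary and decode again, using the codec's own schema
// as the writer schema. Any failure is returned as an AvroError.
val roundTrip: Either[AvroError, Person] =
  for {
    schema  <- personCodec.schema
    bytes   <- Codec.toBinary(alice)
    decoded <- Codec.fromBinary[Person](bytes, schema)
  } yield decoded
```

If the codec is consistent, roundTrip should be a Right containing a value equal to alice.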

We then define AvroSettings, describing the schema registry settings.

import cats.effect.IO
import fs2.kafka.vulcan.{Auth, AvroSettings, SchemaRegistryClientSettings}

val avroSettings =
  AvroSettings {
    SchemaRegistryClientSettings[IO]("http://localhost:8081").withAuth(
      Auth.Basic("username", "password")
    )
  }

We can then create a Serializer and Deserializer instance for Person.

import cats.effect.Resource
import fs2.kafka.{ValueDeserializer, ValueSerializer}
import fs2.kafka.vulcan.{avroDeserializer, avroSerializer}

implicit val personSerializer: Resource[IO, ValueSerializer[IO, Person]] =
  avroSerializer[Person].forValue(avroSettings)

implicit val personDeserializer: Resource[IO, ValueDeserializer[IO, Person]] =
  avroDeserializer[Person].forValue(avroSettings)

Finally, we can create settings, passing the Serializers and Deserializers implicitly.

import fs2.kafka.{AutoOffsetReset, ConsumerSettings, ProducerSettings}

val consumerSettings =
  ConsumerSettings[IO, String, Person]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group")

val producerSettings =
  ProducerSettings[IO, String, Person].withBootstrapServers("localhost:9092")

If we prefer, we can instead specify the Serializers and Deserializers explicitly.

import fs2.kafka.{Deserializer, Serializer}

ConsumerSettings(
  keyDeserializer = Deserializer[IO, String],
  valueDeserializer = personDeserializer
).withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group")

ProducerSettings(
  keySerializer = Serializer[IO, String],
  valueSerializer = personSerializer
).withBootstrapServers("localhost:9092")
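To show where these settings end up, here is a minimal sketch that publishes one Person and reads Person values back, assuming the consumerSettings and producerSettings values defined above and a reachable Kafka broker and schema registry. The topic name "people" and the record key "alice" are hypothetical.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka.{KafkaConsumer, KafkaProducer, ProducerRecord, ProducerRecords}

// Hypothetical topic name, used only for illustration.
val topic = "people"

// Publish a single record; the value is serialized as Avro
// by the Person serializer carried in producerSettings.
val produceAlice: IO[Unit] =
  KafkaProducer.resource(producerSettings).use { producer =>
    val record = ProducerRecord(topic, "alice", Person("Alice", Some(30)))
    producer.produce(ProducerRecords.one(record)).flatten.void
  }

// Consume records and extract the name from each decoded Person.
// Offsets are not committed in this sketch.
val personNames: Stream[IO, String] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo(topic)
    .records
    .map(_.record.value.name)
```

Both values are only descriptions: nothing connects to Kafka until produceAlice is run or personNames is compiled and run.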

By default, a schema will automatically be registered when used to publish a message. We can disable this behaviour by using withAutoRegisterSchemas(false). We can then use registerSchema to manually register the schema with the registry server:

val avroSettingsWithoutAutoRegister = avroSettings.withAutoRegisterSchemas(false)
avroSettingsWithoutAutoRegister.registerSchema[String]("person-key") *>
  avroSettingsWithoutAutoRegister.registerSchema[Person]("person-value")

Sharing Client

When creating AvroSettings with SchemaRegistryClientSettings, one schema registry client is created per Serializer or Deserializer. In many cases this is fine, but it is also possible to reuse a single client across multiple Serializers and Deserializers.

To share a SchemaRegistryClient, we first create it and then pass it to AvroSettings.

val avroSettingsSharedClient: IO[AvroSettings[IO]] =
  SchemaRegistryClientSettings[IO]("http://localhost:8081")
    .withAuth(Auth.Basic("username", "password"))
    .createSchemaRegistryClient
    .map(AvroSettings(_))

We can then create multiple Serializers and Deserializers using the AvroSettings.

avroSettingsSharedClient.map { avroSettings =>
  val personSerializer: Resource[IO, ValueSerializer[IO, Person]] =
    avroSerializer[Person].forValue(avroSettings)

  val personDeserializer: Resource[IO, ValueDeserializer[IO, Person]] =
    avroDeserializer[Person].forValue(avroSettings)

  val consumerSettings =
    ConsumerSettings(
      keyDeserializer = Deserializer[IO, String],
      valueDeserializer = personDeserializer
    ).withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")

  val producerSettings =
    ProducerSettings(
      keySerializer = Serializer[IO, String],
      valueSerializer = personSerializer
    ).withBootstrapServers("localhost:9092")

  (consumerSettings, producerSettings)
}
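Because avroSettingsSharedClient is an IO value, each evaluation of it creates a new client, so sharing only works if it is evaluated once and the result is reused. The following is a rough sketch of one way to do that inside a stream, assuming the definitions and imports above; the topic names "people" and "people-copy" are hypothetical.

```scala
import fs2.Stream
import fs2.kafka.{KafkaConsumer, KafkaProducer, ProducerRecord, ProducerRecords}

val copyPeople: Stream[IO, String] =
  // Evaluate the IO once; everything below reuses the same AvroSettings,
  // and therefore the same schema registry client.
  Stream.eval(avroSettingsSharedClient).flatMap { avroSettings =>
    val consumerSettings =
      ConsumerSettings(
        keyDeserializer = Deserializer[IO, String],
        valueDeserializer = avroDeserializer[Person].forValue(avroSettings)
      ).withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group")

    val producerSettings =
      ProducerSettings(
        keySerializer = Serializer[IO, String],
        valueSerializer = avroSerializer[Person].forValue(avroSettings)
      ).withBootstrapServers("localhost:9092")

    KafkaProducer.stream(producerSettings).flatMap { producer =>
      KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo("people") // hypothetical source topic
        .records
        .evalMap { committable =>
          val in = committable.record
          // Republish each record to a hypothetical target topic.
          val out = ProducerRecord("people-copy", in.key, in.value)
          producer.produce(ProducerRecords.one(out)).flatten.as(in.value.name)
        }
    }
  }
```

Offsets are not committed here, to keep the sketch focused on client sharing.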

Vulcan testkit munit

The fs2-kafka-vulcan-testkit-munit module provides an munit fixture for testing Vulcan codecs against a schema registry.

A usage example:

import cats.effect.unsafe.implicits.global
import fs2.kafka.vulcan.testkit.SchemaSuite
import fs2.kafka.vulcan.SchemaRegistryClientSettings

import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
import vulcan.Codec

class MySpec extends SchemaSuite {

  val checker = compatibilityChecker(
    SchemaRegistryClientSettings("https://some-schema-registry:1234")
  )

  override def munitFixtures = List(checker)

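  test("my codec is compatible") {
    // Placeholder: substitute the codec under test.
    val myCodec: Codec[String] = ???

    // Checks myCodec, as the reader, against the schema registered under the subject.
    checker()
      .assertReaderCompatibility(myCodec, "my-schema-subject")
      .unsafeRunSync()
  }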

}

Copyright © 2018-2025 OVO Energy Limited.
Icon by Franco Averta. CC BY 3.0.