Modules
The following sections describe the additional modules.
Vulcan
The fs2-kafka-vulcan
module provides Avro serialization support using Vulcan.
Add it to your project in build.sbt;
libraryDependencies += "com.github.fd4s" %% "fs2-kafka-vulcan" % fs2KafkaVersion
resolvers += "confluent" at "https://packages.confluent.io/maven/",
We start by defining the type we want to serialize or deserialize, and create a Codec
.
import cats.syntax.all._
import vulcan.Codec
final case class Person(name: String, age: Option[Int])
implicit val personCodec: Codec[Person] =
Codec.record(
name = "Person",
namespace = "com.example"
) { field =>
(
field("name", _.name),
field("age", _.age)
).mapN(Person(_, _))
}
// personCodec: Codec[Person] = WithTypeName(
// codec = Validated(
// codec = Codec({
// "type" : "record",
// "name" : "Person",
// "namespace" : "com.example",
// "fields" : [ {
// "name" : "name",
// "type" : "string"
// }, {
// "name" : "age",
// "type" : [ "null", "int" ]
// } ]
// }),
// validSchema = {"type":"record","name":"Person","namespace":"com.example","fields":[{"name":"name","type":"string"},{"name":"age","type":["null","int"]}]}
// ),
// typeName = "com.example.Person"
// )
We then define AvroSettings
, describing the schema registry settings.
import cats.effect.IO
import fs2.kafka.vulcan.{Auth, AvroSettings, SchemaRegistryClientSettings}
val avroSettings =
AvroSettings {
SchemaRegistryClientSettings[IO]("http://localhost:8081")
.withAuth(Auth.Basic("username", "password"))
}
We can then create a Serializer
and Deserializer
instance for Person
.
import cats.effect.Resource
import fs2.kafka.{ValueDeserializer, ValueSerializer}
import fs2.kafka.vulcan.{avroDeserializer, avroSerializer}
implicit val personSerializer: Resource[IO, ValueSerializer[IO, Person]] =
avroSerializer[Person].forValue(avroSettings)
implicit val personDeserializer: Resource[IO, ValueDeserializer[IO, Person]] =
avroDeserializer[Person].forValue(avroSettings)
Finally, we can create settings, passing the Serializer
s and Deserializer
s implicitly.
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, ProducerSettings}
val consumerSettings =
ConsumerSettings[IO, String, Person]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
val producerSettings =
ProducerSettings[IO, String, Person].withBootstrapServers("localhost:9092")
If we prefer, we can instead specify the Serializer
s and Deserializer
s explicitly.
import fs2.kafka.{Deserializer, Serializer}
ConsumerSettings(
keyDeserializer = Deserializer[IO, String],
valueDeserializer = personDeserializer
).withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
ProducerSettings(
keySerializer = Serializer[IO, String],
valueSerializer = personSerializer
).withBootstrapServers("localhost:9092")
By default, a schema will automatically be registered when used to publish a message. We can disable this behaviour by
using withAutoRegisterSchemas(false)
. We can then use registerSchema
to manually register the schema with the registry server:
val avroSettingsWithoutAutoRegister = avroSettings.withAutoRegisterSchemas(false)
avroSettingsWithoutAutoRegister.registerSchema[String]("person-key") *>
avroSettingsWithoutAutoRegister.registerSchema[Person]("person-value")
Sharing Client
When creating AvroSettings
with SchemaRegistryClientSettings
, one schema registry client will be created per Serializer
or Deserializer
. For many cases, this is completely fine, but it's possible to reuse a single client for multiple Serializer
s and Deserializer
s.
To share a SchemaRegistryClient
, we first create it and then pass it to AvroSettings
.
val avroSettingsSharedClient: IO[AvroSettings[IO]] =
SchemaRegistryClientSettings[IO]("http://localhost:8081")
.withAuth(Auth.Basic("username", "password"))
.createSchemaRegistryClient
.map(AvroSettings(_))
We can then create multiple Serializer
s and Deserializer
s using the AvroSettings
.
avroSettingsSharedClient.map { avroSettings =>
val personSerializer: Resource[IO, ValueSerializer[IO, Person]] =
avroSerializer[Person].forValue(avroSettings)
val personDeserializer: Resource[IO, ValueDeserializer[IO, Person]] =
avroDeserializer[Person].forValue(avroSettings)
val consumerSettings =
ConsumerSettings(
keyDeserializer = Deserializer[IO, String],
valueDeserializer = personDeserializer
).withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
val producerSettings =
ProducerSettings(
keySerializer = Serializer[IO, String],
valueSerializer = personSerializer
).withBootstrapServers("localhost:9092")
(consumerSettings, producerSettings)
}
Vulcan testkit munit
The fs2-kafka-vulcan-testkit-munit
module provides an munit fixture for testing vulcan
codecs against a schema registry
A usage example:
import cats.effect.unsafe.implicits.global
import fs2.kafka.vulcan.testkit.SchemaSuite
import fs2.kafka.vulcan.SchemaRegistryClientSettings
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
import vulcan.Codec
class MySpec extends SchemaSuite {
val checker = compatibilityChecker(
SchemaRegistryClientSettings("https://some-schema-registry:1234")
)
override def munitFixtures = List(checker)
test("my codec is compatible") {
val myCodec: Codec[String] = ???
val compatibility = checker()
.assertReaderCompatibility(myCodec, "my-schema-subject")
.unsafeRunSync()
}
}