Codecs
Codec
is the central concept in the library. Codec
s represent an Avro schema
, together with an encode
and decode
function for converting between Scala types and types recognized by the Apache Avro library. There are default Codec
s defined for many standard library types. For example, we can check what the default Option[Instant]
encoding is by asking for a Codec
instance.
import java.time.Instant
import vulcan.Codec
Codec[Option[Instant]]
// res1: Codec[Option[Instant]] = WithTypeName(
// codec = Validated(
// codec = OptionCodec(
// codec = WithTypeName(
// codec = Validated(
// codec = WithLogicalType(
// codec = Validated(
// codec = ImapErrors(
// codec = Codec("long"),
// f = vulcan.Codec$WithValidSchema$$Lambda$16677/0x0000000104595040@76f98646,
// g = vulcan.Codec$WithValidSchema$$Lambda$16678/0x0000000104594040@5d579d91
// ),
// validSchema = "long"
// ),
// logicalType = org.apache.avro.LogicalTypes$TimestampMillis@71bd2999
// ),
// validSchema = {"type":"long","logicalType":"timestamp-millis"}
// ),
// typeName = "Instant"
// )
// ),
// validSchema = ["null",{"type":"long","logicalType":"timestamp-millis"}]
// ),
// typeName = "Option"
// )
In some cases, it's not possible to generate Avro schemas. This is why Codec
schema
s are wrapped in Either
with error type AvroError
. For example, Avro doesn't support nested unions, so what would happen when we try to ask for a Codec
for Option[Option[Instant]]
?
Codec[Option[Option[Instant]]]
// res2: Codec[Option[Option[Instant]]] = Fail(
// error = AvroError(org.apache.avro.AvroRuntimeException: Duplicate in union:null)
// )
Encoding and decoding with a Codec
might also be unsuccessful, so results are wrapped in Either
with error type AvroError
. Encoding accepts a value to be encoded according to the schema defined by the Codec
, and decoding accepts a value and a schema to decode the value against. For example, what happens if we try to decode Int
s using a Boolean
schema?
import org.apache.avro.SchemaBuilder
Codec[Int].decode(10, SchemaBuilder.builder.booleanType)
// res3: Either[vulcan.AvroError, Int] = Left(
// value = ErrorDecodingType(
// decodingTypeName = "Int",
// cause = AvroError(Got unexpected schema type BOOLEAN, expected schema type INT)
// )
// )
Since the Apache Avro library encodes and decodes using Object
, Codec
s encode and decode between Scala types and Any
. This means type safety is lost and tests should be used to ensure Codec
s work as intended. This becomes important when we define Codec
s from scratch. Note Schema
s are treated as effectively immutable, even though they're in fact mutable.
Codec
s form invariant functors, which means you can easily use an existing Codec
to encode and decode a different type, by mapping back-and-forth between the Codec
's existing type argument and the new type. This becomes useful when dealing with newtypes, like InstallationTime
in the following example.
final case class InstallationTime(value: Instant)
Codec[Instant].imap(InstallationTime(_))(_.value)
// res4: Codec.Aux[Codec.instant.AvroType, InstallationTime] = ImapErrors(
// codec = WithTypeName(
// codec = Validated(
// codec = WithLogicalType(
// codec = Validated(
// codec = ImapErrors(
// codec = Codec("long"),
// f = vulcan.Codec$WithValidSchema$$Lambda$16677/0x0000000104595040@76f98646,
// g = vulcan.Codec$WithValidSchema$$Lambda$16678/0x0000000104594040@5d579d91
// ),
// validSchema = "long"
// ),
// logicalType = org.apache.avro.LogicalTypes$TimestampMillis@71bd2999
// ),
// validSchema = {"type":"long","logicalType":"timestamp-millis"}
// ),
// typeName = "Instant"
// ),
// f = vulcan.Codec$WithValidSchema$$Lambda$16677/0x0000000104595040@28b092c7,
// g = vulcan.Codec$WithValidSchema$$Lambda$16678/0x0000000104594040@51983ade
// )
When we have a newtype where we ensure values are valid, we can use imapError
instead.
import vulcan.AvroError
sealed abstract case class SerialNumber(value: String)
object SerialNumber {
def apply(value: String): Either[AvroError, SerialNumber] =
if(value.length == 12 && value.forall(_.isDigit))
Right(new SerialNumber(value) {})
else Left(AvroError(s"$value is not a serial number"))
}
Codec[String].imapError(SerialNumber(_))(_.value)
// res5: Codec.Aux[Codec.string.AvroType, SerialNumber] = ImapErrors(
// codec = Codec("string"),
// f = <function1>,
// g = vulcan.Codec$$Lambda$16748/0x00000001046ba840@3ba68b9c
// )
Decimals
Avro decimals closely correspond to BigDecimal
s with a fixed precision and scale.
Codec.decimal
can be used to create a Codec
for BigDecimal
given the precision and scale. When encoding, the Codec
checks the precision and scale of the BigDecimal
to make sure it matches the precision and scale defined in the schema. When decoding, we check the precision is not exceeded.
Codec.decimal(precision = 10, scale = 2)
// res6: Codec.Aux[vulcan.Avro.Bytes, BigDecimal] = DecimalCodec(
// precision = 10,
// scale = 2,
// validSchema = {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}
// )
Enumerations
Avro enumerations closely correspond to sealed trait
s with case object
s.
We can use Codec.enumeration
to specify an encoding.
sealed trait Fruit
case object Apple extends Fruit
case object Banana extends Fruit
case object Cherry extends Fruit
Codec.enumeration[Fruit](
name = "Fruit",
namespace = "com.example",
doc = Some("A selection of different fruits"),
symbols = List("apple", "banana", "cherry"),
encode = {
case Apple => "apple"
case Banana => "banana"
case Cherry => "cherry"
},
decode = {
case "apple" => Right(Apple)
case "banana" => Right(Banana)
case "cherry" => Right(Cherry)
case other => Left(AvroError(s"$other is not a Fruit"))
},
default = Some(Banana)
)
// res7: Codec.Aux[vulcan.Avro.EnumSymbol, Fruit] = WithTypeName(
// codec = Validated(
// codec = Codec({
// "type" : "enum",
// "name" : "Fruit",
// "namespace" : "com.example",
// "doc" : "A selection of different fruits",
// "symbols" : [ "apple", "banana", "cherry" ],
// "default" : "banana"
// }),
// validSchema = {"type":"enum","name":"Fruit","namespace":"com.example","doc":"A selection of different fruits","symbols":["apple","banana","cherry"],"default":"banana"}
// ),
// typeName = "com.example.Fruit"
// )
Derivation for enumeration types can be partly automated using the generic or enumeratum modules, although these will not support Scala 3 for the foreseeable future.
Fixed
Avro fixed types correspond to Array[Byte]
s with a fixed size.
We can use Codec.fixed
to define a codec.
sealed abstract case class Pence(value: Byte)
object Pence {
def apply(value: Byte): Either[AvroError, Pence] =
if(0 <= value && value < 100) Right(new Pence(value) {})
else Left(AvroError(s"Expected pence value, got $value"))
}
Codec.fixed[Pence](
name = "Pence",
namespace = "com.example",
size = 1,
encode = pence => Array[Byte](pence.value),
decode = bytes => Pence(bytes.head),
doc = Some("Amount of pence as a single byte")
)
// res8: Codec.Aux[vulcan.Avro.Fixed, Pence] = WithTypeName(
// codec = Validated(
// codec = Codec({
// "type" : "fixed",
// "name" : "Pence",
// "namespace" : "com.example",
// "doc" : "Amount of pence as a single byte",
// "size" : 1
// }),
// validSchema = {"type":"fixed","name":"Pence","namespace":"com.example","doc":"Amount of pence as a single byte","size":1}
// ),
// typeName = "com.example.Pence"
// )
Records
Avro records closely correspond to case class
es.
We can use Codec.record
to specify a record encoding.
import cats.implicits._
final case class Person(firstName: String, lastName: String, age: Option[Int])
Codec.record[Person](
name = "Person",
namespace = "com.example",
doc = Some("Person with a full name and optional age")
) { field =>
field("fullName", p => s"${p.firstName} ${p.lastName}") *>
(
field("firstName", _.firstName),
field("lastName", _.lastName, doc = Some("the last name")),
field("age", _.age, default = Some(None))
).mapN(Person(_, _, _))
}
// res9: Codec.Aux[vulcan.Avro.Record, Person] = WithTypeName(
// codec = Validated(
// codec = Codec({
// "type" : "record",
// "name" : "Person",
// "namespace" : "com.example",
// "doc" : "Person with a full name and optional age",
// "fields" : [ {
// "name" : "fullName",
// "type" : "string"
// }, {
// "name" : "firstName",
// "type" : "string"
// }, {
// "name" : "lastName",
// "type" : "string",
// "doc" : "the last name"
// }, {
// "name" : "age",
// "type" : [ "null", "int" ],
// "default" : null
// } ]
// }),
// validSchema = {"type":"record","name":"Person","namespace":"com.example","doc":"Person with a full name and optional age","fields":[{"name":"fullName","type":"string"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string","doc":"the last name"},{"name":"age","type":["null","int"],"default":null}]}
// ),
// typeName = "com.example.Person"
// )
For generic derivation support, refer to the generic module.
Unions
Avro unions closely correspond to sealed trait
s.
We can use Codec.union
to specify a union encoding.
sealed trait FirstOrSecond
final case class First(value: Int) extends FirstOrSecond
object First {
implicit val codec: Codec[First] =
Codec[Int].imap(apply)(_.value)
}
final case class Second(value: String) extends FirstOrSecond
object Second {
implicit val codec: Codec[Second] =
Codec[String].imap(apply)(_.value)
}
Codec.union[FirstOrSecond] { alt =>
alt[First] |+| alt[Second]
}
// res10: Codec.Aux[Any, FirstOrSecond] = WithTypeName(
// codec = Validated(
// codec = UnionCodec(
// alts = Append(
// leftNE = Singleton(a = vulcan.Codec$Alt$$anon$6@6862658e),
// rightNE = Singleton(a = vulcan.Codec$Alt$$anon$6@4d6c896d)
// )
// ),
// validSchema = ["int","string"]
// ),
// typeName = "union"
// )
For generic derivation support, refer to the generic module.