Natchez FS2
This is one of the more experimental modules on offer. It provides an AllocatedSpan
which must be manually
submitted on completion rather than a cats Resource
. This is useful in applications where per-element Resources
are unwieldy,
i.e. Kafka consumers using FS2.
Installation
val natchezExtrasVersion = "8.1.0"
libraryDependencies ++= Seq(
"com.ovoenergy" %% "natchez-extras-fs2" % natchezExtrasVersion
)
Usage
natchez-extras-fs2
provides an FS2 Pipe
that given an element in a stream returns it alongside an AllocatedSpan
.
You can then create subspans for this span as you process the message before manually committing it with .submit
.
A small syntax
object provides two functions - evalMapNamed
and evalMapTraced
to reduce the boilerplate involved
in unwrapping the message, creating a subspan and processing it.
If the stream is cancelled the span will be closed automatically.
import cats.Monad
import cats.effect.{ExitCode, IO, IOApp}
import com.ovoenergy.natchez.extras.fs2.syntax._
import com.ovoenergy.natchez.extras.fs2.AllocatedSpan
import com.ovoenergy.natchez.extras.slf4j.Slf4j
import fs2._
import natchez.{EntryPoint, Kernel}
import scala.concurrent.duration._
object NatchezFS2 extends IOApp {
// a message from e.g. Kafka or SQS.
case class Message[F[_]](kernel: Kernel, body: String, commit: F[Unit])
// an infinite stream of messages
def source[F[_]: Monad]: Stream[F, Message[F]] =
Stream.emit(Message(Kernel(Map.empty), "test", Monad[F].unit)).repeat
val entryPoint: EntryPoint[IO] =
Slf4j.entryPoint[IO]
def run(args: List[String]): IO[ExitCode] =
source[IO]
.through(AllocatedSpan.create()(msg => entryPoint.continueOrElseRoot("consume", msg.kernel)))
.evalMapNamed("processing-step-1")(m => IO.sleep(1.second).as(m))
.evalMapNamed("processing-step-2")(m => IO.sleep(2.seconds).as(m))
.evalMapNamed("commit")(_.commit)
.evalMap(_.span.submit)
.compile
.drain
.as(ExitCode.Error)
}