Running as an Armeria server
Endpoints can be mounted as TapirService[S, F]
on top of Armeria’s HttpServiceWithRoutes
.
Armeria interpreter can be used with different effect systems (cats-effect, ZIO) as well as Scala’s standard Future
.
Scala’s standard Future
Add the following dependency
"com.softwaremill.sttp.tapir" %% "tapir-armeria-server" % "1.10.6"
and import the object:
import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter
to use this interpreter with Future
.
The toService
method require a single, or a list of ServerEndpoint
s, which can be created by adding
server logic to an endpoint.
import sttp.tapir._
import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter
import scala.concurrent.Future
import com.linecorp.armeria.server.Server
object Main {
// JVM entry point that starts the HTTP server
def main(args: Array[String]): Unit = {
val tapirEndpoint: PublicEndpoint[(String, Int), Unit, String, Any] = ??? // your definition here
def logic(s: String, i: Int): Future[Either[Unit, String]] = ??? // your logic here
val tapirService = ArmeriaFutureServerInterpreter().toService(tapirEndpoint.serverLogic((logic _).tupled))
val server = Server
.builder()
.service(tapirService) // your endpoint is bound to the server
.build()
server.start().join()
}
}
This interpreter also supports streaming using Armeria Streams which is fully compatible with Reactive Streams:
import sttp.capabilities.armeria.ArmeriaStreams
import sttp.tapir._
import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter
import scala.concurrent.Future
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
import org.reactivestreams.Publisher
val streamingResponse: PublicEndpoint[Int, Unit, Publisher[HttpData], ArmeriaStreams] =
endpoint
.in("stream")
.in(query[Int]("key"))
.out(streamTextBody(ArmeriaStreams)(CodecFormat.TextPlain()))
def streamLogic(foo: Int): Future[Publisher[HttpData]] = {
Future.successful(StreamMessage.of(HttpData.ofUtf8("hello"), HttpData.ofUtf8("world")))
}
val tapirService = ArmeriaFutureServerInterpreter().toService(streamingResponse.serverLogicSuccess(streamLogic))
Configuration
Every endpoint can be configured by providing an instance of ArmeriaFutureEndpointOptions
, see server options for details.
Note that Armeria automatically injects an ExecutionContext
on top of Armeria’s EventLoop
to invoke the logic.
Cats Effect
Add the following dependency
"com.softwaremill.sttp.tapir" %% "tapir-armeria-server-cats" % "1.10.6"
to use this interpreter with Cats Effect typeclasses.
Then import the object:
import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter
This object contains the toService(e: ServerEndpoint[Fs2Streams[F], F])
method which returns a TapirService[Fs2Streams[F], F]
.
An HTTP server can then be started as in the following example:
import sttp.tapir._
import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter
import cats.effect._
import cats.effect.std.Dispatcher
import com.linecorp.armeria.server.Server
object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val tapirEndpoint: PublicEndpoint[String, Unit, String, Any] = ???
def logic(req: String): IO[Either[Unit, String]] = ???
Dispatcher[IO]
.flatMap { dispatcher =>
Resource
.make(
IO.async_[Server] { cb =>
val tapirService = ArmeriaCatsServerInterpreter[IO](dispatcher).toService(tapirEndpoint.serverLogic(logic))
val server = Server
.builder()
.service(tapirService)
.build()
server.start().handle[Unit] {
case (_, null) => cb(Right(server))
case (_, cause) => cb(Left(cause))
}
}
)({ server =>
IO.fromCompletableFuture(IO(server.closeAsync())).void
})
}
.use(_ => IO.never)
}
}
This interpreter also supports streaming using FS2 streams:
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir._
import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter
import cats.effect._
import cats.effect.std.Dispatcher
import fs2._
val streamingResponse: Endpoint[Unit, Int, Unit, Stream[IO, Byte], Fs2Streams[IO]] =
endpoint
.in("stream")
.in(query[Int]("times"))
.out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain()))
def streamLogic(times: Int): IO[Stream[IO, Byte]] = {
IO.pure(Stream.chunk(Chunk.array("Hello world!".getBytes)).repeatN(times))
}
def dispatcher: Dispatcher[IO] = ???
val tapirService = ArmeriaCatsServerInterpreter(dispatcher).toService(streamingResponse.serverLogicSuccess(streamLogic))
ZIO
Add the following dependency
"com.softwaremill.sttp.tapir" %% "tapir-armeria-server-zio" % "1.10.6"
to use this interpreter with ZIO.
Then import the object:
import sttp.tapir.server.armeria.zio.ArmeriaZioServerInterpreter
This object contains toService(e: ServerEndpoint[ZioStreams, RIO[R, *]])
method which returns a TapirService[ZioStreams, RIO[R, *]]
.
An HTTP server can then be started as in the following example:
import com.linecorp.armeria.server.Server
import sttp.tapir._
import sttp.tapir.server.armeria.zio.ArmeriaZioServerInterpreter
import sttp.tapir.ztapir._
import zio.{ExitCode, Runtime, UIO, URIO, ZIO, ZIOAppDefault}
object Main extends ZIOAppDefault {
override def run: URIO[Any, ExitCode] = {
implicit val runtime = Runtime.default
val tapirEndpoint: PublicEndpoint[String, Unit, String, Any] = ???
def logic(key: String): UIO[String] = ???
val s = ZIO.fromCompletableFuture {
val tapirService = ArmeriaZioServerInterpreter().toService(tapirEndpoint.zServerLogic(logic))
val server = Server
.builder()
.service(tapirService)
.build()
server.start().thenApply[Server](_ => server)
}
ZIO.scoped(ZIO.acquireRelease(s)(server => ZIO.fromCompletableFuture(server.closeAsync()).orDie) *> ZIO.never).exitCode
}
}
This interpreter supports streaming using ZStreams.