commit 915ed96d8e81c283470f300724de4211eb1e8581 Author: Pavel Kachalouski Date: Tue Mar 18 09:13:56 2025 +0100 Cinnamon with RestartXXX operators diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fa0e4cd --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.bsp/ +.idea/ +**/.bloop/ +.metals/ +.vscode/ + +target/ + +# Version for publishing is determined by commit ID or tag +version.sbt diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..0003111 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,3 @@ +version = "3.5.9" +runner.dialect = scala3 +maxColumn = 220 \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..c2ed83d --- /dev/null +++ b/build.sbt @@ -0,0 +1,29 @@ +import Dependencies._ + +Global / resolvers ++= Seq( + "Akka library repository".at("https://repo.akka.io/maven"), + "lightbend-commercial-mvn".at("https://repo.lightbend.com/pass/X6HMc6Ox2AKTAhqaLgnHjxb7yUCwpqULrhgmMc7ef3S4oYC5/commercial-releases") +) + +scalaVersion := "3.3.5" +fork := true + +libraryDependencies ++= Seq( + akkaStream, + akkaTyped, + scalactic % Test, + scalaLogging, + scalaTest % Test, + // Cinnamon dependencies + Cinnamon.library.cinnamonAkkaTyped, + Cinnamon.library.cinnamonAkkaStream, + Cinnamon.library.cinnamonSlf4jMdc, + Cinnamon.library.cinnamonPrometheus, + Cinnamon.library.cinnamonPrometheusHttpServer +) + +run / cinnamon := true +test / cinnamon := true +cinnamonLogLevel := "INFO" + +enablePlugins(CinnamonAgentOnly) diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..9c928ab --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,20 @@ +import Versions._ +import sbt._ + +case object Versions { + val akkaVersion = "2.10.0" + val logbackVersion = "1.5.8" + val scalaLoggingVersion = "3.9.5" + val scalaTestVersion = "3.2.19" +} + +case object Dependencies { + val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion + val akkaStreamTyped = "com.typesafe.akka" %% "akka-stream-typed" % akkaVersion + val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion + val akkaTypedTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion + val logback = "ch.qos.logback" % "logback-classic" % logbackVersion + val scalactic = "org.scalactic" %% "scalactic" % scalaTestVersion + val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion + val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..0b699c3 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.10.2 diff --git a/project/metals.sbt b/project/metals.sbt new file mode 100644 index 0000000..29f1c9d --- /dev/null +++ b/project/metals.sbt @@ -0,0 +1,13 @@ +// format: off +// DO NOT EDIT! This file is auto-generated. + +// This plugin enables semantic information to be produced by sbt. +// It also adds support for debugging using the Debug Adapter Protocol + +addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1") + +// This plugin adds the BSP debug capability to sbt server. + +addSbtPlugin("ch.epfl.scala" % "sbt-debug-adapter" % "4.2.2") + +// format: on diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..490acb5 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.lightbend.cinnamon" % "sbt-cinnamon" % "2.21.2") diff --git a/project/project/metals.sbt b/project/project/metals.sbt new file mode 100644 index 0000000..bf5021a --- /dev/null +++ b/project/project/metals.sbt @@ -0,0 +1,14 @@ +// format: off +// DO NOT EDIT! This file is auto-generated. + +// This plugin enables semantic information to be produced by sbt. +// It also adds support for debugging using the Debug Adapter Protocol + +addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1") + +// This plugin makes sure that the JDI tools are in the sbt classpath. +// JDI tools are used by the debug adapter server. + +addSbtPlugin("com.github.sbt" % "sbt-jdi-tools" % "1.2.0") + +// format: on diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..7e132b2 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,9 @@ + +cinnamon { + prometheus { + exporters += http-server + http-server { + port = 9095 + } + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..cb557b8 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + true + + { + "traceID": "%mdc{Trace-ID}", + "logger": "%logger{36}", + "akkaSource": "%mdc{akkaSource}", + "thread": "%thread" + } + + + + + + + + + + + diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..e69de29 diff --git a/src/main/scala/com/xeppaka/app/App.scala b/src/main/scala/com/xeppaka/app/App.scala new file mode 100644 index 0000000..734439a --- /dev/null +++ b/src/main/scala/com/xeppaka/app/App.scala @@ -0,0 +1,52 @@ +package com.xeppaka.app + +import akka.actor.typed.ActorSystem +import akka.actor.typed.SpawnProtocol +import akka.stream.RestartSettings +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.RestartFlow +import akka.stream.scaladsl.RestartSink +import akka.stream.scaladsl.RestartSource +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.Timeout +import com.lightbend.cinnamon.akka.stream.CinnamonAttributes +import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.GraphWithInstrumented + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +object App { + def main(args: Array[String]): Unit = { + implicit val timeout: Timeout = 2.seconds + implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "app-test") + implicit val executionContext: ExecutionContext = system.executionContext + val log = system.log + + val restartSettings = RestartSettings(1.second, 3.seconds, 0.2) + val processor1 = Flow[Int].map(i => i - 1) + + RestartSource + .withBackoff(restartSettings) { () => + Source.fromIterator(() => Range(1, Int.MaxValue).iterator) + } + .via(RestartFlow.onFailuresWithBackoff(restartSettings) { () => + Flow[Int] + .map(identity) + .throttle(1, 1.second) + .via(processor1) + // .map { i => + // if (i % 100000 == 0) { + // throw new Exception("BOOM!") + // } + // i + // } + }) + .to(RestartSink.withBackoff(restartSettings) { () => + Sink.foreach(println) + }) + // .addAttributes(CinnamonAttributes.instrumentedByName()) + .instrumented(name = "stream-0001") + .run() + } +}