Cinnamon with RestartXXX operators

This commit is contained in:
Pavel Kachalouski
2025-03-18 09:13:56 +01:00
commit dde793d631
12 changed files with 186 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
.bsp/
.idea/
**/.bloop/
.metals/
.vscode/
target/
# Version for publishing is determined by commit ID or tag
version.sbt

3
.scalafmt.conf Normal file
View File

@@ -0,0 +1,3 @@
version = "3.5.9"
runner.dialect = scala3
maxColumn = 220

29
build.sbt Normal file
View File

@@ -0,0 +1,29 @@
import Dependencies._
Global / resolvers ++= Seq(
"Akka library repository".at("https://repo.akka.io/maven"),
"lightbend-commercial-mvn".at("https://repo.lightbend.com/pass/X6HMc6Ox2AKTAhqaLgnHjxb7yUCwpqULrhgmMc7ef3S4oYC5/commercial-releases")
)
scalaVersion := "3.3.5"
fork := true
libraryDependencies ++= Seq(
akkaStream,
akkaTyped,
scalactic % Test,
scalaLogging,
scalaTest % Test,
// Cinnamon dependencies
Cinnamon.library.cinnamonAkkaTyped,
Cinnamon.library.cinnamonAkkaStream,
Cinnamon.library.cinnamonSlf4jMdc,
Cinnamon.library.cinnamonPrometheus,
Cinnamon.library.cinnamonPrometheusHttpServer
)
run / cinnamon := true
test / cinnamon := true
cinnamonLogLevel := "INFO"
enablePlugins(CinnamonAgentOnly)

View File

@@ -0,0 +1,20 @@
import Versions._
import sbt._
case object Versions {
val akkaVersion = "2.10.0"
val logbackVersion = "1.5.8"
val scalaLoggingVersion = "3.9.5"
val scalaTestVersion = "3.2.19"
}
case object Dependencies {
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akkaStreamTyped = "com.typesafe.akka" %% "akka-stream-typed" % akkaVersion
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
val akkaTypedTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
val scalactic = "org.scalactic" %% "scalactic" % scalaTestVersion
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
}

1
project/build.properties Normal file
View File

@@ -0,0 +1 @@
sbt.version=1.10.2

13
project/metals.sbt Normal file
View File

@@ -0,0 +1,13 @@
// format: off
// DO NOT EDIT! This file is auto-generated.
// This plugin enables semantic information to be produced by sbt.
// It also adds support for debugging using the Debug Adapter Protocol
addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1")
// This plugin adds the BSP debug capability to sbt server.
addSbtPlugin("ch.epfl.scala" % "sbt-debug-adapter" % "4.2.2")
// format: on

1
project/plugins.sbt Normal file
View File

@@ -0,0 +1 @@
addSbtPlugin("com.lightbend.cinnamon" % "sbt-cinnamon" % "2.21.2")

View File

@@ -0,0 +1,14 @@
// format: off
// DO NOT EDIT! This file is auto-generated.
// This plugin enables semantic information to be produced by sbt.
// It also adds support for debugging using the Debug Adapter Protocol
addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1")
// This plugin makes sure that the JDI tools are in the sbt classpath.
// JDI tools are used by the debug adapter server.
addSbtPlugin("com.github.sbt" % "sbt-jdi-tools" % "1.2.0")
// format: on

View File

@@ -0,0 +1,9 @@
cinnamon {
prometheus {
exporters += http-server
http-server {
port = 9095
}
}
}

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<import class="net.logstash.logback.encoder.LogstashEncoder"/>
<import class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"/>
<import class="net.logstash.logback.decorate.PrettyPrintingJsonGeneratorDecorator"/>
<appender name="STDOUT" class="ConsoleAppender">
<encoder class="LoggingEventCompositeJsonEncoder">
<jsonGeneratorDecorator class="PrettyPrintingJsonGeneratorDecorator"/>
<providers>
<timestamp/>
<pattern>
<omitEmptyFields>true</omitEmptyFields>
<pattern>
{
"traceID": "%mdc{Trace-ID}",
"logger": "%logger{36}",
"akkaSource": "%mdc{akkaSource}",
"thread": "%thread"
}
</pattern>
</pattern>
<message/>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

View File

@@ -0,0 +1,51 @@
package com.xeppaka.app
import akka.actor.typed.ActorSystem
import akka.actor.typed.SpawnProtocol
import akka.stream.RestartSettings
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.RestartFlow
import akka.stream.scaladsl.RestartSink
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.Timeout
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.GraphWithInstrumented
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object App {
def main(args: Array[String]): Unit = {
implicit val timeout: Timeout = 2.seconds
implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "app-test")
implicit val executionContext: ExecutionContext = system.executionContext
val log = system.log
val restartSettings = RestartSettings(1.second, 3.seconds, 0.2)
val processor1 = Flow[Int].map(i => i - 1)
RestartSource
.withBackoff(restartSettings) { () =>
Source.fromIterator(() => Range(1, Int.MaxValue).iterator)
}
.via(RestartFlow.onFailuresWithBackoff(restartSettings) { () =>
Flow[Int]
.map(identity)
.throttle(1, 1.second)
.via(processor1)
// .map { i =>
// if (i % 100000 == 0) {
// throw new Exception("BOOM!")
// }
// i
// }
})
.to(RestartSink.withBackoff(restartSettings) { () =>
Sink.foreach(println)
})
.instrumented(name = "stream-0001")
.run()
}
}