Cinnamon with RestartXXX operators
This commit is contained in:
10
.gitignore
vendored
Normal file
10
.gitignore
vendored
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
.bsp/
|
||||||
|
.idea/
|
||||||
|
**/.bloop/
|
||||||
|
.metals/
|
||||||
|
.vscode/
|
||||||
|
|
||||||
|
target/
|
||||||
|
|
||||||
|
# Version for publishing is determined by commit ID or tag
|
||||||
|
version.sbt
|
||||||
3
.scalafmt.conf
Normal file
3
.scalafmt.conf
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
version = "3.5.9"
|
||||||
|
runner.dialect = scala3
|
||||||
|
maxColumn = 220
|
||||||
29
build.sbt
Normal file
29
build.sbt
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
import Dependencies._
|
||||||
|
|
||||||
|
Global / resolvers ++= Seq(
|
||||||
|
"Akka library repository".at("https://repo.akka.io/maven"),
|
||||||
|
"lightbend-commercial-mvn".at("https://repo.lightbend.com/pass/X6HMc6Ox2AKTAhqaLgnHjxb7yUCwpqULrhgmMc7ef3S4oYC5/commercial-releases")
|
||||||
|
)
|
||||||
|
|
||||||
|
scalaVersion := "3.3.5"
|
||||||
|
fork := true
|
||||||
|
|
||||||
|
libraryDependencies ++= Seq(
|
||||||
|
akkaStream,
|
||||||
|
akkaTyped,
|
||||||
|
scalactic % Test,
|
||||||
|
scalaLogging,
|
||||||
|
scalaTest % Test,
|
||||||
|
// Cinnamon dependencies
|
||||||
|
Cinnamon.library.cinnamonAkkaTyped,
|
||||||
|
Cinnamon.library.cinnamonAkkaStream,
|
||||||
|
Cinnamon.library.cinnamonSlf4jMdc,
|
||||||
|
Cinnamon.library.cinnamonPrometheus,
|
||||||
|
Cinnamon.library.cinnamonPrometheusHttpServer
|
||||||
|
)
|
||||||
|
|
||||||
|
run / cinnamon := true
|
||||||
|
test / cinnamon := true
|
||||||
|
cinnamonLogLevel := "INFO"
|
||||||
|
|
||||||
|
enablePlugins(CinnamonAgentOnly)
|
||||||
20
project/Dependencies.scala
Normal file
20
project/Dependencies.scala
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import Versions._
|
||||||
|
import sbt._
|
||||||
|
|
||||||
|
case object Versions {
|
||||||
|
val akkaVersion = "2.10.0"
|
||||||
|
val logbackVersion = "1.5.8"
|
||||||
|
val scalaLoggingVersion = "3.9.5"
|
||||||
|
val scalaTestVersion = "3.2.19"
|
||||||
|
}
|
||||||
|
|
||||||
|
case object Dependencies {
|
||||||
|
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
|
||||||
|
val akkaStreamTyped = "com.typesafe.akka" %% "akka-stream-typed" % akkaVersion
|
||||||
|
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
|
||||||
|
val akkaTypedTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion
|
||||||
|
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
|
||||||
|
val scalactic = "org.scalactic" %% "scalactic" % scalaTestVersion
|
||||||
|
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion
|
||||||
|
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
|
||||||
|
}
|
||||||
1
project/build.properties
Normal file
1
project/build.properties
Normal file
@@ -0,0 +1 @@
|
|||||||
|
sbt.version=1.10.2
|
||||||
13
project/metals.sbt
Normal file
13
project/metals.sbt
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
// format: off
|
||||||
|
// DO NOT EDIT! This file is auto-generated.
|
||||||
|
|
||||||
|
// This plugin enables semantic information to be produced by sbt.
|
||||||
|
// It also adds support for debugging using the Debug Adapter Protocol
|
||||||
|
|
||||||
|
addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1")
|
||||||
|
|
||||||
|
// This plugin adds the BSP debug capability to sbt server.
|
||||||
|
|
||||||
|
addSbtPlugin("ch.epfl.scala" % "sbt-debug-adapter" % "4.2.2")
|
||||||
|
|
||||||
|
// format: on
|
||||||
1
project/plugins.sbt
Normal file
1
project/plugins.sbt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
addSbtPlugin("com.lightbend.cinnamon" % "sbt-cinnamon" % "2.21.2")
|
||||||
14
project/project/metals.sbt
Normal file
14
project/project/metals.sbt
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
// format: off
|
||||||
|
// DO NOT EDIT! This file is auto-generated.
|
||||||
|
|
||||||
|
// This plugin enables semantic information to be produced by sbt.
|
||||||
|
// It also adds support for debugging using the Debug Adapter Protocol
|
||||||
|
|
||||||
|
addSbtPlugin("org.scalameta" % "sbt-metals" % "1.5.1")
|
||||||
|
|
||||||
|
// This plugin makes sure that the JDI tools are in the sbt classpath.
|
||||||
|
// JDI tools are used by the debug adapter server.
|
||||||
|
|
||||||
|
addSbtPlugin("com.github.sbt" % "sbt-jdi-tools" % "1.2.0")
|
||||||
|
|
||||||
|
// format: on
|
||||||
9
src/main/resources/application.conf
Normal file
9
src/main/resources/application.conf
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
|
||||||
|
cinnamon {
|
||||||
|
prometheus {
|
||||||
|
exporters += http-server
|
||||||
|
http-server {
|
||||||
|
port = 9095
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
35
src/main/resources/logback.xml
Normal file
35
src/main/resources/logback.xml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE configuration>
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
|
||||||
|
<import class="ch.qos.logback.core.ConsoleAppender"/>
|
||||||
|
<import class="net.logstash.logback.encoder.LogstashEncoder"/>
|
||||||
|
<import class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"/>
|
||||||
|
<import class="net.logstash.logback.decorate.PrettyPrintingJsonGeneratorDecorator"/>
|
||||||
|
|
||||||
|
<appender name="STDOUT" class="ConsoleAppender">
|
||||||
|
<encoder class="LoggingEventCompositeJsonEncoder">
|
||||||
|
<jsonGeneratorDecorator class="PrettyPrintingJsonGeneratorDecorator"/>
|
||||||
|
<providers>
|
||||||
|
<timestamp/>
|
||||||
|
<pattern>
|
||||||
|
<omitEmptyFields>true</omitEmptyFields>
|
||||||
|
<pattern>
|
||||||
|
{
|
||||||
|
"traceID": "%mdc{Trace-ID}",
|
||||||
|
"logger": "%logger{36}",
|
||||||
|
"akkaSource": "%mdc{akkaSource}",
|
||||||
|
"thread": "%thread"
|
||||||
|
}
|
||||||
|
</pattern>
|
||||||
|
</pattern>
|
||||||
|
<message/>
|
||||||
|
</providers>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<root level="INFO">
|
||||||
|
<appender-ref ref="STDOUT"/>
|
||||||
|
</root>
|
||||||
|
</configuration>
|
||||||
0
src/main/resources/reference.conf
Normal file
0
src/main/resources/reference.conf
Normal file
52
src/main/scala/com/xeppaka/app/App.scala
Normal file
52
src/main/scala/com/xeppaka/app/App.scala
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package com.xeppaka.app
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorSystem
|
||||||
|
import akka.actor.typed.SpawnProtocol
|
||||||
|
import akka.stream.RestartSettings
|
||||||
|
import akka.stream.scaladsl.Flow
|
||||||
|
import akka.stream.scaladsl.RestartFlow
|
||||||
|
import akka.stream.scaladsl.RestartSink
|
||||||
|
import akka.stream.scaladsl.RestartSource
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
import akka.util.Timeout
|
||||||
|
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes
|
||||||
|
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.GraphWithInstrumented
|
||||||
|
|
||||||
|
import scala.concurrent.ExecutionContext
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
object App {
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
implicit val timeout: Timeout = 2.seconds
|
||||||
|
implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "app-test")
|
||||||
|
implicit val executionContext: ExecutionContext = system.executionContext
|
||||||
|
val log = system.log
|
||||||
|
|
||||||
|
val restartSettings = RestartSettings(1.second, 3.seconds, 0.2)
|
||||||
|
val processor1 = Flow[Int].map(i => i - 1)
|
||||||
|
|
||||||
|
RestartSource
|
||||||
|
.withBackoff(restartSettings) { () =>
|
||||||
|
Source.fromIterator(() => Range(1, Int.MaxValue).iterator)
|
||||||
|
}
|
||||||
|
.via(RestartFlow.onFailuresWithBackoff(restartSettings) { () =>
|
||||||
|
Flow[Int]
|
||||||
|
.map(identity)
|
||||||
|
.throttle(1, 1.second)
|
||||||
|
.via(processor1)
|
||||||
|
// .map { i =>
|
||||||
|
// if (i % 100000 == 0) {
|
||||||
|
// throw new Exception("BOOM!")
|
||||||
|
// }
|
||||||
|
// i
|
||||||
|
// }
|
||||||
|
})
|
||||||
|
.to(RestartSink.withBackoff(restartSettings) { () =>
|
||||||
|
Sink.foreach(println)
|
||||||
|
})
|
||||||
|
// .addAttributes(CinnamonAttributes.instrumentedByName())
|
||||||
|
.instrumented(name = "stream-0001")
|
||||||
|
.run()
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user