Upgrade to scala 2.13.1 and akka 2.6.3
This commit is contained in:
@@ -13,12 +13,13 @@ lazy val `telegram-bot-delivery` = (project in file("."))
|
||||
name := "telegram-bot-delivery",
|
||||
libraryDependencies ++= Seq(
|
||||
scalaTest % Test,
|
||||
akka,
|
||||
akkaTyped,
|
||||
akkaClusterShardingTyped,
|
||||
akkaHttp,
|
||||
akkaStream,
|
||||
akkaPersistence,
|
||||
akkaPersistenceCassandra,
|
||||
akkaPersistenceQuery,
|
||||
levelDbJni,
|
||||
circleCore,
|
||||
circleGeneric,
|
||||
@@ -26,13 +27,13 @@ lazy val `telegram-bot-delivery` = (project in file("."))
|
||||
circeAkkaHttp,
|
||||
slibTelegram
|
||||
),
|
||||
dockerBaseImage := "openjdk:13-jdk-oracle",
|
||||
dockerBaseImage := "openjdk:11",
|
||||
dockerExposedPorts := Seq(8443),
|
||||
dockerRepository := Some("registry.xeppaka.eu:443"),
|
||||
Docker / daemonUserUid := Some("1001"),
|
||||
Docker / daemonUser := "telegram-bot",
|
||||
Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery",
|
||||
version := "1.0.1"
|
||||
version := "1.1.1"
|
||||
)
|
||||
.enablePlugins(JavaServerAppPackaging)
|
||||
.enablePlugins(DockerPlugin)
|
||||
|
||||
@@ -4,22 +4,23 @@ import Dependencies.Versions._
|
||||
|
||||
object Dependencies {
|
||||
object Versions {
|
||||
val akkaVersion = "2.5.26"
|
||||
val akkaHttpVersion = "10.1.10"
|
||||
val akkaPersistenceCassandraVersion = "0.100"
|
||||
val akkaVersion = "2.6.3"
|
||||
val akkaHttpVersion = "10.1.11"
|
||||
val akkaPersistenceCassandraVersion = "0.103"
|
||||
val levelDbJniVersion = "1.8"
|
||||
val circeVersion = "0.12.3"
|
||||
val akkaHttpCirceVersion = "1.29.1"
|
||||
val scalaTestVersion = "3.2.0-M1"
|
||||
val circeVersion = "0.13.0"
|
||||
val akkaHttpCirceVersion = "1.31.0"
|
||||
val scalaTestVersion = "3.2.0-M4"
|
||||
val slibTelegramVersion = "0.1.0"
|
||||
}
|
||||
|
||||
val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
|
||||
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
|
||||
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
|
||||
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
|
||||
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
|
||||
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.100"
|
||||
val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
|
||||
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion
|
||||
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion
|
||||
val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion
|
||||
val circleCore = "io.circe" %% "circe-core" % circeVersion
|
||||
val circleGeneric = "io.circe" %% "circe-generic" % circeVersion
|
||||
|
||||
@@ -1 +1 @@
|
||||
sbt.version=1.3.3
|
||||
sbt.version=1.3.8
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1")
|
||||
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12")
|
||||
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
|
||||
@@ -1,11 +1,11 @@
|
||||
package eu.xeppaka.bot
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
|
||||
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy}
|
||||
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import akka.util.{ByteString, Timeout}
|
||||
import eu.xeppaka.telegram.bot.TelegramEntities._
|
||||
@@ -59,10 +59,9 @@ object CheckDeliveryDialog {
|
||||
private val removeKeyboard = Some(ReplyKeyboardRemove())
|
||||
|
||||
def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toClassic)
|
||||
implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
val http = Http()(ctx.system.toClassic)
|
||||
val stashBuffer = StashBuffer[Command](100)
|
||||
implicit val system: ActorSystem = ctx.system.toClassic
|
||||
implicit val executionContext: ExecutionContext = ctx.executionContext
|
||||
val http = Http()
|
||||
val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state))
|
||||
val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart))
|
||||
|
||||
@@ -96,69 +95,73 @@ object CheckDeliveryDialog {
|
||||
Behaviors.unhandled
|
||||
}
|
||||
|
||||
def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.setup { ctx =>
|
||||
case object AddParcelSuccess extends Command
|
||||
case class AddParcelFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup { ctx =>
|
||||
case object AddParcelSuccess extends Command
|
||||
case class AddParcelFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) {
|
||||
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
|
||||
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
|
||||
case Failure(exception) => AddParcelFailure(exception)
|
||||
}
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) {
|
||||
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
|
||||
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
|
||||
case Failure(exception) => AddParcelFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case AddParcelSuccess =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case AddParcelFailure(exception) =>
|
||||
exception match {
|
||||
case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case _ =>
|
||||
ctx.log.error(exception, "action=add_parcel result=failure")
|
||||
val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
}
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
Behaviors.receiveMessage {
|
||||
case AddParcelSuccess =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case AddParcelFailure(exception) =>
|
||||
exception match {
|
||||
case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case _ =>
|
||||
ctx.log.error("action=add_parcel result=failure", exception)
|
||||
val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
}
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def listParcels: Behavior[Command] = Behaviors.setup { ctx =>
|
||||
case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command
|
||||
case class ListParcelsFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
def listParcels: Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup { ctx =>
|
||||
case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command
|
||||
case class ListParcelsFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcels(ref)) {
|
||||
case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
|
||||
case Failure(exception) => ListParcelsFailure(exception)
|
||||
}
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) {
|
||||
case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
|
||||
case Failure(exception) => ListParcelsFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case ListParcelsSuccess(parcelsList) =>
|
||||
val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)")
|
||||
val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case ListParcelsFailure(exception) =>
|
||||
ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId)
|
||||
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
Behaviors.receiveMessage {
|
||||
case ListParcelsSuccess(parcelsList) =>
|
||||
val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)")
|
||||
val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case ListParcelsFailure(exception) =>
|
||||
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
|
||||
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] =
|
||||
def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup { ctx =>
|
||||
case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command
|
||||
case class ListParcelIdsFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcelIds(ref)) {
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) {
|
||||
case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList)
|
||||
case Failure(exception) => ListParcelIdsFailure(exception)
|
||||
}
|
||||
@@ -175,7 +178,7 @@ object CheckDeliveryDialog {
|
||||
sendMessage(message, onSuccess, onFailure)
|
||||
}
|
||||
case ListParcelIdsFailure(exception) =>
|
||||
ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId)
|
||||
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
|
||||
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case otherMessage =>
|
||||
@@ -183,35 +186,38 @@ object CheckDeliveryDialog {
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.setup { ctx =>
|
||||
case object RemoveParcelSuccess extends Command
|
||||
case class RemoveParcelFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup { ctx =>
|
||||
case object RemoveParcelSuccess extends Command
|
||||
case class RemoveParcelFailure(exception: Throwable) extends Command
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
|
||||
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
|
||||
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
|
||||
case Failure(exception) => RemoveParcelFailure(exception)
|
||||
}
|
||||
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
|
||||
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
|
||||
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
|
||||
case Failure(exception) => RemoveParcelFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case RemoveParcelSuccess =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case RemoveParcelFailure(exception) =>
|
||||
exception match {
|
||||
case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case _ =>
|
||||
ctx.log.error(exception, "action=add_parcel result=failure")
|
||||
val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
}
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
Behaviors.receiveMessage {
|
||||
case RemoveParcelSuccess =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case RemoveParcelFailure(exception) =>
|
||||
exception match {
|
||||
case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
|
||||
val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
case _ =>
|
||||
ctx.log.error("action=add_parcel result=failure", exception)
|
||||
val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard)
|
||||
sendMessage(message, waitCommand, waitCommand)
|
||||
}
|
||||
case otherMessage =>
|
||||
stashBuffer.stash(otherMessage)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,66 +231,16 @@ object CheckDeliveryDialog {
|
||||
// sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), waitCommand)
|
||||
// }
|
||||
|
||||
def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage {
|
||||
case ProcessMessage(msg, replyTo) =>
|
||||
if (msg.text.isDefined) {
|
||||
val parcelId = msg.text.get
|
||||
replyTo ! ProcessMessageSuccess
|
||||
onFinish(parcelId)
|
||||
} else {
|
||||
replyTo ! ProcessMessageSuccess
|
||||
waitTextMessage(onFinish)
|
||||
}
|
||||
case otherMsg =>
|
||||
stashBuffer.stash(otherMsg)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
import io.circe.generic.auto._
|
||||
import io.circe.syntax._
|
||||
|
||||
case object SendMessageSuccess extends Command
|
||||
case class SendMessageFailure(exception: Throwable) extends Command
|
||||
|
||||
val json = printer.print(message.asJson)
|
||||
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
|
||||
|
||||
ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json)
|
||||
|
||||
Source
|
||||
.single(request)
|
||||
.initialDelay(2.seconds * (attempt - 1))
|
||||
.mapAsync(1) { request =>
|
||||
http
|
||||
.singleRequest(request)
|
||||
.transform {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess()) {
|
||||
Success(SendMessageSuccess)
|
||||
} else {
|
||||
Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
|
||||
}
|
||||
case Failure(exception) =>
|
||||
ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={}", chatId)
|
||||
Success(SendMessageFailure(exception))
|
||||
}
|
||||
}
|
||||
.to(Sink.foreach(ctx.self ! _))
|
||||
.run()
|
||||
|
||||
def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.receiveMessage {
|
||||
case SendMessageSuccess =>
|
||||
ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId)
|
||||
stashBuffer.unstashAll(ctx, onSuccess)
|
||||
case SendMessageFailure(exception) =>
|
||||
ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={} attempt={}", chatId, attempt)
|
||||
|
||||
if (attempt > 5) {
|
||||
ctx.log.error(exception, "action=send_message result=failure message=attempts threshold exceeded")
|
||||
stashBuffer.unstashAll(ctx, onFailure)
|
||||
case ProcessMessage(msg, replyTo) =>
|
||||
if (msg.text.isDefined) {
|
||||
val parcelId = msg.text.get
|
||||
replyTo ! ProcessMessageSuccess
|
||||
onFinish(parcelId)
|
||||
} else {
|
||||
sendMessage(message, onSuccess, onFailure, attempt + 1)
|
||||
replyTo ! ProcessMessageSuccess
|
||||
waitTextMessage(onFinish)
|
||||
}
|
||||
case otherMsg =>
|
||||
stashBuffer.stash(otherMsg)
|
||||
@@ -292,6 +248,60 @@ object CheckDeliveryDialog {
|
||||
}
|
||||
}
|
||||
|
||||
def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup[Command] { ctx =>
|
||||
import io.circe.generic.auto._
|
||||
import io.circe.syntax._
|
||||
|
||||
case object SendMessageSuccess extends Command
|
||||
case class SendMessageFailure(exception: Throwable) extends Command
|
||||
|
||||
val json = printer.print(message.asJson)
|
||||
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
|
||||
|
||||
ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json)
|
||||
|
||||
Source
|
||||
.single(request)
|
||||
.initialDelay(2.seconds * (attempt - 1))
|
||||
.mapAsync(1) { request =>
|
||||
http
|
||||
.singleRequest(request)
|
||||
.transform {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess()) {
|
||||
Success(SendMessageSuccess)
|
||||
} else {
|
||||
Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
|
||||
}
|
||||
case Failure(exception) =>
|
||||
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception)
|
||||
Success(SendMessageFailure(exception))
|
||||
}
|
||||
}
|
||||
.to(Sink.foreach(ctx.self ! _))
|
||||
.run()
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case SendMessageSuccess =>
|
||||
ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId)
|
||||
stashBuffer.unstashAll(onSuccess)
|
||||
case SendMessageFailure(exception) =>
|
||||
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId attempt=$attempt", exception)
|
||||
|
||||
if (attempt > 5) {
|
||||
ctx.log.error("action=send_message result=failure message=attempts threshold exceeded", exception)
|
||||
stashBuffer.unstashAll(onFailure)
|
||||
} else {
|
||||
sendMessage(message, onSuccess, onFailure, attempt + 1)
|
||||
}
|
||||
case otherMsg =>
|
||||
stashBuffer.stash(otherMsg)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
waitCommand
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,8 +4,8 @@ import java.security.cert.X509Certificate
|
||||
import java.text.SimpleDateFormat
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
|
||||
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
|
||||
import akka.http.scaladsl.UseHttp2.Negotiated
|
||||
import akka.http.scaladsl.model._
|
||||
@@ -16,7 +16,6 @@ import akka.http.scaladsl.{Http, HttpsConnectionContext}
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
|
||||
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
|
||||
import akka.stream.ActorMaterializer
|
||||
import com.typesafe.sslconfig.akka.AkkaSSLConfig
|
||||
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
|
||||
import io.circe.generic.auto._
|
||||
@@ -56,6 +55,7 @@ object Entities {
|
||||
object CzechPostDeliveryCheck {
|
||||
private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd")
|
||||
private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy")
|
||||
private val entityType = "czechpost"
|
||||
|
||||
sealed trait Command
|
||||
sealed trait CommandResult
|
||||
@@ -123,154 +123,153 @@ object CzechPostDeliveryCheck {
|
||||
context
|
||||
}
|
||||
|
||||
def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
Behaviors.withTimers(scheduler => checkParcel(chatId, stateReporter, scheduler))
|
||||
}
|
||||
def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = checkParcel(chatId, stateReporter)
|
||||
|
||||
private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx =>
|
||||
implicit val actorSystem: ActorSystem = ctx.system.toUntyped
|
||||
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||
val http = Http()
|
||||
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose
|
||||
.withAcceptAnyCertificate(true)
|
||||
.withDisableHostnameVerification(true)))
|
||||
val originalCtx = http.createClientHttpsContext(badSslConfig)
|
||||
val sslContext = new HttpsConnectionContext(
|
||||
trustfulSslContext,
|
||||
originalCtx.sslConfig,
|
||||
originalCtx.enabledCipherSuites,
|
||||
originalCtx.enabledProtocols,
|
||||
originalCtx.clientAuth,
|
||||
originalCtx.sslParameters,
|
||||
Negotiated
|
||||
)
|
||||
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
|
||||
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
|
||||
private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.withTimers { scheduler =>
|
||||
Behaviors.setup { ctx =>
|
||||
implicit val actorSystem: ActorSystem = ctx.system.toClassic
|
||||
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
val http = Http()
|
||||
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose
|
||||
.withAcceptAnyCertificate(true)
|
||||
.withDisableHostnameVerification(true)))
|
||||
val originalCtx = http.createClientHttpsContext(badSslConfig)
|
||||
val sslContext = new HttpsConnectionContext(
|
||||
trustfulSslContext,
|
||||
originalCtx.sslConfig,
|
||||
originalCtx.enabledCipherSuites,
|
||||
originalCtx.enabledProtocols,
|
||||
originalCtx.clientAuth,
|
||||
originalCtx.sslParameters,
|
||||
Negotiated
|
||||
)
|
||||
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
|
||||
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
|
||||
|
||||
scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes)
|
||||
scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes)
|
||||
|
||||
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
|
||||
cmd match {
|
||||
case AddParcel(parcelId, comment, replyTo) =>
|
||||
val parcelIdUpper = parcelId.toUpperCase
|
||||
if (state.parcelStates.keySet.contains(parcelIdUpper)) {
|
||||
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
|
||||
cmd match {
|
||||
case AddParcel(parcelId, comment, replyTo) =>
|
||||
val parcelIdUpper = parcelId.toUpperCase
|
||||
if (state.parcelStates.keySet.contains(parcelIdUpper)) {
|
||||
Effect
|
||||
.none
|
||||
.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
|
||||
} else {
|
||||
Effect
|
||||
.persist(ParcelAdded(parcelIdUpper, comment))
|
||||
.thenRun(_ => {
|
||||
replyTo ! CommandResultSuccess
|
||||
ctx.self ! CheckParcels
|
||||
})
|
||||
}
|
||||
case RemoveParcel(parcelId, replyTo) =>
|
||||
val parcelIdUpper = parcelId.toUpperCase
|
||||
if (state.parcelStates.contains(parcelIdUpper)) {
|
||||
Effect
|
||||
.persist(ParcelRemoved(parcelIdUpper))
|
||||
.thenRun(_ => replyTo ! CommandResultSuccess)
|
||||
} else {
|
||||
Effect
|
||||
.none
|
||||
.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
|
||||
}
|
||||
|
||||
case ListParcels(replyTo) =>
|
||||
Effect.none
|
||||
.thenRun { state =>
|
||||
val parcelsList = state.latestStatesPrint
|
||||
replyTo ! ListParcelsResult(parcelsList)
|
||||
}
|
||||
|
||||
case ListParcelIds(replyTo) =>
|
||||
Effect.none
|
||||
.thenRun { state =>
|
||||
replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
|
||||
}
|
||||
|
||||
case CheckParcels =>
|
||||
Effect
|
||||
.none
|
||||
.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
|
||||
} else {
|
||||
.thenRun { _ =>
|
||||
ctx.log.info("action=check_parcel_state chat_id={}", chatId)
|
||||
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
|
||||
|
||||
for (ids <- parcelIds) {
|
||||
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
|
||||
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
|
||||
|
||||
ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
|
||||
|
||||
http
|
||||
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
|
||||
.transform {
|
||||
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
|
||||
case response: Failure[HttpResponse] => response
|
||||
}
|
||||
.flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
|
||||
.andThen {
|
||||
case Success(parcelHistories) =>
|
||||
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
|
||||
case Failure(exception) =>
|
||||
ctx.log.error("Error checking parcel history.", exception)
|
||||
}
|
||||
.andThen {
|
||||
case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
|
||||
case Failure(exception) => ctx.log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
case ParcelHistoryRetrieved(parcelHistory) =>
|
||||
val parcelId = parcelHistory.id
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty)
|
||||
Some(parcelHistory.attributes)
|
||||
else
|
||||
parcelState.attributes
|
||||
.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
|
||||
.map(attributes => ParcelAttributesChanged(parcelId, attributes))
|
||||
.toSeq
|
||||
|
||||
val newStates = parcelHistory.states.state.toSet -- parcelState.states
|
||||
val stateEvents: Seq[Event] = newStates
|
||||
.map(state => ParcelHistoryStateAdded(parcelId, state))
|
||||
.toSeq
|
||||
val comment = state.parcelStates(parcelId).comment
|
||||
|
||||
Effect
|
||||
.persist(ParcelAdded(parcelIdUpper, comment))
|
||||
.persist(attributesChangedEvents ++ stateEvents)
|
||||
.thenRun(_ => {
|
||||
replyTo ! CommandResultSuccess
|
||||
ctx.self ! CheckParcels
|
||||
if (newStates.nonEmpty) {
|
||||
stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId))
|
||||
}
|
||||
})
|
||||
}
|
||||
case RemoveParcel(parcelId, replyTo) =>
|
||||
val parcelIdUpper = parcelId.toUpperCase
|
||||
if (state.parcelStates.contains(parcelIdUpper)) {
|
||||
Effect
|
||||
.persist(ParcelRemoved(parcelIdUpper))
|
||||
.thenRun(_ => replyTo ! CommandResultSuccess)
|
||||
} else {
|
||||
Effect
|
||||
.none
|
||||
.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
|
||||
}
|
||||
|
||||
case ListParcels(replyTo) =>
|
||||
Effect.none
|
||||
.thenRun { state =>
|
||||
val parcelsList = state.latestStatesPrint
|
||||
replyTo ! ListParcelsResult(parcelsList)
|
||||
}
|
||||
|
||||
case ListParcelIds(replyTo) =>
|
||||
Effect.none
|
||||
.thenRun { state =>
|
||||
replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
|
||||
}
|
||||
|
||||
case CheckParcels =>
|
||||
Effect
|
||||
.none
|
||||
.thenRun { _ =>
|
||||
ctx.log.info("action=check_parcel_state chat_id={}", chatId)
|
||||
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
|
||||
|
||||
for (ids <- parcelIds) {
|
||||
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
|
||||
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
|
||||
|
||||
ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
|
||||
|
||||
http
|
||||
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
|
||||
.transform {
|
||||
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
|
||||
case response: Failure[HttpResponse] => response
|
||||
}
|
||||
.flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
|
||||
.andThen {
|
||||
case Success(parcelHistories) =>
|
||||
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
|
||||
case Failure(exception) =>
|
||||
ctx.log.error(exception, "Error checking parcel history.")
|
||||
}
|
||||
.andThen {
|
||||
case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
|
||||
case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri)
|
||||
}
|
||||
}
|
||||
}
|
||||
case ParcelHistoryRetrieved(parcelHistory) =>
|
||||
val parcelId = parcelHistory.id
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty)
|
||||
Some(parcelHistory.attributes)
|
||||
else
|
||||
parcelState.attributes
|
||||
.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
|
||||
.map(attributes => ParcelAttributesChanged(parcelId, attributes))
|
||||
.toSeq
|
||||
|
||||
val newStates = parcelHistory.states.state.toSet -- parcelState.states
|
||||
val stateEvents: Seq[Event] = newStates
|
||||
.map(state => ParcelHistoryStateAdded(parcelId, state))
|
||||
.toSeq
|
||||
val comment = state.parcelStates(parcelId).comment
|
||||
|
||||
Effect
|
||||
.persist(attributesChangedEvents ++ stateEvents)
|
||||
.thenRun(_ => {
|
||||
if (newStates.nonEmpty) {
|
||||
stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val eventHandler: EventHandler[State, Event] = (state, evt) => {
|
||||
evt match {
|
||||
case ParcelAdded(parcelId, comment) =>
|
||||
state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment)))
|
||||
case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
|
||||
case ParcelHistoryStateAdded(parcelId, newState) =>
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val newParcelState = parcelState.copy(states = parcelState.states + newState)
|
||||
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
|
||||
case ParcelAttributesChanged(parcelId, newAttributes) =>
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val newParcelState = parcelState.copy(attributes = Some(newAttributes))
|
||||
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
|
||||
val eventHandler: EventHandler[State, Event] = (state, evt) => {
|
||||
evt match {
|
||||
case ParcelAdded(parcelId, comment) =>
|
||||
state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment)))
|
||||
case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
|
||||
case ParcelHistoryStateAdded(parcelId, newState) =>
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val newParcelState = parcelState.copy(states = parcelState.states + newState)
|
||||
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
|
||||
case ParcelAttributesChanged(parcelId, newAttributes) =>
|
||||
val parcelState = state.parcelStates(parcelId)
|
||||
val newParcelState = parcelState.copy(attributes = Some(newAttributes))
|
||||
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EventSourcedBehavior[Command, Event, State](
|
||||
persistenceId = PersistenceId(s"$chatId-czechpost"),
|
||||
emptyState = State(),
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler
|
||||
)
|
||||
EventSourcedBehavior[Command, Event, State](
|
||||
persistenceId = PersistenceId(entityType, chatId),
|
||||
emptyState = State(),
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
package eu.xeppaka.bot
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
|
||||
import akka.persistence.typed.scaladsl.{Effect, EffectBuilder, EventSourcedBehavior}
|
||||
import akka.actor.typed.{ActorRef, Behavior}
|
||||
import akka.util.Timeout
|
||||
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
|
||||
import eu.xeppaka.telegram.bot.TelegramEntities._
|
||||
@@ -24,77 +21,34 @@ object DialogManager {
|
||||
private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command
|
||||
private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
|
||||
|
||||
sealed trait Event
|
||||
private case class DialogAdded(chatId: Long) extends Event
|
||||
|
||||
case class State(dialogs: Map[Long, ActorRef[CheckDeliveryDialog.Command]] = Map.empty)
|
||||
|
||||
def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
|
||||
cmd match {
|
||||
case ProcessUpdate(update, replyTo) =>
|
||||
if (update.message.isDefined) {
|
||||
val chatId = update.message.get.chat.id
|
||||
Behaviors.receiveMessagePartial {
|
||||
case ProcessUpdate(update, replyTo) =>
|
||||
if (update.message.isDefined) {
|
||||
val chatId = update.message.get.chat.id
|
||||
ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)
|
||||
val msg = update.message.get
|
||||
val dialogActor = ctx.child(chatId.toString).getOrElse(ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), chatId.toString)).unsafeUpcast[CheckDeliveryDialog.Command]
|
||||
ctx.log.info("action=ask_dialog id={}", chatId)
|
||||
|
||||
val effect: EffectBuilder[Event, State] = if (state.dialogs.contains(chatId)) {
|
||||
Effect.none
|
||||
} else {
|
||||
Effect.persist(DialogAdded(chatId))
|
||||
}
|
||||
|
||||
effect
|
||||
.thenRun(_ => ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get))
|
||||
.thenRun { state =>
|
||||
val msg = update.message.get
|
||||
val dialogActor = state.dialogs(chatId)
|
||||
|
||||
ctx.log.info("action=ask_dialog id={}", chatId)
|
||||
|
||||
implicit val timeout: Timeout = 20.seconds
|
||||
ctx.ask(dialogActor)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
|
||||
case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
|
||||
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
|
||||
case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Effect
|
||||
.none
|
||||
.thenRun { _ =>
|
||||
ctx.log.debug("action=process_update result=success message=update message is empty")
|
||||
}
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
ctx.ask[CheckDeliveryDialog.Command, CheckDeliveryDialog.CommandResult](dialogActor, replyTo => CheckDeliveryDialog.ProcessMessage(msg, replyTo)) {
|
||||
case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
|
||||
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
|
||||
case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
|
||||
}
|
||||
|
||||
case DialogResponseSuccess(dialogId, replyTo) =>
|
||||
Effect
|
||||
.none
|
||||
.thenRun { _ =>
|
||||
ctx.log.info("action=ask_dialog id={} result=success", dialogId)
|
||||
replyTo ! ProcessUpdateSuccess
|
||||
}
|
||||
case DialogResponseFailure(dialogId, exception, replyTo) =>
|
||||
Effect
|
||||
.none
|
||||
.thenRun { _ =>
|
||||
ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId)
|
||||
replyTo ! ProcessUpdateFailure(exception)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ctx.log.debug("action=process_update result=success message=update message is empty")
|
||||
}
|
||||
Behaviors.same
|
||||
case DialogResponseSuccess(dialogId, replyTo) =>
|
||||
ctx.log.info("action=ask_dialog id={} result=success", dialogId)
|
||||
replyTo ! ProcessUpdateSuccess
|
||||
Behaviors.same
|
||||
case DialogResponseFailure(dialogId, exception, replyTo) =>
|
||||
ctx.log.error(s"action=ask_dialog id=$dialogId result=failure", exception)
|
||||
replyTo ! ProcessUpdateFailure(exception)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
val eventHandler: EventHandler[State, Event] = (state, evt) => {
|
||||
evt match {
|
||||
case DialogAdded(chatId) =>
|
||||
val dialogActor = ctx.spawn(Behaviors.supervise(CheckDeliveryDialog.behavior(chatId, botUri)).onFailure(SupervisorStrategy.restart), s"delivery-check-$chatId")
|
||||
state.copy(dialogs = state.dialogs.updated(chatId, dialogActor))
|
||||
}
|
||||
}
|
||||
|
||||
EventSourcedBehavior(
|
||||
persistenceId = PersistenceId("dialog-manager"),
|
||||
emptyState = State(),
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,16 +5,15 @@ import java.security.{KeyStore, SecureRandom}
|
||||
import java.util.UUID
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
|
||||
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy}
|
||||
import akka.actor.{ActorSystem, Scheduler}
|
||||
import akka.actor.typed._
|
||||
import akka.http.scaladsl.marshalling.Marshal
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post}
|
||||
import akka.http.scaladsl.server.Route
|
||||
import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.util.{ByteString, Timeout}
|
||||
import eu.xeppaka.telegram.bot.TelegramEntities._
|
||||
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
|
||||
@@ -34,173 +33,173 @@ object TelegramBot {
|
||||
case object GetBotInfo
|
||||
case object GetWebhookInfo
|
||||
|
||||
def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
ctx.log.info("action=start_bot")
|
||||
|
||||
implicit val untypedSystem: ActorSystem = ctx.system.toClassic
|
||||
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
|
||||
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
|
||||
val botUri = BotUri(botId)
|
||||
val http: HttpExt = Http()
|
||||
val hookId = UUID.randomUUID().toString
|
||||
val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
|
||||
val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None
|
||||
val stashBuffer = StashBuffer[Command](10)
|
||||
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
|
||||
val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler)
|
||||
|
||||
def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case class BindingSuccess(binding: Http.ServerBinding) extends Command
|
||||
case class BindingFailure(exception: Throwable) extends Command
|
||||
|
||||
ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
|
||||
|
||||
http
|
||||
.bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext))
|
||||
.onComplete {
|
||||
case Success(binding) => ctx.self ! BindingSuccess(binding)
|
||||
case Failure(exception) => ctx.self ! BindingFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case BindingSuccess(binding) =>
|
||||
ctx.log.info("action=bind_server result=success")
|
||||
settingWebhook(binding)
|
||||
case BindingFailure(exception) =>
|
||||
ctx.log.error("action=bind_server result=failure", exception)
|
||||
ctx.log.error("action=start_bot result=failure")
|
||||
Behaviors.stopped
|
||||
case otherCommand: Command =>
|
||||
stashBuffer.stash(otherCommand)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object UnbindingSuccess extends Command
|
||||
case class UnbindingFailure(exception: Throwable) extends Command
|
||||
|
||||
ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
|
||||
|
||||
binding
|
||||
.unbind()
|
||||
.onComplete {
|
||||
case Success(Done) => ctx.self ! UnbindingSuccess
|
||||
case Failure(exception) => ctx.self ! UnbindingFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case UnbindingSuccess =>
|
||||
ctx.log.info("action=unbind_server result=success")
|
||||
replyTo.foreach(_ ! Done)
|
||||
Behaviors.stopped
|
||||
case UnbindingFailure(exception) =>
|
||||
ctx.log.error("action=unbind_server result=failure", exception)
|
||||
replyTo.foreach(_ ! Done)
|
||||
Behaviors.stopped
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
}
|
||||
|
||||
def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object SetWebhookSuccess extends Command
|
||||
case class SetWebhookFailure(exception: Throwable) extends Command
|
||||
|
||||
ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri)
|
||||
def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
|
||||
Behaviors.setup[Command] { ctx =>
|
||||
ctx.log.info("action=start_bot")
|
||||
|
||||
implicit val untypedSystem: ActorSystem = ctx.system.toClassic
|
||||
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
|
||||
val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
|
||||
val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity))
|
||||
val botUri = BotUri(botId)
|
||||
val http: HttpExt = Http()
|
||||
val hookId = UUID.randomUUID().toString
|
||||
val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
|
||||
val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None
|
||||
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
|
||||
val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler)
|
||||
|
||||
val certificatePart = if (useHttpsServer) {
|
||||
val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString)
|
||||
val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate)
|
||||
def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case class BindingSuccess(binding: Http.ServerBinding) extends Command
|
||||
case class BindingFailure(exception: Throwable) extends Command
|
||||
|
||||
Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem")))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
|
||||
|
||||
val formParts = immutable.Seq(urlPart, certificatePart).flatten
|
||||
val formData = Multipart.FormData.Strict(formParts)
|
||||
|
||||
Marshal(formData)
|
||||
.to[RequestEntity]
|
||||
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity)))
|
||||
.onComplete {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess())
|
||||
ctx.self ! SetWebhookSuccess
|
||||
else
|
||||
ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}."))
|
||||
case Failure(exception) =>
|
||||
ctx.self ! SetWebhookFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case SetWebhookSuccess =>
|
||||
ctx.log.info("action=set_webhook result=success")
|
||||
stashBuffer.unstashAll(ctx, started(binding))
|
||||
case SetWebhookFailure(exception) =>
|
||||
if (attempt > 20) {
|
||||
ctx.log.error(exception, "action=set_webhook result=failure attempt={}", attempt)
|
||||
ctx.log.error("action=start_bot result=failure")
|
||||
unbindingServer(binding, None)
|
||||
} else {
|
||||
settingWebhook(binding, attempt = attempt + 1)
|
||||
http
|
||||
.bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext))
|
||||
.onComplete {
|
||||
case Success(binding) => ctx.self ! BindingSuccess(binding)
|
||||
case Failure(exception) => ctx.self ! BindingFailure(exception)
|
||||
}
|
||||
case otherCommand: Command =>
|
||||
stashBuffer.stash(otherCommand)
|
||||
Behaviors.same
|
||||
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case BindingSuccess(binding) =>
|
||||
ctx.log.info("action=bind_server result=success")
|
||||
settingWebhook(binding)
|
||||
case BindingFailure(exception) =>
|
||||
ctx.log.error("action=bind_server result=failure", exception)
|
||||
ctx.log.error("action=start_bot result=failure")
|
||||
Behaviors.stopped
|
||||
case otherCommand: Command =>
|
||||
stashBuffer.stash(otherCommand)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object DeleteWebhookSuccess extends Command
|
||||
case class DeleteWebhookFailure(exception: Throwable) extends Command
|
||||
def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object UnbindingSuccess extends Command
|
||||
case class UnbindingFailure(exception: Throwable) extends Command
|
||||
|
||||
ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri)
|
||||
ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
|
||||
|
||||
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
binding
|
||||
.unbind()
|
||||
.onComplete {
|
||||
case Success(Done) => ctx.self ! UnbindingSuccess
|
||||
case Failure(exception) => ctx.self ! UnbindingFailure(exception)
|
||||
}
|
||||
|
||||
http
|
||||
.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST))
|
||||
.onComplete {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess())
|
||||
ctx.self ! DeleteWebhookSuccess
|
||||
else
|
||||
ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}"))
|
||||
case Failure(exception) =>
|
||||
ctx.self ! DeleteWebhookFailure(exception)
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case UnbindingSuccess =>
|
||||
ctx.log.info("action=unbind_server result=success")
|
||||
replyTo.foreach(_ ! Done)
|
||||
Behaviors.stopped
|
||||
case UnbindingFailure(exception) =>
|
||||
ctx.log.error("action=unbind_server result=failure", exception)
|
||||
replyTo.foreach(_ ! Done)
|
||||
Behaviors.stopped
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
}
|
||||
|
||||
def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object SetWebhookSuccess extends Command
|
||||
case class SetWebhookFailure(exception: Throwable) extends Command
|
||||
|
||||
ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri)
|
||||
|
||||
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
|
||||
val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
|
||||
val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity))
|
||||
|
||||
val certificatePart = if (useHttpsServer) {
|
||||
val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString)
|
||||
val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate)
|
||||
|
||||
Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem")))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case DeleteWebhookSuccess =>
|
||||
ctx.log.info("action=delete_webhook result=success")
|
||||
unbindingServer(binding, Some(replyTo))
|
||||
case DeleteWebhookFailure(exception) =>
|
||||
ctx.log.error("action=delete_webhook result=failure", exception)
|
||||
unbindingServer(binding, Some(replyTo))
|
||||
case _ => Behaviors.unhandled
|
||||
val formParts = immutable.Seq(urlPart, certificatePart).flatten
|
||||
val formData = Multipart.FormData.Strict(formParts)
|
||||
|
||||
Marshal(formData)
|
||||
.to[RequestEntity]
|
||||
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity)))
|
||||
.onComplete {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess())
|
||||
ctx.self ! SetWebhookSuccess
|
||||
else
|
||||
ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}."))
|
||||
case Failure(exception) =>
|
||||
ctx.self ! SetWebhookFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case SetWebhookSuccess =>
|
||||
ctx.log.info("action=set_webhook result=success")
|
||||
stashBuffer.unstashAll(started(binding))
|
||||
case SetWebhookFailure(exception) =>
|
||||
if (attempt > 20) {
|
||||
ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception)
|
||||
ctx.log.error("action=start_bot result=failure")
|
||||
unbindingServer(binding, None)
|
||||
} else {
|
||||
settingWebhook(binding, attempt = attempt + 1)
|
||||
}
|
||||
case otherCommand: Command =>
|
||||
stashBuffer.stash(otherCommand)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
ctx.log.info("action=start_bot result=success")
|
||||
def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
case object DeleteWebhookSuccess extends Command
|
||||
case class DeleteWebhookFailure(exception: Throwable) extends Command
|
||||
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case stopCommand@Stop(replyTo) =>
|
||||
ctx.log.info("action=stop_bot")
|
||||
deletingWebhook(binding, replyTo)
|
||||
case _ =>
|
||||
Behaviors.unhandled
|
||||
ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri)
|
||||
|
||||
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
|
||||
|
||||
http
|
||||
.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST))
|
||||
.onComplete {
|
||||
case Success(response) =>
|
||||
if (response.status.isSuccess())
|
||||
ctx.self ! DeleteWebhookSuccess
|
||||
else
|
||||
ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}"))
|
||||
case Failure(exception) =>
|
||||
ctx.self ! DeleteWebhookFailure(exception)
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case DeleteWebhookSuccess =>
|
||||
ctx.log.info("action=delete_webhook result=success")
|
||||
unbindingServer(binding, Some(replyTo))
|
||||
case DeleteWebhookFailure(exception) =>
|
||||
ctx.log.error("action=delete_webhook result=failure", exception)
|
||||
unbindingServer(binding, Some(replyTo))
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bindingServer
|
||||
def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
|
||||
ctx.log.info("action=start_bot result=success")
|
||||
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case Stop(replyTo) =>
|
||||
ctx.log.info("action=stop_bot")
|
||||
deletingWebhook(binding, replyTo)
|
||||
case _ =>
|
||||
Behaviors.unhandled
|
||||
}
|
||||
}
|
||||
|
||||
bindingServer
|
||||
}
|
||||
}
|
||||
|
||||
private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
|
||||
@@ -214,7 +213,7 @@ object TelegramBot {
|
||||
post {
|
||||
extractLog { log =>
|
||||
entity(as[Update]) { update =>
|
||||
onComplete(updatesProcessor.?[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
|
||||
onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
|
||||
case Success(processResult) => processResult match {
|
||||
case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
|
||||
case DialogManager.ProcessUpdateFailure(exception) =>
|
||||
|
||||
Reference in New Issue
Block a user