diff --git a/build.sbt b/build.sbt index 737de62..d83950a 100644 --- a/build.sbt +++ b/build.sbt @@ -13,12 +13,13 @@ lazy val `telegram-bot-delivery` = (project in file(".")) name := "telegram-bot-delivery", libraryDependencies ++= Seq( scalaTest % Test, - akka, akkaTyped, + akkaClusterShardingTyped, akkaHttp, akkaStream, akkaPersistence, akkaPersistenceCassandra, + akkaPersistenceQuery, levelDbJni, circleCore, circleGeneric, @@ -26,13 +27,13 @@ lazy val `telegram-bot-delivery` = (project in file(".")) circeAkkaHttp, slibTelegram ), - dockerBaseImage := "openjdk:13-jdk-oracle", + dockerBaseImage := "openjdk:11", dockerExposedPorts := Seq(8443), dockerRepository := Some("registry.xeppaka.eu:443"), Docker / daemonUserUid := Some("1001"), Docker / daemonUser := "telegram-bot", Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery", - version := "1.0.1" + version := "1.1.1" ) .enablePlugins(JavaServerAppPackaging) .enablePlugins(DockerPlugin) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b0aff66..32948b3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,22 +4,23 @@ import Dependencies.Versions._ object Dependencies { object Versions { - val akkaVersion = "2.5.26" - val akkaHttpVersion = "10.1.10" - val akkaPersistenceCassandraVersion = "0.100" + val akkaVersion = "2.6.3" + val akkaHttpVersion = "10.1.11" + val akkaPersistenceCassandraVersion = "0.103" val levelDbJniVersion = "1.8" - val circeVersion = "0.12.3" - val akkaHttpCirceVersion = "1.29.1" - val scalaTestVersion = "3.2.0-M1" + val circeVersion = "0.13.0" + val akkaHttpCirceVersion = "1.31.0" + val scalaTestVersion = "3.2.0-M4" val slibTelegramVersion = "0.1.0" } - val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion - val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.100" + val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion + val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion + val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion val circleCore = "io.circe" %% "circe-core" % circeVersion val circleGeneric = "io.circe" %% "circe-generic" % circeVersion diff --git a/project/build.properties b/project/build.properties index 6adcdc7..a919a9b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.3 +sbt.version=1.3.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index f9199c9..3137ef2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1") \ No newline at end of file diff --git a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala index f589be7..0e934b7 100644 --- a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala +++ b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -1,11 +1,11 @@ package eu.xeppaka.bot +import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.{Behaviors, StashBuffer} -import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy} +import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} import akka.util.{ByteString, Timeout} import eu.xeppaka.telegram.bot.TelegramEntities._ @@ -59,10 +59,9 @@ object CheckDeliveryDialog { private val removeKeyboard = Some(ReplyKeyboardRemove()) def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => - implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toClassic) - implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - val http = Http()(ctx.system.toClassic) - val stashBuffer = StashBuffer[Command](100) + implicit val system: ActorSystem = ctx.system.toClassic + implicit val executionContext: ExecutionContext = ctx.executionContext + val http = Http() val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart)) @@ -96,69 +95,73 @@ object CheckDeliveryDialog { Behaviors.unhandled } - def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.setup { ctx => - case object AddParcelSuccess extends Command - case class AddParcelFailure(exception: Throwable) extends Command - implicit val timeout: Timeout = 5.seconds + def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => + Behaviors.setup { ctx => + case object AddParcelSuccess extends Command + case class AddParcelFailure(exception: Throwable) extends Command + implicit val timeout: Timeout = 5.seconds - ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) { - case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess - case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) - case Failure(exception) => AddParcelFailure(exception) - } + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) { + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) + case Failure(exception) => AddParcelFailure(exception) + } - Behaviors.receiveMessage { - case AddParcelSuccess => - val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case AddParcelFailure(exception) => - exception match { - case CzechPostDeliveryCheck.DuplicateParcelId(_) => - val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case _ => - ctx.log.error(exception, "action=add_parcel result=failure") - val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - } - case otherMessage => - stashBuffer.stash(otherMessage) - Behaviors.same + Behaviors.receiveMessage { + case AddParcelSuccess => + val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case AddParcelFailure(exception) => + exception match { + case CzechPostDeliveryCheck.DuplicateParcelId(_) => + val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case _ => + ctx.log.error("action=add_parcel result=failure", exception) + val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + } + case otherMessage => + stashBuffer.stash(otherMessage) + Behaviors.same + } } } - def listParcels: Behavior[Command] = Behaviors.setup { ctx => - case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command - case class ListParcelsFailure(exception: Throwable) extends Command - implicit val timeout: Timeout = 5.seconds + def listParcels: Behavior[Command] = Behaviors.withStash(100) { stashBuffer => + Behaviors.setup { ctx => + case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command + case class ListParcelsFailure(exception: Throwable) extends Command + implicit val timeout: Timeout = 5.seconds - ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcels(ref)) { - case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) - case Failure(exception) => ListParcelsFailure(exception) - } + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) { + case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) + case Failure(exception) => ListParcelsFailure(exception) + } - Behaviors.receiveMessage { - case ListParcelsSuccess(parcelsList) => - val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)") - val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case ListParcelsFailure(exception) => - ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId) - val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case otherMessage => - stashBuffer.stash(otherMessage) - Behaviors.same + Behaviors.receiveMessage { + case ListParcelsSuccess(parcelsList) => + val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)") + val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case ListParcelsFailure(exception) => + ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception) + val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case otherMessage => + stashBuffer.stash(otherMessage) + Behaviors.same + } } } - def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = + def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => Behaviors.setup { ctx => case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command case class ListParcelIdsFailure(exception: Throwable) extends Command implicit val timeout: Timeout = 5.seconds - ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList) case Failure(exception) => ListParcelIdsFailure(exception) } @@ -175,7 +178,7 @@ object CheckDeliveryDialog { sendMessage(message, onSuccess, onFailure) } case ListParcelIdsFailure(exception) => - ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId) + ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception) val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) sendMessage(message, waitCommand, waitCommand) case otherMessage => @@ -183,35 +186,38 @@ object CheckDeliveryDialog { Behaviors.same } } + } - def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.setup { ctx => - case object RemoveParcelSuccess extends Command - case class RemoveParcelFailure(exception: Throwable) extends Command - implicit val timeout: Timeout = 5.seconds + def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => + Behaviors.setup { ctx => + case object RemoveParcelSuccess extends Command + case class RemoveParcelFailure(exception: Throwable) extends Command + implicit val timeout: Timeout = 5.seconds - ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { - case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess - case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) - case Failure(exception) => RemoveParcelFailure(exception) - } + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) + case Failure(exception) => RemoveParcelFailure(exception) + } - Behaviors.receiveMessage { - case RemoveParcelSuccess => - val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case RemoveParcelFailure(exception) => - exception match { - case CzechPostDeliveryCheck.ParcelIdNotFound(_) => - val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - case _ => - ctx.log.error(exception, "action=add_parcel result=failure") - val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard) - sendMessage(message, waitCommand, waitCommand) - } - case otherMessage => - stashBuffer.stash(otherMessage) - Behaviors.same + Behaviors.receiveMessage { + case RemoveParcelSuccess => + val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case RemoveParcelFailure(exception) => + exception match { + case CzechPostDeliveryCheck.ParcelIdNotFound(_) => + val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + case _ => + ctx.log.error("action=add_parcel result=failure", exception) + val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard) + sendMessage(message, waitCommand, waitCommand) + } + case otherMessage => + stashBuffer.stash(otherMessage) + Behaviors.same + } } } @@ -225,66 +231,16 @@ object CheckDeliveryDialog { // sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), waitCommand) // } - def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage { - case ProcessMessage(msg, replyTo) => - if (msg.text.isDefined) { - val parcelId = msg.text.get - replyTo ! ProcessMessageSuccess - onFinish(parcelId) - } else { - replyTo ! ProcessMessageSuccess - waitTextMessage(onFinish) - } - case otherMsg => - stashBuffer.stash(otherMsg) - Behaviors.same - } - - def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => - import io.circe.generic.auto._ - import io.circe.syntax._ - - case object SendMessageSuccess extends Command - case class SendMessageFailure(exception: Throwable) extends Command - - val json = printer.print(message.asJson) - val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) - - ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json) - - Source - .single(request) - .initialDelay(2.seconds * (attempt - 1)) - .mapAsync(1) { request => - http - .singleRequest(request) - .transform { - case Success(response) => - if (response.status.isSuccess()) { - Success(SendMessageSuccess) - } else { - Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) - } - case Failure(exception) => - ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={}", chatId) - Success(SendMessageFailure(exception)) - } - } - .to(Sink.foreach(ctx.self ! _)) - .run() - + def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => Behaviors.receiveMessage { - case SendMessageSuccess => - ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId) - stashBuffer.unstashAll(ctx, onSuccess) - case SendMessageFailure(exception) => - ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={} attempt={}", chatId, attempt) - - if (attempt > 5) { - ctx.log.error(exception, "action=send_message result=failure message=attempts threshold exceeded") - stashBuffer.unstashAll(ctx, onFailure) + case ProcessMessage(msg, replyTo) => + if (msg.text.isDefined) { + val parcelId = msg.text.get + replyTo ! ProcessMessageSuccess + onFinish(parcelId) } else { - sendMessage(message, onSuccess, onFailure, attempt + 1) + replyTo ! ProcessMessageSuccess + waitTextMessage(onFinish) } case otherMsg => stashBuffer.stash(otherMsg) @@ -292,6 +248,60 @@ object CheckDeliveryDialog { } } + def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => + Behaviors.setup[Command] { ctx => + import io.circe.generic.auto._ + import io.circe.syntax._ + + case object SendMessageSuccess extends Command + case class SendMessageFailure(exception: Throwable) extends Command + + val json = printer.print(message.asJson) + val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) + + ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json) + + Source + .single(request) + .initialDelay(2.seconds * (attempt - 1)) + .mapAsync(1) { request => + http + .singleRequest(request) + .transform { + case Success(response) => + if (response.status.isSuccess()) { + Success(SendMessageSuccess) + } else { + Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) + } + case Failure(exception) => + ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception) + Success(SendMessageFailure(exception)) + } + } + .to(Sink.foreach(ctx.self ! _)) + .run() + + Behaviors.receiveMessage { + case SendMessageSuccess => + ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId) + stashBuffer.unstashAll(onSuccess) + case SendMessageFailure(exception) => + ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId attempt=$attempt", exception) + + if (attempt > 5) { + ctx.log.error("action=send_message result=failure message=attempts threshold exceeded", exception) + stashBuffer.unstashAll(onFailure) + } else { + sendMessage(message, onSuccess, onFailure, attempt + 1) + } + case otherMsg => + stashBuffer.stash(otherMsg) + Behaviors.same + } + } + } + waitCommand } } diff --git a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala index e6e7d12..84a7251 100644 --- a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala +++ b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala @@ -4,8 +4,8 @@ import java.security.cert.X509Certificate import java.text.SimpleDateFormat import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} import akka.http.scaladsl.UseHttp2.Negotiated import akka.http.scaladsl.model._ @@ -16,7 +16,6 @@ import akka.http.scaladsl.{Http, HttpsConnectionContext} import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} -import akka.stream.ActorMaterializer import com.typesafe.sslconfig.akka.AkkaSSLConfig import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ import io.circe.generic.auto._ @@ -56,6 +55,7 @@ object Entities { object CzechPostDeliveryCheck { private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd") private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy") + private val entityType = "czechpost" sealed trait Command sealed trait CommandResult @@ -123,154 +123,153 @@ object CzechPostDeliveryCheck { context } - def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx => - Behaviors.withTimers(scheduler => checkParcel(chatId, stateReporter, scheduler)) - } + def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = checkParcel(chatId, stateReporter) - private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx => - implicit val actorSystem: ActorSystem = ctx.system.toUntyped - implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - implicit val materializer: ActorMaterializer = ActorMaterializer() - val http = Http() - val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose - .withAcceptAnyCertificate(true) - .withDisableHostnameVerification(true))) - val originalCtx = http.createClientHttpsContext(badSslConfig) - val sslContext = new HttpsConnectionContext( - trustfulSslContext, - originalCtx.sslConfig, - originalCtx.enabledCipherSuites, - originalCtx.enabledProtocols, - originalCtx.clientAuth, - originalCtx.sslParameters, - Negotiated - ) - val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) - val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) + private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.withTimers { scheduler => + Behaviors.setup { ctx => + implicit val actorSystem: ActorSystem = ctx.system.toClassic + implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + val http = Http() + val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose + .withAcceptAnyCertificate(true) + .withDisableHostnameVerification(true))) + val originalCtx = http.createClientHttpsContext(badSslConfig) + val sslContext = new HttpsConnectionContext( + trustfulSslContext, + originalCtx.sslConfig, + originalCtx.enabledCipherSuites, + originalCtx.enabledProtocols, + originalCtx.clientAuth, + originalCtx.sslParameters, + Negotiated + ) + val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) + val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) - scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes) + scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes) - val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { - cmd match { - case AddParcel(parcelId, comment, replyTo) => - val parcelIdUpper = parcelId.toUpperCase - if (state.parcelStates.keySet.contains(parcelIdUpper)) { + val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { + cmd match { + case AddParcel(parcelId, comment, replyTo) => + val parcelIdUpper = parcelId.toUpperCase + if (state.parcelStates.keySet.contains(parcelIdUpper)) { + Effect + .none + .thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper))) + } else { + Effect + .persist(ParcelAdded(parcelIdUpper, comment)) + .thenRun(_ => { + replyTo ! CommandResultSuccess + ctx.self ! CheckParcels + }) + } + case RemoveParcel(parcelId, replyTo) => + val parcelIdUpper = parcelId.toUpperCase + if (state.parcelStates.contains(parcelIdUpper)) { + Effect + .persist(ParcelRemoved(parcelIdUpper)) + .thenRun(_ => replyTo ! CommandResultSuccess) + } else { + Effect + .none + .thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper))) + } + + case ListParcels(replyTo) => + Effect.none + .thenRun { state => + val parcelsList = state.latestStatesPrint + replyTo ! ListParcelsResult(parcelsList) + } + + case ListParcelIds(replyTo) => + Effect.none + .thenRun { state => + replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) + } + + case CheckParcels => Effect .none - .thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper))) - } else { + .thenRun { _ => + ctx.log.info("action=check_parcel_state chat_id={}", chatId) + val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) + + for (ids <- parcelIds) { + val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") + val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) + + ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) + + http + .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) + .transform { + case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) + case response: Failure[HttpResponse] => response + } + .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) + .andThen { + case Success(parcelHistories) => + parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) + case Failure(exception) => + ctx.log.error("Error checking parcel history.", exception) + } + .andThen { + case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) + case Failure(exception) => ctx.log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception) + } + } + } + case ParcelHistoryRetrieved(parcelHistory) => + val parcelId = parcelHistory.id + val parcelState = state.parcelStates(parcelId) + val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty) + Some(parcelHistory.attributes) + else + parcelState.attributes + .flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) + .map(attributes => ParcelAttributesChanged(parcelId, attributes)) + .toSeq + + val newStates = parcelHistory.states.state.toSet -- parcelState.states + val stateEvents: Seq[Event] = newStates + .map(state => ParcelHistoryStateAdded(parcelId, state)) + .toSeq + val comment = state.parcelStates(parcelId).comment + Effect - .persist(ParcelAdded(parcelIdUpper, comment)) + .persist(attributesChangedEvents ++ stateEvents) .thenRun(_ => { - replyTo ! CommandResultSuccess - ctx.self ! CheckParcels + if (newStates.nonEmpty) { + stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId)) + } }) - } - case RemoveParcel(parcelId, replyTo) => - val parcelIdUpper = parcelId.toUpperCase - if (state.parcelStates.contains(parcelIdUpper)) { - Effect - .persist(ParcelRemoved(parcelIdUpper)) - .thenRun(_ => replyTo ! CommandResultSuccess) - } else { - Effect - .none - .thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper))) - } - - case ListParcels(replyTo) => - Effect.none - .thenRun { state => - val parcelsList = state.latestStatesPrint - replyTo ! ListParcelsResult(parcelsList) - } - - case ListParcelIds(replyTo) => - Effect.none - .thenRun { state => - replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) - } - - case CheckParcels => - Effect - .none - .thenRun { _ => - ctx.log.info("action=check_parcel_state chat_id={}", chatId) - val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) - - for (ids <- parcelIds) { - val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") - val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) - - ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) - - http - .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) - .transform { - case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) - case response: Failure[HttpResponse] => response - } - .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) - .andThen { - case Success(parcelHistories) => - parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) - case Failure(exception) => - ctx.log.error(exception, "Error checking parcel history.") - } - .andThen { - case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) - case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri) - } - } - } - case ParcelHistoryRetrieved(parcelHistory) => - val parcelId = parcelHistory.id - val parcelState = state.parcelStates(parcelId) - val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty) - Some(parcelHistory.attributes) - else - parcelState.attributes - .flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) - .map(attributes => ParcelAttributesChanged(parcelId, attributes)) - .toSeq - - val newStates = parcelHistory.states.state.toSet -- parcelState.states - val stateEvents: Seq[Event] = newStates - .map(state => ParcelHistoryStateAdded(parcelId, state)) - .toSeq - val comment = state.parcelStates(parcelId).comment - - Effect - .persist(attributesChangedEvents ++ stateEvents) - .thenRun(_ => { - if (newStates.nonEmpty) { - stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId)) - } - }) + } } - } - val eventHandler: EventHandler[State, Event] = (state, evt) => { - evt match { - case ParcelAdded(parcelId, comment) => - state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment))) - case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId) - case ParcelHistoryStateAdded(parcelId, newState) => - val parcelState = state.parcelStates(parcelId) - val newParcelState = parcelState.copy(states = parcelState.states + newState) - state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) - case ParcelAttributesChanged(parcelId, newAttributes) => - val parcelState = state.parcelStates(parcelId) - val newParcelState = parcelState.copy(attributes = Some(newAttributes)) - state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) + val eventHandler: EventHandler[State, Event] = (state, evt) => { + evt match { + case ParcelAdded(parcelId, comment) => + state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment))) + case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId) + case ParcelHistoryStateAdded(parcelId, newState) => + val parcelState = state.parcelStates(parcelId) + val newParcelState = parcelState.copy(states = parcelState.states + newState) + state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) + case ParcelAttributesChanged(parcelId, newAttributes) => + val parcelState = state.parcelStates(parcelId) + val newParcelState = parcelState.copy(attributes = Some(newAttributes)) + state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) + } } - } - EventSourcedBehavior[Command, Event, State]( - persistenceId = PersistenceId(s"$chatId-czechpost"), - emptyState = State(), - commandHandler = commandHandler, - eventHandler = eventHandler - ) + EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId(entityType, chatId), + emptyState = State(), + commandHandler = commandHandler, + eventHandler = eventHandler + ) + } } } diff --git a/src/main/scala/eu/xeppaka/bot/DialogManager.scala b/src/main/scala/eu/xeppaka/bot/DialogManager.scala index 7295dc1..a436e5a 100644 --- a/src/main/scala/eu/xeppaka/bot/DialogManager.scala +++ b/src/main/scala/eu/xeppaka/bot/DialogManager.scala @@ -1,10 +1,7 @@ package eu.xeppaka.bot import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} -import akka.persistence.typed.PersistenceId -import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} -import akka.persistence.typed.scaladsl.{Effect, EffectBuilder, EventSourcedBehavior} +import akka.actor.typed.{ActorRef, Behavior} import akka.util.Timeout import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess} import eu.xeppaka.telegram.bot.TelegramEntities._ @@ -24,77 +21,34 @@ object DialogManager { private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command - sealed trait Event - private case class DialogAdded(chatId: Long) extends Event - - case class State(dialogs: Map[Long, ActorRef[CheckDeliveryDialog.Command]] = Map.empty) - def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => - val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { - cmd match { - case ProcessUpdate(update, replyTo) => - if (update.message.isDefined) { - val chatId = update.message.get.chat.id + Behaviors.receiveMessagePartial { + case ProcessUpdate(update, replyTo) => + if (update.message.isDefined) { + val chatId = update.message.get.chat.id + ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get) + val msg = update.message.get + val dialogActor = ctx.child(chatId.toString).getOrElse(ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), chatId.toString)).unsafeUpcast[CheckDeliveryDialog.Command] + ctx.log.info("action=ask_dialog id={}", chatId) - val effect: EffectBuilder[Event, State] = if (state.dialogs.contains(chatId)) { - Effect.none - } else { - Effect.persist(DialogAdded(chatId)) - } - - effect - .thenRun(_ => ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)) - .thenRun { state => - val msg = update.message.get - val dialogActor = state.dialogs(chatId) - - ctx.log.info("action=ask_dialog id={}", chatId) - - implicit val timeout: Timeout = 20.seconds - ctx.ask(dialogActor)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { - case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo) - case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo) - case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo) - } - } - } else { - Effect - .none - .thenRun { _ => - ctx.log.debug("action=process_update result=success message=update message is empty") - } + implicit val timeout: Timeout = 5.seconds + ctx.ask[CheckDeliveryDialog.Command, CheckDeliveryDialog.CommandResult](dialogActor, replyTo => CheckDeliveryDialog.ProcessMessage(msg, replyTo)) { + case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo) + case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo) + case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo) } - - case DialogResponseSuccess(dialogId, replyTo) => - Effect - .none - .thenRun { _ => - ctx.log.info("action=ask_dialog id={} result=success", dialogId) - replyTo ! ProcessUpdateSuccess - } - case DialogResponseFailure(dialogId, exception, replyTo) => - Effect - .none - .thenRun { _ => - ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId) - replyTo ! ProcessUpdateFailure(exception) - } - } + } else { + ctx.log.debug("action=process_update result=success message=update message is empty") + } + Behaviors.same + case DialogResponseSuccess(dialogId, replyTo) => + ctx.log.info("action=ask_dialog id={} result=success", dialogId) + replyTo ! ProcessUpdateSuccess + Behaviors.same + case DialogResponseFailure(dialogId, exception, replyTo) => + ctx.log.error(s"action=ask_dialog id=$dialogId result=failure", exception) + replyTo ! ProcessUpdateFailure(exception) + Behaviors.same } - - val eventHandler: EventHandler[State, Event] = (state, evt) => { - evt match { - case DialogAdded(chatId) => - val dialogActor = ctx.spawn(Behaviors.supervise(CheckDeliveryDialog.behavior(chatId, botUri)).onFailure(SupervisorStrategy.restart), s"delivery-check-$chatId") - state.copy(dialogs = state.dialogs.updated(chatId, dialogActor)) - } - } - - EventSourcedBehavior( - persistenceId = PersistenceId("dialog-manager"), - emptyState = State(), - commandHandler = commandHandler, - eventHandler = eventHandler - ) } } diff --git a/src/main/scala/eu/xeppaka/bot/TelegramBot.scala b/src/main/scala/eu/xeppaka/bot/TelegramBot.scala index 6a1010a..31df1bf 100644 --- a/src/main/scala/eu/xeppaka/bot/TelegramBot.scala +++ b/src/main/scala/eu/xeppaka/bot/TelegramBot.scala @@ -5,16 +5,15 @@ import java.security.{KeyStore, SecureRandom} import java.util.UUID import akka.Done +import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.{Behaviors, StashBuffer} -import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy} -import akka.actor.{ActorSystem, Scheduler} +import akka.actor.typed._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post} import akka.http.scaladsl.server.Route import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} -import akka.stream.ActorMaterializer import akka.util.{ByteString, Timeout} import eu.xeppaka.telegram.bot.TelegramEntities._ import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} @@ -34,173 +33,173 @@ object TelegramBot { case object GetBotInfo case object GetWebhookInfo - def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.setup[Command] { ctx => - ctx.log.info("action=start_bot") - - implicit val untypedSystem: ActorSystem = ctx.system.toClassic - implicit val actorMaterializer: ActorMaterializer = ActorMaterializer() - implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - - val botUri = BotUri(botId) - val http: HttpExt = Http() - val hookId = UUID.randomUUID().toString - val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId") - val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None - val stashBuffer = StashBuffer[Command](10) - val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) - val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler) - - def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => - case class BindingSuccess(binding: Http.ServerBinding) extends Command - case class BindingFailure(exception: Throwable) extends Command - - ctx.log.info("action=bind_server interface={} port={}", interface, localPort) - - http - .bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext)) - .onComplete { - case Success(binding) => ctx.self ! BindingSuccess(binding) - case Failure(exception) => ctx.self ! BindingFailure(exception) - } - - Behaviors.receiveMessage[Command] { - case BindingSuccess(binding) => - ctx.log.info("action=bind_server result=success") - settingWebhook(binding) - case BindingFailure(exception) => - ctx.log.error("action=bind_server result=failure", exception) - ctx.log.error("action=start_bot result=failure") - Behaviors.stopped - case otherCommand: Command => - stashBuffer.stash(otherCommand) - Behaviors.same - } - } - - def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => - case object UnbindingSuccess extends Command - case class UnbindingFailure(exception: Throwable) extends Command - - ctx.log.info("action=unbind_server interface={} port={}", interface, localPort) - - binding - .unbind() - .onComplete { - case Success(Done) => ctx.self ! UnbindingSuccess - case Failure(exception) => ctx.self ! UnbindingFailure(exception) - } - - Behaviors.receiveMessage[Command] { - case UnbindingSuccess => - ctx.log.info("action=unbind_server result=success") - replyTo.foreach(_ ! Done) - Behaviors.stopped - case UnbindingFailure(exception) => - ctx.log.error("action=unbind_server result=failure", exception) - replyTo.foreach(_ ! Done) - Behaviors.stopped - case _ => Behaviors.unhandled - } - } - - def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => - case object SetWebhookSuccess extends Command - case class SetWebhookFailure(exception: Throwable) extends Command - - ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri) + def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => + Behaviors.setup[Command] { ctx => + ctx.log.info("action=start_bot") + implicit val untypedSystem: ActorSystem = ctx.system.toClassic implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString())) - val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity)) + val botUri = BotUri(botId) + val http: HttpExt = Http() + val hookId = UUID.randomUUID().toString + val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId") + val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None + val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) + val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler) - val certificatePart = if (useHttpsServer) { - val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString) - val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate) + def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => + case class BindingSuccess(binding: Http.ServerBinding) extends Command + case class BindingFailure(exception: Throwable) extends Command - Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem"))) - } else { - None - } + ctx.log.info("action=bind_server interface={} port={}", interface, localPort) - val formParts = immutable.Seq(urlPart, certificatePart).flatten - val formData = Multipart.FormData.Strict(formParts) - - Marshal(formData) - .to[RequestEntity] - .flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) - .onComplete { - case Success(response) => - if (response.status.isSuccess()) - ctx.self ! SetWebhookSuccess - else - ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}.")) - case Failure(exception) => - ctx.self ! SetWebhookFailure(exception) - } - - Behaviors.receiveMessage { - case SetWebhookSuccess => - ctx.log.info("action=set_webhook result=success") - stashBuffer.unstashAll(ctx, started(binding)) - case SetWebhookFailure(exception) => - if (attempt > 20) { - ctx.log.error(exception, "action=set_webhook result=failure attempt={}", attempt) - ctx.log.error("action=start_bot result=failure") - unbindingServer(binding, None) - } else { - settingWebhook(binding, attempt = attempt + 1) + http + .bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext)) + .onComplete { + case Success(binding) => ctx.self ! BindingSuccess(binding) + case Failure(exception) => ctx.self ! BindingFailure(exception) } - case otherCommand: Command => - stashBuffer.stash(otherCommand) - Behaviors.same + + Behaviors.receiveMessage[Command] { + case BindingSuccess(binding) => + ctx.log.info("action=bind_server result=success") + settingWebhook(binding) + case BindingFailure(exception) => + ctx.log.error("action=bind_server result=failure", exception) + ctx.log.error("action=start_bot result=failure") + Behaviors.stopped + case otherCommand: Command => + stashBuffer.stash(otherCommand) + Behaviors.same + } } - } - def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx => - case object DeleteWebhookSuccess extends Command - case class DeleteWebhookFailure(exception: Throwable) extends Command + def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => + case object UnbindingSuccess extends Command + case class UnbindingFailure(exception: Throwable) extends Command - ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri) + ctx.log.info("action=unbind_server interface={} port={}", interface, localPort) - implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + binding + .unbind() + .onComplete { + case Success(Done) => ctx.self ! UnbindingSuccess + case Failure(exception) => ctx.self ! UnbindingFailure(exception) + } - http - .singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)) - .onComplete { - case Success(response) => - if (response.status.isSuccess()) - ctx.self ! DeleteWebhookSuccess - else - ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}")) - case Failure(exception) => - ctx.self ! DeleteWebhookFailure(exception) + Behaviors.receiveMessage[Command] { + case UnbindingSuccess => + ctx.log.info("action=unbind_server result=success") + replyTo.foreach(_ ! Done) + Behaviors.stopped + case UnbindingFailure(exception) => + ctx.log.error("action=unbind_server result=failure", exception) + replyTo.foreach(_ ! Done) + Behaviors.stopped + case _ => Behaviors.unhandled + } + } + + def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => + case object SetWebhookSuccess extends Command + case class SetWebhookFailure(exception: Throwable) extends Command + + ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri) + + implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + + val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString())) + val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity)) + + val certificatePart = if (useHttpsServer) { + val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString) + val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate) + + Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem"))) + } else { + None } - Behaviors.receiveMessage { - case DeleteWebhookSuccess => - ctx.log.info("action=delete_webhook result=success") - unbindingServer(binding, Some(replyTo)) - case DeleteWebhookFailure(exception) => - ctx.log.error("action=delete_webhook result=failure", exception) - unbindingServer(binding, Some(replyTo)) - case _ => Behaviors.unhandled + val formParts = immutable.Seq(urlPart, certificatePart).flatten + val formData = Multipart.FormData.Strict(formParts) + + Marshal(formData) + .to[RequestEntity] + .flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) + .onComplete { + case Success(response) => + if (response.status.isSuccess()) + ctx.self ! SetWebhookSuccess + else + ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}.")) + case Failure(exception) => + ctx.self ! SetWebhookFailure(exception) + } + + Behaviors.receiveMessage { + case SetWebhookSuccess => + ctx.log.info("action=set_webhook result=success") + stashBuffer.unstashAll(started(binding)) + case SetWebhookFailure(exception) => + if (attempt > 20) { + ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception) + ctx.log.error("action=start_bot result=failure") + unbindingServer(binding, None) + } else { + settingWebhook(binding, attempt = attempt + 1) + } + case otherCommand: Command => + stashBuffer.stash(otherCommand) + Behaviors.same + } } - } - def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx => - ctx.log.info("action=start_bot result=success") + def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx => + case object DeleteWebhookSuccess extends Command + case class DeleteWebhookFailure(exception: Throwable) extends Command - Behaviors.receiveMessage[Command] { - case stopCommand@Stop(replyTo) => - ctx.log.info("action=stop_bot") - deletingWebhook(binding, replyTo) - case _ => - Behaviors.unhandled + ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri) + + implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + + http + .singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)) + .onComplete { + case Success(response) => + if (response.status.isSuccess()) + ctx.self ! DeleteWebhookSuccess + else + ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}")) + case Failure(exception) => + ctx.self ! DeleteWebhookFailure(exception) + } + + Behaviors.receiveMessage { + case DeleteWebhookSuccess => + ctx.log.info("action=delete_webhook result=success") + unbindingServer(binding, Some(replyTo)) + case DeleteWebhookFailure(exception) => + ctx.log.error("action=delete_webhook result=failure", exception) + unbindingServer(binding, Some(replyTo)) + case _ => Behaviors.unhandled + } } - } - bindingServer + def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx => + ctx.log.info("action=start_bot result=success") + + Behaviors.receiveMessage[Command] { + case Stop(replyTo) => + ctx.log.info("action=stop_bot") + deletingWebhook(binding, replyTo) + case _ => + Behaviors.unhandled + } + } + + bindingServer + } } private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = { @@ -214,7 +213,7 @@ object TelegramBot { post { extractLog { log => entity(as[Update]) { update => - onComplete(updatesProcessor.?[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) { + onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) { case Success(processResult) => processResult match { case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) case DialogManager.ProcessUpdateFailure(exception) =>