diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala new file mode 100644 index 0000000..cffd99e --- /dev/null +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -0,0 +1,53 @@ +package eu.xeppaka.bot + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.{Http, HttpExt} +import akka.util.ByteString +import eu.xeppaka.bot.TelegramEntities.{Message, SendMessage} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +object CheckDeliveryDialog { + sealed trait Command + sealed trait CommandResult + + case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command + case object ProcessMessageSuccess extends CommandResult + case class ProcessMessageFailure(exception: Throwable) extends CommandResult + + def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => + implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + val http = Http()(ctx.system.toUntyped) + + Behaviors.receiveMessage { + case ProcessMessage(msg, replyTo) => + sendResponse(http, botUri, chatId, msg.text.get) + .onComplete { + case Success(response) => + if (response.status.isSuccess()) { + replyTo ! ProcessMessageSuccess + } else { + replyTo ! ProcessMessageFailure(new RuntimeException(s"Error sending response. HTTP response code: ${response.status.value}.")) + } + case Failure(exception) => replyTo ! ProcessMessageFailure(exception) + } + Behaviors.same + } + } + + private def sendResponse(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = { + import io.circe._ + import io.circe.generic.auto._ + import io.circe.syntax._ + + val sendMessage = SendMessage(chatId, text) + val printer = Printer.noSpaces.copy(dropNullValues = true) + val json = printer.pretty(sendMessage.asJson) + val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) + http.singleRequest(request) + } +} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala new file mode 100644 index 0000000..2f4bca8 --- /dev/null +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala @@ -0,0 +1,78 @@ +package eu.xeppaka.bot + +import akka.actor.typed.receptionist.{Receptionist, ServiceKey} +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior} +import akka.util.Timeout +import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess} +import eu.xeppaka.bot.TelegramEntities.{Message, Update} + +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +object DialogManager { + sealed trait Command + sealed trait CommandResult + + case class ProcessUpdate(update: Update, replyTo: ActorRef[CommandResult]) extends Command + case object ProcessUpdateSuccess extends CommandResult + case class ProcessUpdateFailure(exception: Throwable) extends CommandResult + + // internal messages + private case class ReceptionistListingWrapper(chatId: Long, key: ServiceKey[CheckDeliveryDialog.Command], listing: Receptionist.Listing, msg: Message, replyTo: ActorRef[CommandResult]) extends Command + private case class DialogResponseSuccess(dialogId: String, replyTo: ActorRef[CommandResult]) extends Command + private case class DialogResponseFailure(dialogId: String, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command + + def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => + + Behaviors.receiveMessage { + case ProcessUpdate(update, replyTo) => + ctx.log.debug("action=update_received update={}", update) + + if (update.message.isDefined) { + val chatId = update.message.get.chat.id + val dialogKey = ServiceKey[CheckDeliveryDialog.Command](chatId.toString) + val receptionistMapper: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo)) + ctx.log.debug(s"action=find_dialog id=${chatId.toString}") + + ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistMapper) + } + + Behaviors.same + case ReceptionistListingWrapper(chatId, dialogKey, listing, msg, replyTo) => + listing match { + case dialogKey.Listing(dialogs) => + if (dialogs.isEmpty) { + ctx.log.debug("action=find_dialog id={} result=not_found", dialogKey.id) + } else { + ctx.log.debug("action=find_dialog id={} result=found", dialogKey.id) + } + + val dialog = dialogs.headOption.getOrElse({ + val dialogActor = ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), s"delivery-check-${dialogKey.id}") + ctx.system.receptionist ! Receptionist.Register(dialogKey, dialogActor) + dialogActor + }) + + ctx.log.info("action=ask_dialog id={}", dialogKey.id) + + implicit val timeout: Timeout = 5.seconds + ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { + case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo) + case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo) + case Failure(exception) => DialogResponseFailure(dialogKey.id, exception, replyTo) + } + } + + Behaviors.same + case DialogResponseSuccess(dialogId, replyTo) => + ctx.log.info("action=ask_dialog id={} result=success", dialogId) + replyTo ! ProcessUpdateSuccess + Behaviors.same + case DialogResponseFailure(dialogId, exception, replyTo) => + ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId) + replyTo ! ProcessUpdateFailure(exception) + Behaviors.same + } + } +} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala index 814529a..2b66da7 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala @@ -2,8 +2,11 @@ package eu.xeppaka.bot import akka.actor.Scheduler import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.{ActorSystem, DispatcherSelector} -import eu.xeppaka.bot.TelegramBot._ +import akka.http.scaladsl.Http +import akka.util.Timeout +import akka.{Done, actor} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContextExecutor, Future} @@ -13,16 +16,18 @@ object Main { def main(args: Array[String]): Unit = { val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" val telegramBot = ActorSystem(TelegramBot.behavior(botId, "lenovo", 8443), "telegram-bot") + implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default()) implicit val scheduler: Scheduler = telegramBot.scheduler - - val startFuture: Future[StartResult] = (telegramBot ? (ref => TelegramBot.Start(ref))) (10.seconds, scheduler) + implicit val timeout: Timeout = 10.seconds println("Press enter to finish bot...") StdIn.readLine() - val terminateFuture = startFuture - .flatMap { _ => (telegramBot ? (ref => TelegramBot.Stop(ref))) (10.seconds, scheduler) } + val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref)) + + val terminateFuture = stopFuture + .andThen { case _ => Http().shutdownAllConnectionPools() } .andThen { case _ => telegramBot.terminate() } Await.ready(terminateFuture, 5.seconds) diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala index 818cb46..f2fadce 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala @@ -5,42 +5,38 @@ import java.security.{KeyStore, SecureRandom} import java.util.UUID import akka.Done -import akka.actor.ActorSystem +import akka.actor.{ActorSystem, Scheduler} import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.{Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Directives.{as, complete, entity, path, post} +import akka.http.scaladsl.server.Directives.{as, entity, onComplete, path, post, complete} import akka.http.scaladsl.server.Route import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} import akka.stream.ActorMaterializer -import akka.util.ByteString +import akka.util.{ByteString, Timeout} import eu.xeppaka.bot.TelegramEntities._ import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ import scala.io.Source import scala.util.{Failure, Success} object TelegramBot { sealed trait Command sealed trait CommandResult - sealed trait StartResult extends CommandResult sealed trait StopResult extends CommandResult - case object StartSuccess extends StartResult - case class StartFailure(exception: Throwable) extends StartResult - case object StopSuccess extends StopResult - case class StopFailure(exception: Throwable) extends StopResult - - case class Start(replyTo: ActorRef[StartResult]) extends Command - case class Stop(replyTo: ActorRef[StopResult]) extends Command + case class Stop(replyTo: ActorRef[Done]) extends Command case object GetBotInfo case object GetWebhookInfo def behavior(botId: String, interface: String, port: Int): Behavior[Command] = Behaviors.setup[Command] { ctx => + ctx.log.info("action=start_bot") + implicit val untypedSystem: ActorSystem = ctx.system.toUntyped implicit val actorMaterializer: ActorMaterializer = ActorMaterializer() implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) @@ -51,48 +47,10 @@ object TelegramBot { val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId") val httpsContext = createHttpsConnectionContext val stashBuffer = StashBuffer[Command](10) - val updatesProcessor = ctx.spawn(UpdatesProcessor.behavior, "updatesProcessor") - val routes = botRoutes(hookId, updatesProcessor) + val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri)) + val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler) - def reply(command: Command, exceptions: Seq[Throwable]): Unit = { - command match { - case Start(replyTo) => - if (exceptions.isEmpty) { - ctx.log.info("action=start_bot result=success") - replyTo ! StartSuccess - } else { - ctx.log.error("action=start_bot result=failure", exceptions.head) - replyTo ! StartFailure(exceptions.head) - } - case Stop(replyTo) => - if (exceptions.isEmpty) { - ctx.log.info("action=stop_bot result=success") - replyTo ! StopSuccess - } else { - ctx.log.error("action=stop_bot result=failure", exceptions.head) - replyTo ! StopFailure(exceptions.head) - } - case _ => throw new IllegalArgumentException(s"Unsupported command to reply: $command.") - } - } - - def stopped(replyOnCommand: Option[Command] = None, exceptions: Seq[Throwable] = Seq.empty): Behavior[Command] = Behaviors.setup { ctx => - replyOnCommand.foreach(reply(_, exceptions)) - - Behaviors.receiveMessage[Command] { - case startCommand@Start(_) => - ctx.log.info("action=start_bot") - bindingServer(startCommand) - case stopCommand@Stop(_) => - ctx.log.info("action=stop_bot") - reply(stopCommand, Seq.empty) - Behaviors.same - case _ => - Behaviors.unhandled - } - } - - def bindingServer(replyOnCommand: Command): Behavior[Command] = Behaviors.setup[Command] { ctx => + def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => case class BindingSuccess(binding: Http.ServerBinding) extends Command case class BindingFailure(exception: Throwable) extends Command @@ -108,17 +66,18 @@ object TelegramBot { Behaviors.receiveMessage[Command] { case BindingSuccess(binding) => ctx.log.info("action=bind_server result=success") - stashBuffer.unstashAll(ctx, settingWebhook(binding, replyOnCommand, Seq.empty)) + settingWebhook(binding) case BindingFailure(exception) => ctx.log.error("action=bind_server result=failure", exception) - stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), Seq(exception))) + ctx.log.error("action=start_bot result=failure") + Behaviors.stopped case otherCommand: Command => stashBuffer.stash(otherCommand) Behaviors.same } } - def unbindingServer(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx => + def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => case object UnbindingSuccess extends Command case class UnbindingFailure(exception: Throwable) extends Command @@ -134,17 +93,17 @@ object TelegramBot { Behaviors.receiveMessage[Command] { case UnbindingSuccess => ctx.log.info("action=unbind_server result=success") - stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), exceptions)) + replyTo.foreach(_ ! Done) + Behaviors.stopped case UnbindingFailure(exception) => ctx.log.error("action=unbind_server result=failure", exception) - stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), exceptions :+ exception)) - case otherCommand: Command => - stashBuffer.stash(otherCommand) - Behaviors.same + replyTo.foreach(_ ! Done) + Behaviors.stopped + case _ => Behaviors.unhandled } } - def settingWebhook(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx => + def settingWebhook(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx => case object SetWebhookSuccess extends Command case class SetWebhookFailure(exception: Throwable) extends Command @@ -177,17 +136,18 @@ object TelegramBot { Behaviors.receiveMessage { case SetWebhookSuccess => ctx.log.info("action=set_webhook result=success") - stashBuffer.unstashAll(ctx, started(binding, replyOnCommand, exceptions)) + stashBuffer.unstashAll(ctx, started(binding)) case SetWebhookFailure(exception) => ctx.log.error("action=set_webhook result=failure", exception) - stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions :+ exception)) + ctx.log.error("action=start_bot result=failure") + unbindingServer(binding, None) case otherCommand: Command => stashBuffer.stash(otherCommand) Behaviors.same } } - def deletingWebhook(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx => + def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx => case object DeleteWebhookSuccess extends Command case class DeleteWebhookFailure(exception: Throwable) extends Command @@ -210,44 +170,46 @@ object TelegramBot { Behaviors.receiveMessage { case DeleteWebhookSuccess => ctx.log.info("action=delete_webhook result=success") - stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions)) + unbindingServer(binding, Some(replyTo)) case DeleteWebhookFailure(exception) => ctx.log.error("action=delete_webhook result=failure", exception) - stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions :+ exception)) - case otherCommand: Command => - stashBuffer.stash(otherCommand) - Behaviors.same + unbindingServer(binding, Some(replyTo)) + case _ => Behaviors.unhandled } } - def started(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx => - reply(replyOnCommand, exceptions) + def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx => + ctx.log.info("action=start_bot result=success") Behaviors.receiveMessage[Command] { - case startCommand@Start(_) => - ctx.log.info("action=start_bot") - reply(startCommand, Seq.empty) - Behaviors.same - case stopCommand@Stop(_) => + case stopCommand@Stop(replyTo) => ctx.log.info("action=stop_bot") - deletingWebhook(binding, stopCommand, exceptions) + deletingWebhook(binding, replyTo) case _ => Behaviors.unhandled } } - stopped() + bindingServer } - private def botRoutes(hookId: String, updatesProcessor: ActorRef[UpdatesProcessor.Command]): Route = { + private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = { + import akka.actor.typed.scaladsl.AskPattern._ import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ import io.circe.generic.auto._ + implicit val timeout: Timeout = 10.seconds + path(hookId) { post { entity(as[Update]) { update => - updatesProcessor ! UpdatesProcessor.UpdateReceived(update) - complete(HttpResponse()) + onComplete(updatesProcessor.?[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) { + case Success(processResult) => processResult match { + case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) + case DialogManager.ProcessUpdateFailure(exception) => complete(HttpResponse(status = StatusCodes.InternalServerError)) + } + case Failure(exception) => complete(HttpResponse(status = StatusCodes.InternalServerError)) + } } } } diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/UpdatesProcessor.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/UpdatesProcessor.scala deleted file mode 100644 index 8a52bef..0000000 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/UpdatesProcessor.scala +++ /dev/null @@ -1,20 +0,0 @@ -package eu.xeppaka.bot - -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors -import eu.xeppaka.bot.TelegramEntities.Update - -object UpdatesProcessor { - sealed trait Command - sealed trait CommandResult - - case class UpdateReceived(update: Update) extends Command - - def behavior: Behavior[Command] = Behaviors.receive[Command] { (ctx, msg) => - msg match { - case UpdateReceived(update) => - ctx.log.info("action=update_received update={}", update) - Behaviors.same - } - } -}