Implemented delivery dialog basics and dialog manager

This commit is contained in:
Pavel Kachalouski
2018-10-23 21:25:54 +02:00
parent 38c18570ff
commit 08fc3f82e1
5 changed files with 184 additions and 106 deletions

View File

@@ -0,0 +1,53 @@
package eu.xeppaka.bot
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.model._
import akka.http.scaladsl.{Http, HttpExt}
import akka.util.ByteString
import eu.xeppaka.bot.TelegramEntities.{Message, SendMessage}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object CheckDeliveryDialog {
sealed trait Command
sealed trait CommandResult
case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command
case object ProcessMessageSuccess extends CommandResult
case class ProcessMessageFailure(exception: Throwable) extends CommandResult
def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val http = Http()(ctx.system.toUntyped)
Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) =>
sendResponse(http, botUri, chatId, msg.text.get)
.onComplete {
case Success(response) =>
if (response.status.isSuccess()) {
replyTo ! ProcessMessageSuccess
} else {
replyTo ! ProcessMessageFailure(new RuntimeException(s"Error sending response. HTTP response code: ${response.status.value}."))
}
case Failure(exception) => replyTo ! ProcessMessageFailure(exception)
}
Behaviors.same
}
}
private def sendResponse(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = {
import io.circe._
import io.circe.generic.auto._
import io.circe.syntax._
val sendMessage = SendMessage(chatId, text)
val printer = Printer.noSpaces.copy(dropNullValues = true)
val json = printer.pretty(sendMessage.asJson)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
http.singleRequest(request)
}
}

View File

@@ -0,0 +1,78 @@
package eu.xeppaka.bot
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.util.Timeout
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
import eu.xeppaka.bot.TelegramEntities.{Message, Update}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object DialogManager {
sealed trait Command
sealed trait CommandResult
case class ProcessUpdate(update: Update, replyTo: ActorRef[CommandResult]) extends Command
case object ProcessUpdateSuccess extends CommandResult
case class ProcessUpdateFailure(exception: Throwable) extends CommandResult
// internal messages
private case class ReceptionistListingWrapper(chatId: Long, key: ServiceKey[CheckDeliveryDialog.Command], listing: Receptionist.Listing, msg: Message, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseSuccess(dialogId: String, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseFailure(dialogId: String, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.receiveMessage {
case ProcessUpdate(update, replyTo) =>
ctx.log.debug("action=update_received update={}", update)
if (update.message.isDefined) {
val chatId = update.message.get.chat.id
val dialogKey = ServiceKey[CheckDeliveryDialog.Command](chatId.toString)
val receptionistMapper: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo))
ctx.log.debug(s"action=find_dialog id=${chatId.toString}")
ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistMapper)
}
Behaviors.same
case ReceptionistListingWrapper(chatId, dialogKey, listing, msg, replyTo) =>
listing match {
case dialogKey.Listing(dialogs) =>
if (dialogs.isEmpty) {
ctx.log.debug("action=find_dialog id={} result=not_found", dialogKey.id)
} else {
ctx.log.debug("action=find_dialog id={} result=found", dialogKey.id)
}
val dialog = dialogs.headOption.getOrElse({
val dialogActor = ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), s"delivery-check-${dialogKey.id}")
ctx.system.receptionist ! Receptionist.Register(dialogKey, dialogActor)
dialogActor
})
ctx.log.info("action=ask_dialog id={}", dialogKey.id)
implicit val timeout: Timeout = 5.seconds
ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo)
case Failure(exception) => DialogResponseFailure(dialogKey.id, exception, replyTo)
}
}
Behaviors.same
case DialogResponseSuccess(dialogId, replyTo) =>
ctx.log.info("action=ask_dialog id={} result=success", dialogId)
replyTo ! ProcessUpdateSuccess
Behaviors.same
case DialogResponseFailure(dialogId, exception, replyTo) =>
ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId)
replyTo ! ProcessUpdateFailure(exception)
Behaviors.same
}
}
}

View File

@@ -2,8 +2,11 @@ package eu.xeppaka.bot
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector}
import eu.xeppaka.bot.TelegramBot._
import akka.http.scaladsl.Http
import akka.util.Timeout
import akka.{Done, actor}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
@@ -13,16 +16,18 @@ object Main {
def main(args: Array[String]): Unit = {
val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4"
val telegramBot = ActorSystem(TelegramBot.behavior(botId, "lenovo", 8443), "telegram-bot")
implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
implicit val scheduler: Scheduler = telegramBot.scheduler
val startFuture: Future[StartResult] = (telegramBot ? (ref => TelegramBot.Start(ref))) (10.seconds, scheduler)
implicit val timeout: Timeout = 10.seconds
println("Press enter to finish bot...")
StdIn.readLine()
val terminateFuture = startFuture
.flatMap { _ => (telegramBot ? (ref => TelegramBot.Stop(ref))) (10.seconds, scheduler) }
val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref))
val terminateFuture = stopFuture
.andThen { case _ => Http().shutdownAllConnectionPools() }
.andThen { case _ => telegramBot.terminate() }
Await.ready(terminateFuture, 5.seconds)

View File

@@ -5,42 +5,38 @@ import java.security.{KeyStore, SecureRandom}
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Scheduler}
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives.{as, complete, entity, path, post}
import akka.http.scaladsl.server.Directives.{as, entity, onComplete, path, post, complete}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
import akka.stream.ActorMaterializer
import akka.util.ByteString
import akka.util.{ByteString, Timeout}
import eu.xeppaka.bot.TelegramEntities._
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.io.Source
import scala.util.{Failure, Success}
object TelegramBot {
sealed trait Command
sealed trait CommandResult
sealed trait StartResult extends CommandResult
sealed trait StopResult extends CommandResult
case object StartSuccess extends StartResult
case class StartFailure(exception: Throwable) extends StartResult
case object StopSuccess extends StopResult
case class StopFailure(exception: Throwable) extends StopResult
case class Start(replyTo: ActorRef[StartResult]) extends Command
case class Stop(replyTo: ActorRef[StopResult]) extends Command
case class Stop(replyTo: ActorRef[Done]) extends Command
case object GetBotInfo
case object GetWebhookInfo
def behavior(botId: String, interface: String, port: Int): Behavior[Command] = Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot")
implicit val untypedSystem: ActorSystem = ctx.system.toUntyped
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
@@ -51,48 +47,10 @@ object TelegramBot {
val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId")
val httpsContext = createHttpsConnectionContext
val stashBuffer = StashBuffer[Command](10)
val updatesProcessor = ctx.spawn(UpdatesProcessor.behavior, "updatesProcessor")
val routes = botRoutes(hookId, updatesProcessor)
val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri))
val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler)
def reply(command: Command, exceptions: Seq[Throwable]): Unit = {
command match {
case Start(replyTo) =>
if (exceptions.isEmpty) {
ctx.log.info("action=start_bot result=success")
replyTo ! StartSuccess
} else {
ctx.log.error("action=start_bot result=failure", exceptions.head)
replyTo ! StartFailure(exceptions.head)
}
case Stop(replyTo) =>
if (exceptions.isEmpty) {
ctx.log.info("action=stop_bot result=success")
replyTo ! StopSuccess
} else {
ctx.log.error("action=stop_bot result=failure", exceptions.head)
replyTo ! StopFailure(exceptions.head)
}
case _ => throw new IllegalArgumentException(s"Unsupported command to reply: $command.")
}
}
def stopped(replyOnCommand: Option[Command] = None, exceptions: Seq[Throwable] = Seq.empty): Behavior[Command] = Behaviors.setup { ctx =>
replyOnCommand.foreach(reply(_, exceptions))
Behaviors.receiveMessage[Command] {
case startCommand@Start(_) =>
ctx.log.info("action=start_bot")
bindingServer(startCommand)
case stopCommand@Stop(_) =>
ctx.log.info("action=stop_bot")
reply(stopCommand, Seq.empty)
Behaviors.same
case _ =>
Behaviors.unhandled
}
}
def bindingServer(replyOnCommand: Command): Behavior[Command] = Behaviors.setup[Command] { ctx =>
def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
case class BindingSuccess(binding: Http.ServerBinding) extends Command
case class BindingFailure(exception: Throwable) extends Command
@@ -108,17 +66,18 @@ object TelegramBot {
Behaviors.receiveMessage[Command] {
case BindingSuccess(binding) =>
ctx.log.info("action=bind_server result=success")
stashBuffer.unstashAll(ctx, settingWebhook(binding, replyOnCommand, Seq.empty))
settingWebhook(binding)
case BindingFailure(exception) =>
ctx.log.error("action=bind_server result=failure", exception)
stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), Seq(exception)))
ctx.log.error("action=start_bot result=failure")
Behaviors.stopped
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
}
}
def unbindingServer(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object UnbindingSuccess extends Command
case class UnbindingFailure(exception: Throwable) extends Command
@@ -134,17 +93,17 @@ object TelegramBot {
Behaviors.receiveMessage[Command] {
case UnbindingSuccess =>
ctx.log.info("action=unbind_server result=success")
stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), exceptions))
replyTo.foreach(_ ! Done)
Behaviors.stopped
case UnbindingFailure(exception) =>
ctx.log.error("action=unbind_server result=failure", exception)
stashBuffer.unstashAll(ctx, stopped(Some(replyOnCommand), exceptions :+ exception))
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
replyTo.foreach(_ ! Done)
Behaviors.stopped
case _ => Behaviors.unhandled
}
}
def settingWebhook(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
def settingWebhook(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object SetWebhookSuccess extends Command
case class SetWebhookFailure(exception: Throwable) extends Command
@@ -177,17 +136,18 @@ object TelegramBot {
Behaviors.receiveMessage {
case SetWebhookSuccess =>
ctx.log.info("action=set_webhook result=success")
stashBuffer.unstashAll(ctx, started(binding, replyOnCommand, exceptions))
stashBuffer.unstashAll(ctx, started(binding))
case SetWebhookFailure(exception) =>
ctx.log.error("action=set_webhook result=failure", exception)
stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions :+ exception))
ctx.log.error("action=start_bot result=failure")
unbindingServer(binding, None)
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
}
}
def deletingWebhook(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object DeleteWebhookSuccess extends Command
case class DeleteWebhookFailure(exception: Throwable) extends Command
@@ -210,44 +170,46 @@ object TelegramBot {
Behaviors.receiveMessage {
case DeleteWebhookSuccess =>
ctx.log.info("action=delete_webhook result=success")
stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions))
unbindingServer(binding, Some(replyTo))
case DeleteWebhookFailure(exception) =>
ctx.log.error("action=delete_webhook result=failure", exception)
stashBuffer.unstashAll(ctx, unbindingServer(binding, replyOnCommand, exceptions :+ exception))
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
unbindingServer(binding, Some(replyTo))
case _ => Behaviors.unhandled
}
}
def started(binding: Http.ServerBinding, replyOnCommand: Command, exceptions: Seq[Throwable]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
reply(replyOnCommand, exceptions)
def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot result=success")
Behaviors.receiveMessage[Command] {
case startCommand@Start(_) =>
ctx.log.info("action=start_bot")
reply(startCommand, Seq.empty)
Behaviors.same
case stopCommand@Stop(_) =>
case stopCommand@Stop(replyTo) =>
ctx.log.info("action=stop_bot")
deletingWebhook(binding, stopCommand, exceptions)
deletingWebhook(binding, replyTo)
case _ =>
Behaviors.unhandled
}
}
stopped()
bindingServer
}
private def botRoutes(hookId: String, updatesProcessor: ActorRef[UpdatesProcessor.Command]): Route = {
private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
import akka.actor.typed.scaladsl.AskPattern._
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
implicit val timeout: Timeout = 10.seconds
path(hookId) {
post {
entity(as[Update]) { update =>
updatesProcessor ! UpdatesProcessor.UpdateReceived(update)
complete(HttpResponse())
onComplete(updatesProcessor.?[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
case Success(processResult) => processResult match {
case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
case DialogManager.ProcessUpdateFailure(exception) => complete(HttpResponse(status = StatusCodes.InternalServerError))
}
case Failure(exception) => complete(HttpResponse(status = StatusCodes.InternalServerError))
}
}
}
}

View File

@@ -1,20 +0,0 @@
package eu.xeppaka.bot
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import eu.xeppaka.bot.TelegramEntities.Update
object UpdatesProcessor {
sealed trait Command
sealed trait CommandResult
case class UpdateReceived(update: Update) extends Command
def behavior: Behavior[Command] = Behaviors.receive[Command] { (ctx, msg) =>
msg match {
case UpdateReceived(update) =>
ctx.log.info("action=update_received update={}", update)
Behaviors.same
}
}
}