From 9e845d29c6b4ee3bc5e531482ef6598e022cb28d Mon Sep 17 00:00:00 2001 From: Pavel Kachalouski Date: Sun, 20 May 2018 22:50:31 +0200 Subject: [PATCH] Implemented basic chat with akka-typed --- .idea/modules/root-build.iml | 2 +- .idea/modules/root.iml | 1 - .../eu/xeppaka/bot1/TelegramBotServer.scala | 99 +++++++++++-------- .../eu/xeppaka/bot1/TelegramEntities.scala | 4 +- .../eu/xeppaka/bot1/actors/ChatActor.scala | 54 ++++++++++ .../eu/xeppaka/bot1/actors/DialogActor.scala | 18 ---- .../eu/xeppaka/bot1/actors/UpdateActor.scala | 73 +++++++++++++- 7 files changed, 188 insertions(+), 63 deletions(-) create mode 100644 src/main/scala/eu/xeppaka/bot1/actors/ChatActor.scala delete mode 100644 src/main/scala/eu/xeppaka/bot1/actors/DialogActor.scala diff --git a/.idea/modules/root-build.iml b/.idea/modules/root-build.iml index a93460e..4c662cd 100644 --- a/.idea/modules/root-build.iml +++ b/.idea/modules/root-build.iml @@ -1,5 +1,5 @@ - + diff --git a/.idea/modules/root.iml b/.idea/modules/root.iml index 351b328..a232ccc 100644 --- a/.idea/modules/root.iml +++ b/.idea/modules/root.iml @@ -6,7 +6,6 @@ - diff --git a/src/main/scala/eu/xeppaka/bot1/TelegramBotServer.scala b/src/main/scala/eu/xeppaka/bot1/TelegramBotServer.scala index aa6ef02..a28efa2 100644 --- a/src/main/scala/eu/xeppaka/bot1/TelegramBotServer.scala +++ b/src/main/scala/eu/xeppaka/bot1/TelegramBotServer.scala @@ -4,7 +4,10 @@ import java.io.InputStream import java.security.{KeyStore, SecureRandom} import java.util.UUID -import akka.actor.{ActorSystem, Props} +import akka.actor +import akka.actor.Scheduler +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ActorSystem, DispatcherSelector} import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ @@ -12,24 +15,28 @@ import akka.http.scaladsl.server.{Route, RouteResult} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} import akka.stream.ActorMaterializer -import akka.util.ByteString -import eu.xeppaka.bot1.actors.UpdateActor2 -import eu.xeppaka.bot1.actors.UpdateActor2.ReceivedUpdate +import akka.util.{ByteString, Timeout} +import eu.xeppaka.bot1.actors.UpdateActor +import eu.xeppaka.bot1.actors.UpdateActor.UpdateResponse import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutor, Future} import scala.io.{Source, StdIn} +import scala.util.{Failure, Success} + +class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConnectionContext]) { -class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConnectionContext])(implicit val actorSystem: ActorSystem) { import FailFastCirceSupport._ - import io.circe.generic.auto._ import eu.xeppaka.bot1.TelegramEntities._ + import io.circe.generic.auto._ private val botUri = BotUri(botId) + private implicit val updateSystem: ActorSystem[UpdateActor.UpdateCommand] = ActorSystem(UpdateActor.behavior, "telegram-bot") + private implicit val actorSystem: actor.ActorSystem = updateSystem.toUntyped private implicit val materializer: ActorMaterializer = ActorMaterializer() - private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher + private implicit val executionContext: ExecutionContextExecutor = updateSystem.dispatchers.lookup(DispatcherSelector.default()) private val http: HttpExt = Http() private val hookId = UUID.randomUUID().toString @@ -38,15 +45,16 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn "pkcloud", port, connectionContext = httpsContext.getOrElse(http.defaultClientHttpsContext)) - private val updateActor = actorSystem.actorOf(UpdateActor2.props(botUri, http)) - println(s"webhook path: $webhookUri") + println(s"Webhook path: $webhookUri") + setWebhook() def stop(): Unit = { bindingFuture - .andThen { case _ => http.shutdownAllConnectionPools() } - .flatMap(_.unbind()) - .onComplete(_ => actorSystem.terminate()) + .flatMap(binding => deleteWebhook().map(_ => binding)) + .flatMap(binding => http.shutdownAllConnectionPools().map(_ => binding)) + .flatMap(binding => binding.unbind()) + .onComplete(_ => updateSystem.terminate()) } def printRequestMethodAndResponseStatus(req: HttpRequest)(res: RouteResult): Unit = { @@ -58,22 +66,44 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn path(hookId) { post { entity(as[Update]) { update => - handleWith(processUpdate) + handleWith(receivedUpdate) } } } } - def processUpdate(update: Update): HttpResponse = { - updateActor ! ReceivedUpdate(update) - HttpResponse() + private def receivedUpdate(update: Update): Future[HttpResponse] = { + import akka.actor.typed.scaladsl.AskPattern._ + + implicit val timeout: Timeout = 3.seconds + implicit val scheduler: Scheduler = updateSystem.scheduler + + val result: Future[UpdateActor.UpdateResponse] = updateSystem ? (ref => UpdateActor.UpdateReceived(update, ref)) + + result.andThen { + case Success(response) => sendResponse(response.chatId, response.text) + case Failure(ex) => println("Failed to process message...") + } + + result.map(res => HttpResponse()).fallbackTo(Future.successful(HttpResponse())) + } + + private def sendResponse(chatId: Long, text: String) = { + import io.circe._, io.circe.generic.auto._, io.circe.syntax._ + + val sendMessage = SendMessage(chatId, text) + val printer = Printer.noSpaces.copy(dropNullValues = true) + val json = printer.pretty(sendMessage.asJson) + val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) + http.singleRequest(request) } def getBotInfo: Future[Response[GetMe]] = { http.singleRequest(HttpRequest(uri = botUri.getMe)).flatMap(Unmarshal(_).to[Response[GetMe]]) } - def setWebhook(): Future[HttpResponse] = { + private def setWebhook(): Future[HttpResponse] = { + print("Setting webhook...") val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString())) val urlPart = Multipart.FormData.BodyPart.Strict("url", urlEntity) @@ -86,13 +116,20 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn Marshal(setWebhookFormData) .to[RequestEntity] .flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) - // .flatMap(Unmarshal(_).to[Response[String]]) + .andThen { + case Success(response) => println(s" ${response.status.value}") + case Failure(exception) => println(s" failed with exception: ${exception.getMessage}") + } } - def deleteWebhook(): Future[Response[String]] = { + private def deleteWebhook(): Future[HttpResponse] = { + print("Deleting webhook...") http .singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)) - .flatMap(Unmarshal(_).to[Response[String]]) + .andThen { + case Success(response) => println(s" ${response.status.value}") + case Failure(exception) => println(s" failed with exception: ${exception.getMessage}") + } } def getWebhookInfo(): Future[Response[WebhookInfo]] = { @@ -105,32 +142,14 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn object TelegramBotServer { private val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" - def apply(port: Int, httpsContext: Option[HttpsConnectionContext])(implicit actorSystem: ActorSystem): TelegramBotServer = new TelegramBotServer(botId, port, httpsContext)(actorSystem) + def apply(port: Int, httpsContext: Option[HttpsConnectionContext]): TelegramBotServer = new TelegramBotServer(botId, port, httpsContext) def main(args: Array[String]): Unit = { val httpsContext = createHttpsConnectionContext - - implicit val actorSystem: ActorSystem = ActorSystem("telegram-bot") - implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher - val tbs = TelegramBotServer(88, Some(createHttpsConnectionContext)) - tbs.setWebhook() - .flatMap(response => response.entity.toStrict(5 seconds)) - .onComplete(entity => { - println(entity.get.data.utf8String) - entity.get.discardBytes() - }) - - // tbs - // .getWebhookInfo() - // .onComplete(println(_)) - StdIn.readLine() - - tbs.deleteWebhook() - .onComplete(r => tbs.stop()) + tbs.stop() } def createHttpsConnectionContext: HttpsConnectionContext = { diff --git a/src/main/scala/eu/xeppaka/bot1/TelegramEntities.scala b/src/main/scala/eu/xeppaka/bot1/TelegramEntities.scala index e081e94..7ab6908 100644 --- a/src/main/scala/eu/xeppaka/bot1/TelegramEntities.scala +++ b/src/main/scala/eu/xeppaka/bot1/TelegramEntities.scala @@ -78,7 +78,7 @@ object TelegramEntities { username: Option[String] = None, language_code: Option[String] = None) - case class SendMessage(chat_id: Int, + case class SendMessage(chat_id: Long, text: String, parse_mode: Option[String] = None, disable_web_page_preview: Option[Boolean] = None, @@ -171,7 +171,7 @@ object TelegramEntities { case class ChatPhoto(small_file_id: String, big_file_id: String) - case class Chat(id: Int, + case class Chat(id: Long, `type`: String, title: Option[String] = None, username: Option[String] = None, diff --git a/src/main/scala/eu/xeppaka/bot1/actors/ChatActor.scala b/src/main/scala/eu/xeppaka/bot1/actors/ChatActor.scala new file mode 100644 index 0000000..b7381d8 --- /dev/null +++ b/src/main/scala/eu/xeppaka/bot1/actors/ChatActor.scala @@ -0,0 +1,54 @@ +package eu.xeppaka.bot1.actors + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior} + +object ChatActor { + + sealed trait ChatCommand { + def replyTo: ActorRef[Response] + } + + case class Response(text: String) + + case class Help(replyTo: ActorRef[Response]) extends ChatCommand + + case class Start(replyTo: ActorRef[Response]) extends ChatCommand + + case class MessageReceived(text: String, replyTo: ActorRef[Response]) extends ChatCommand + + case class Stop(replyTo: ActorRef[Response]) extends ChatCommand + + private val started: Behavior[ChatCommand] = Behaviors.receive { (ctx, msg) => + msg match { + case MessageReceived(text, replyTo) => + replyTo ! Response(s"Ok, you said: $text") + Behaviors.same + case Help(replyTo) => + replyHelp(replyTo) + Behaviors.same + case Stop(replyTo) => + replyTo ! Response("Bye, bye!") + initial + case _ => + Behaviors.unhandled + } + } + + private val initial: Behavior[ChatCommand] = Behaviors.receive { (ctx, msg) => + msg match { + case Start(replyTo) => + replyTo ! Response("You started. Try /help, motherfucker...") + started + case c: ChatCommand => + c.replyTo ! Response("Only /start command is supported. Try it...") + Behaviors.same + } + } + + private def replyHelp(replyTo: ActorRef[Response]): Unit = { + replyTo ! Response("No help is provided for such motherfuckers like you! But... ok, send /stop and we are free.") + } + + val behavior: Behavior[ChatCommand] = initial +} diff --git a/src/main/scala/eu/xeppaka/bot1/actors/DialogActor.scala b/src/main/scala/eu/xeppaka/bot1/actors/DialogActor.scala deleted file mode 100644 index 3142b1b..0000000 --- a/src/main/scala/eu/xeppaka/bot1/actors/DialogActor.scala +++ /dev/null @@ -1,18 +0,0 @@ -package eu.xeppaka.bot1.actors - -import java.util.UUID - -import akka.actor.Actor - -class DialogActor extends Actor { - private val dialogId = UUID.randomUUID() - private var userId: Option[Int] = None - - override def receive: Receive = { - case 1 => - } -} - -object DialogActor { - case class Start() -} diff --git a/src/main/scala/eu/xeppaka/bot1/actors/UpdateActor.scala b/src/main/scala/eu/xeppaka/bot1/actors/UpdateActor.scala index 601156d..5b603c2 100644 --- a/src/main/scala/eu/xeppaka/bot1/actors/UpdateActor.scala +++ b/src/main/scala/eu/xeppaka/bot1/actors/UpdateActor.scala @@ -1,7 +1,78 @@ package eu.xeppaka.bot1.actors +import akka.NotUsed +import akka.actor.typed.receptionist.Receptionist.Find +import akka.actor.typed.receptionist.{Receptionist, ServiceKey} +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} +import akka.actor.typed.{ActorRef, Behavior} import eu.xeppaka.bot1.TelegramEntities +import eu.xeppaka.bot1.actors.ChatActor.{Help, MessageReceived, Start, Stop} object UpdateActor { - final case class ProcessUpdate(update: TelegramEntities.Update) + object BotMessages { + val start = "/start" + val stop = "/stop" + val help = "/help" + } + + sealed trait UpdateCommand + case class UpdateReceived(update: TelegramEntities.Update, replyTo: ActorRef[UpdateResponse]) extends UpdateCommand + case class UpdateResponse(chatId: Long, text: String) + + val behavior: Behavior[UpdateCommand] = Behaviors.receive[UpdateCommand] { (ctx, msg) => + msg match { + case UpdateReceived(receivedUpdate, replyTo) => + if (receivedUpdate.message.isDefined) { + ctx.spawn(processMessage(ctx, receivedUpdate.message.get, replyTo), s"process-update-${receivedUpdate.update_id}") + } + + Behaviors.same + } + } + + private def processMessage(parentContext: ActorContext[UpdateCommand], message: TelegramEntities.Message, replyTo: ActorRef[UpdateResponse]): Behavior[NotUsed] = { + Behaviors.setup[AnyRef] { ctx => + val chatId = message.chat.id + val chatKey = ServiceKey[ChatActor.ChatCommand](chatId.toString) + + println(s"Sending Find to receptionist to find actor with id: $chatId") + + ctx.system.receptionist ! Find(chatKey, ctx.self.narrow[Receptionist.Listing]) + + Behaviors.receive[AnyRef] { (ctx, msg) => + msg match { + case chatKey.Listing(listing) => + if (listing.isEmpty) { + println(s"Actor with id: $chatId not found") + } else { + println(s"Actor with id: $chatId is found") + } + + val chat = listing.headOption + .getOrElse({ + val chatActor = parentContext.spawn(ChatActor.behavior, chatId.toString) + ctx.system.receptionist ! Receptionist.Register(chatKey, chatActor) + chatActor + }) + + chat ! getChatMessage(message, ctx.self.narrow[ChatActor.Response]) + Behaviors.same + case ChatActor.Response(text) => + replyTo ! UpdateResponse(chatId, text) + Behaviors.stopped + } + } + } + }.narrow[NotUsed] + + private def getChatMessage(message: TelegramEntities.Message, replyTo: ActorRef[ChatActor.Response]): ChatActor.ChatCommand = { + import BotMessages._ + + message.text.getOrElse(help) match { + case `start` => Start(replyTo) + case `stop` => Stop(replyTo) + case `help` => Help(replyTo) + case msgText@_ => MessageReceived(msgText, replyTo) + } + } }