Implemented basic chat with akka-typed

This commit is contained in:
Pavel Kachalouski
2018-05-20 22:50:31 +02:00
parent d1969766b1
commit 9e845d29c6
7 changed files with 188 additions and 63 deletions

View File

@@ -4,7 +4,10 @@ import java.io.InputStream
import java.security.{KeyStore, SecureRandom}
import java.util.UUID
import akka.actor.{ActorSystem, Props}
import akka.actor
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector}
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
@@ -12,24 +15,28 @@ import akka.http.scaladsl.server.{Route, RouteResult}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
import akka.stream.ActorMaterializer
import akka.util.ByteString
import eu.xeppaka.bot1.actors.UpdateActor2
import eu.xeppaka.bot1.actors.UpdateActor2.ReceivedUpdate
import akka.util.{ByteString, Timeout}
import eu.xeppaka.bot1.actors.UpdateActor
import eu.xeppaka.bot1.actors.UpdateActor.UpdateResponse
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.io.{Source, StdIn}
import scala.util.{Failure, Success}
class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConnectionContext]) {
class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConnectionContext])(implicit val actorSystem: ActorSystem) {
import FailFastCirceSupport._
import io.circe.generic.auto._
import eu.xeppaka.bot1.TelegramEntities._
import io.circe.generic.auto._
private val botUri = BotUri(botId)
private implicit val updateSystem: ActorSystem[UpdateActor.UpdateCommand] = ActorSystem(UpdateActor.behavior, "telegram-bot")
private implicit val actorSystem: actor.ActorSystem = updateSystem.toUntyped
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
private implicit val executionContext: ExecutionContextExecutor = updateSystem.dispatchers.lookup(DispatcherSelector.default())
private val http: HttpExt = Http()
private val hookId = UUID.randomUUID().toString
@@ -38,15 +45,16 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn
"pkcloud",
port,
connectionContext = httpsContext.getOrElse(http.defaultClientHttpsContext))
private val updateActor = actorSystem.actorOf(UpdateActor2.props(botUri, http))
println(s"webhook path: $webhookUri")
println(s"Webhook path: $webhookUri")
setWebhook()
def stop(): Unit = {
bindingFuture
.andThen { case _ => http.shutdownAllConnectionPools() }
.flatMap(_.unbind())
.onComplete(_ => actorSystem.terminate())
.flatMap(binding => deleteWebhook().map(_ => binding))
.flatMap(binding => http.shutdownAllConnectionPools().map(_ => binding))
.flatMap(binding => binding.unbind())
.onComplete(_ => updateSystem.terminate())
}
def printRequestMethodAndResponseStatus(req: HttpRequest)(res: RouteResult): Unit = {
@@ -58,22 +66,44 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn
path(hookId) {
post {
entity(as[Update]) { update =>
handleWith(processUpdate)
handleWith(receivedUpdate)
}
}
}
}
def processUpdate(update: Update): HttpResponse = {
updateActor ! ReceivedUpdate(update)
HttpResponse()
private def receivedUpdate(update: Update): Future[HttpResponse] = {
import akka.actor.typed.scaladsl.AskPattern._
implicit val timeout: Timeout = 3.seconds
implicit val scheduler: Scheduler = updateSystem.scheduler
val result: Future[UpdateActor.UpdateResponse] = updateSystem ? (ref => UpdateActor.UpdateReceived(update, ref))
result.andThen {
case Success(response) => sendResponse(response.chatId, response.text)
case Failure(ex) => println("Failed to process message...")
}
result.map(res => HttpResponse()).fallbackTo(Future.successful(HttpResponse()))
}
private def sendResponse(chatId: Long, text: String) = {
import io.circe._, io.circe.generic.auto._, io.circe.syntax._
val sendMessage = SendMessage(chatId, text)
val printer = Printer.noSpaces.copy(dropNullValues = true)
val json = printer.pretty(sendMessage.asJson)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
http.singleRequest(request)
}
def getBotInfo: Future[Response[GetMe]] = {
http.singleRequest(HttpRequest(uri = botUri.getMe)).flatMap(Unmarshal(_).to[Response[GetMe]])
}
def setWebhook(): Future[HttpResponse] = {
private def setWebhook(): Future[HttpResponse] = {
print("Setting webhook...")
val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
val urlPart = Multipart.FormData.BodyPart.Strict("url", urlEntity)
@@ -86,13 +116,20 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn
Marshal(setWebhookFormData)
.to[RequestEntity]
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity)))
// .flatMap(Unmarshal(_).to[Response[String]])
.andThen {
case Success(response) => println(s" ${response.status.value}")
case Failure(exception) => println(s" failed with exception: ${exception.getMessage}")
}
}
def deleteWebhook(): Future[Response[String]] = {
private def deleteWebhook(): Future[HttpResponse] = {
print("Deleting webhook...")
http
.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST))
.flatMap(Unmarshal(_).to[Response[String]])
.andThen {
case Success(response) => println(s" ${response.status.value}")
case Failure(exception) => println(s" failed with exception: ${exception.getMessage}")
}
}
def getWebhookInfo(): Future[Response[WebhookInfo]] = {
@@ -105,32 +142,14 @@ class TelegramBotServer(botId: String, port: Int, httpsContext: Option[HttpsConn
object TelegramBotServer {
private val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4"
def apply(port: Int, httpsContext: Option[HttpsConnectionContext])(implicit actorSystem: ActorSystem): TelegramBotServer = new TelegramBotServer(botId, port, httpsContext)(actorSystem)
def apply(port: Int, httpsContext: Option[HttpsConnectionContext]): TelegramBotServer = new TelegramBotServer(botId, port, httpsContext)
def main(args: Array[String]): Unit = {
val httpsContext = createHttpsConnectionContext
implicit val actorSystem: ActorSystem = ActorSystem("telegram-bot")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
val tbs = TelegramBotServer(88, Some(createHttpsConnectionContext))
tbs.setWebhook()
.flatMap(response => response.entity.toStrict(5 seconds))
.onComplete(entity => {
println(entity.get.data.utf8String)
entity.get.discardBytes()
})
// tbs
// .getWebhookInfo()
// .onComplete(println(_))
StdIn.readLine()
tbs.deleteWebhook()
.onComplete(r => tbs.stop())
tbs.stop()
}
def createHttpsConnectionContext: HttpsConnectionContext = {

View File

@@ -78,7 +78,7 @@ object TelegramEntities {
username: Option[String] = None,
language_code: Option[String] = None)
case class SendMessage(chat_id: Int,
case class SendMessage(chat_id: Long,
text: String,
parse_mode: Option[String] = None,
disable_web_page_preview: Option[Boolean] = None,
@@ -171,7 +171,7 @@ object TelegramEntities {
case class ChatPhoto(small_file_id: String, big_file_id: String)
case class Chat(id: Int,
case class Chat(id: Long,
`type`: String,
title: Option[String] = None,
username: Option[String] = None,

View File

@@ -0,0 +1,54 @@
package eu.xeppaka.bot1.actors
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
object ChatActor {
sealed trait ChatCommand {
def replyTo: ActorRef[Response]
}
case class Response(text: String)
case class Help(replyTo: ActorRef[Response]) extends ChatCommand
case class Start(replyTo: ActorRef[Response]) extends ChatCommand
case class MessageReceived(text: String, replyTo: ActorRef[Response]) extends ChatCommand
case class Stop(replyTo: ActorRef[Response]) extends ChatCommand
private val started: Behavior[ChatCommand] = Behaviors.receive { (ctx, msg) =>
msg match {
case MessageReceived(text, replyTo) =>
replyTo ! Response(s"Ok, you said: $text")
Behaviors.same
case Help(replyTo) =>
replyHelp(replyTo)
Behaviors.same
case Stop(replyTo) =>
replyTo ! Response("Bye, bye!")
initial
case _ =>
Behaviors.unhandled
}
}
private val initial: Behavior[ChatCommand] = Behaviors.receive { (ctx, msg) =>
msg match {
case Start(replyTo) =>
replyTo ! Response("You started. Try /help, motherfucker...")
started
case c: ChatCommand =>
c.replyTo ! Response("Only /start command is supported. Try it...")
Behaviors.same
}
}
private def replyHelp(replyTo: ActorRef[Response]): Unit = {
replyTo ! Response("No help is provided for such motherfuckers like you! But... ok, send /stop and we are free.")
}
val behavior: Behavior[ChatCommand] = initial
}

View File

@@ -1,18 +0,0 @@
package eu.xeppaka.bot1.actors
import java.util.UUID
import akka.actor.Actor
class DialogActor extends Actor {
private val dialogId = UUID.randomUUID()
private var userId: Option[Int] = None
override def receive: Receive = {
case 1 =>
}
}
object DialogActor {
case class Start()
}

View File

@@ -1,7 +1,78 @@
package eu.xeppaka.bot1.actors
import akka.NotUsed
import akka.actor.typed.receptionist.Receptionist.Find
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import eu.xeppaka.bot1.TelegramEntities
import eu.xeppaka.bot1.actors.ChatActor.{Help, MessageReceived, Start, Stop}
object UpdateActor {
final case class ProcessUpdate(update: TelegramEntities.Update)
object BotMessages {
val start = "/start"
val stop = "/stop"
val help = "/help"
}
sealed trait UpdateCommand
case class UpdateReceived(update: TelegramEntities.Update, replyTo: ActorRef[UpdateResponse]) extends UpdateCommand
case class UpdateResponse(chatId: Long, text: String)
val behavior: Behavior[UpdateCommand] = Behaviors.receive[UpdateCommand] { (ctx, msg) =>
msg match {
case UpdateReceived(receivedUpdate, replyTo) =>
if (receivedUpdate.message.isDefined) {
ctx.spawn(processMessage(ctx, receivedUpdate.message.get, replyTo), s"process-update-${receivedUpdate.update_id}")
}
Behaviors.same
}
}
private def processMessage(parentContext: ActorContext[UpdateCommand], message: TelegramEntities.Message, replyTo: ActorRef[UpdateResponse]): Behavior[NotUsed] = {
Behaviors.setup[AnyRef] { ctx =>
val chatId = message.chat.id
val chatKey = ServiceKey[ChatActor.ChatCommand](chatId.toString)
println(s"Sending Find to receptionist to find actor with id: $chatId")
ctx.system.receptionist ! Find(chatKey, ctx.self.narrow[Receptionist.Listing])
Behaviors.receive[AnyRef] { (ctx, msg) =>
msg match {
case chatKey.Listing(listing) =>
if (listing.isEmpty) {
println(s"Actor with id: $chatId not found")
} else {
println(s"Actor with id: $chatId is found")
}
val chat = listing.headOption
.getOrElse({
val chatActor = parentContext.spawn(ChatActor.behavior, chatId.toString)
ctx.system.receptionist ! Receptionist.Register(chatKey, chatActor)
chatActor
})
chat ! getChatMessage(message, ctx.self.narrow[ChatActor.Response])
Behaviors.same
case ChatActor.Response(text) =>
replyTo ! UpdateResponse(chatId, text)
Behaviors.stopped
}
}
}
}.narrow[NotUsed]
private def getChatMessage(message: TelegramEntities.Message, replyTo: ActorRef[ChatActor.Response]): ChatActor.ChatCommand = {
import BotMessages._
message.text.getOrElse(help) match {
case `start` => Start(replyTo)
case `stop` => Stop(replyTo)
case `help` => Help(replyTo)
case msgText@_ => MessageReceived(msgText, replyTo)
}
}
}