diff --git a/.idea/modules/telegram-bot.iml b/.idea/modules/telegram-bot.iml index f2e4e4b..6794500 100644 --- a/.idea/modules/telegram-bot.iml +++ b/.idea/modules/telegram-bot.iml @@ -15,39 +15,39 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/build.sbt b/build.sbt index 5b9baf5..f7aec26 100644 --- a/build.sbt +++ b/build.sbt @@ -25,10 +25,3 @@ lazy val `telegram-bot` = (project in file("telegram-bot")) circeAkkaHttp ) ) - -assemblyMergeStrategy in assembly := { - case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first - case x => - val oldStrategy = (assemblyMergeStrategy in assembly).value - oldStrategy(x) -} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala index 4a99c9b..74b156c 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala @@ -153,7 +153,7 @@ object CzechPostDeliveryCheck { } case RemoveParcel(parcelId, replyTo) => val parcelIdUpper = parcelId.toUpperCase - if (state.parcelStates.keySet.contains(parcelIdUpper)) { + if (state.parcelStates.contains(parcelIdUpper)) { Effect .persist(ParcelRemoved(parcelIdUpper)) .thenRun(_ => replyTo ! CommandResultSuccess) @@ -164,40 +164,44 @@ object CzechPostDeliveryCheck { } case ListParcels(replyTo) => - val parcelsList = state.parcelStates.keySet Effect.none - .thenRun(_ => replyTo ! ListParcelsResult(parcelsList)) + .thenRun { state => + val parcelsList = state.parcelStates.keySet + replyTo ! ListParcelsResult(parcelsList) + } case CheckParcels => - ctx.log.info("action=check_parcel_state chat_id={}", chatId) - val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) + Effect + .none + .thenRun { _ => + ctx.log.info("action=check_parcel_state chat_id={}", chatId) + val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) - for (ids <- parcelIds) { - val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=en") - val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) + for (ids <- parcelIds) { + val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") + val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) - ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) + ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) - http - .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) - .transform { - case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) - case response: Failure[HttpResponse] => response + http + .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) + .transform { + case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) + case response: Failure[HttpResponse] => response + } + .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) + .andThen { + case Success(parcelHistories) => + parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) + case Failure(exception) => + ctx.log.error(exception, "Error checking parcel history.") + } + .andThen { + case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) + case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri) + } } - .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) - .andThen { - case Success(parcelHistories) => - parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) - case Failure(exception) => - ctx.log.error(exception, "Error checking parcel history.") - } - .andThen { - case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) - case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri) - } - } - - Effect.none + } case ParcelHistoryRetrieved(parcelHistory) => val parcelId = parcelHistory.id val parcelState = state.parcelStates(parcelId) diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala index d64cb4c..d21eeaa 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala @@ -1,11 +1,13 @@ package eu.xeppaka.bot -import akka.actor.typed.receptionist.{Receptionist, ServiceKey} import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} +import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} import akka.util.Timeout import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess} -import eu.xeppaka.bot.TelegramEntities.{Message, Update} +import eu.xeppaka.bot.TelegramEntities.Update import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -19,60 +21,80 @@ object DialogManager { case class ProcessUpdateFailure(exception: Throwable) extends CommandResult // internal messages - private case class ReceptionistListingWrapper(chatId: Long, key: ServiceKey[CheckDeliveryDialog.Command], listing: Receptionist.Listing, msg: Message, replyTo: ActorRef[CommandResult]) extends Command - private case class DialogResponseSuccess(dialogId: String, replyTo: ActorRef[CommandResult]) extends Command - private case class DialogResponseFailure(dialogId: String, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command + private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command + private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command + + sealed trait Event + private case class DialogAdded(chatId: Long) extends Event + + case class State(dialogs: Map[Long, ActorRef[CheckDeliveryDialog.Command]] = Map.empty) def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => + val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { + cmd match { + case ProcessUpdate(update, replyTo) => + if (update.message.isDefined) { + val chatId = update.message.get.chat.id - Behaviors.receiveMessage { - case ProcessUpdate(update, replyTo) => - ctx.log.debug("action=update_received update={}", update) - - if (update.message.isDefined) { - val chatId = update.message.get.chat.id - val dialogKey = ServiceKey[CheckDeliveryDialog.Command](chatId.toString) - val receptionistAdapter: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo)) - ctx.log.debug(s"action=find_dialog id=${chatId.toString}") - - ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistAdapter) - } - - Behaviors.same - case ReceptionistListingWrapper(chatId, dialogKey, listing, msg, replyTo) => - listing match { - case dialogKey.Listing(dialogs) => - if (dialogs.isEmpty) { - ctx.log.debug("action=find_dialog id={} result=not_found", dialogKey.id) + val effect: Effect[Event, State] = if (state.dialogs.contains(chatId)) { + Effect.none } else { - ctx.log.debug("action=find_dialog id={} result=found", dialogKey.id) + Effect.persist(DialogAdded(chatId)) } - val dialog = dialogs.headOption.getOrElse({ - val dialogActor = ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), s"delivery-check-${dialogKey.id}") - ctx.system.receptionist ! Receptionist.Register(dialogKey, dialogActor) - dialogActor - }) + effect + .thenRun(_ => ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)) + .thenRun { state => + val msg = update.message.get + val dialogActor = state.dialogs(chatId) - ctx.log.info("action=ask_dialog id={}", dialogKey.id) + ctx.log.info("action=ask_dialog id={}", chatId) - implicit val timeout: Timeout = 20.seconds - ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { - case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo) - case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo) - case Failure(exception) => DialogResponseFailure(dialogKey.id, exception, replyTo) + implicit val timeout: Timeout = 20.seconds + ctx.ask(dialogActor)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { + case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo) + case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo) + case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo) + } } - } + } else { + Effect + .none + .thenRun { _ => + ctx.log.debug("action=process_update result=success message=update message is empty") + } + } - Behaviors.same - case DialogResponseSuccess(dialogId, replyTo) => - ctx.log.info("action=ask_dialog id={} result=success", dialogId) - replyTo ! ProcessUpdateSuccess - Behaviors.same - case DialogResponseFailure(dialogId, exception, replyTo) => - ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId) - replyTo ! ProcessUpdateFailure(exception) - Behaviors.same + case DialogResponseSuccess(dialogId, replyTo) => + Effect + .none + .thenRun { _ => + ctx.log.info("action=ask_dialog id={} result=success", dialogId) + replyTo ! ProcessUpdateSuccess + } + case DialogResponseFailure(dialogId, exception, replyTo) => + Effect + .none + .thenRun { _ => + ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId) + replyTo ! ProcessUpdateFailure(exception) + } + } } + + val eventHandler: EventHandler[State, Event] = (state, evt) => { + evt match { + case DialogAdded(chatId) => + val dialogActor = ctx.spawn(Behaviors.supervise(CheckDeliveryDialog.behavior(chatId, botUri)).onFailure(SupervisorStrategy.restart), s"delivery-check-$chatId") + state.copy(dialogs = state.dialogs.updated(chatId, dialogActor)) + } + } + + EventSourcedBehavior( + persistenceId = PersistenceId("dialog-manager"), + emptyState = State(), + commandHandler = commandHandler, + eventHandler = eventHandler + ) } } diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala index f745493..52a0e98 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala @@ -2,8 +2,9 @@ package eu.xeppaka.bot import akka.actor.Scheduler import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.{ActorSystem, DispatcherSelector} +import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy} import akka.http.scaladsl.Http import akka.util.Timeout import akka.{Done, actor} @@ -14,9 +15,10 @@ import scala.io.StdIn object Main { def main(args: Array[String]): Unit = { - val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot - //val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot - val telegramBot = ActorSystem(TelegramBot.behavior(botId, "0.0.0.0", 8443), "telegram-bot") + //val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot + val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot + val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", 88)).onFailure(SupervisorStrategy.restart) + val telegramBot = ActorSystem(botBehavior, "telegram-bot") implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default()) implicit val scheduler: Scheduler = telegramBot.scheduler diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala index fc6e781..99a0c2b 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala @@ -8,10 +8,10 @@ import akka.Done import akka.actor.{ActorSystem, Scheduler} import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.{Behaviors, StashBuffer} -import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} +import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy} import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Directives.{as, entity, onComplete, path, post, complete, extractLog} +import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post} import akka.http.scaladsl.server.Route import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} import akka.stream.ActorMaterializer @@ -44,10 +44,10 @@ object TelegramBot { val botUri = BotUri(botId) val http: HttpExt = Http() val hookId = UUID.randomUUID().toString - val webhookUri = Uri(s"https://xeppaka.eu:8443/$hookId") + val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId") val httpsContext = createHttpsConnectionContext val stashBuffer = StashBuffer[Command](10) - val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri)) + val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler) def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => diff --git a/telegram-bot/src/test/scala/eu/xeppaka/bot/JsonSpec.scala b/telegram-bot/src/test/scala/eu/xeppaka/bot/JsonSpec.scala deleted file mode 100644 index f70d725..0000000 --- a/telegram-bot/src/test/scala/eu/xeppaka/bot/JsonSpec.scala +++ /dev/null @@ -1,16 +0,0 @@ -package eu.xeppaka.bot - -import eu.xeppaka.bot.TelegramEntities._ -import io.circe.Printer -import io.circe.generic.auto._ -import io.circe.syntax._ -import org.scalatest.FlatSpec -import TelegramEntitiesDerivations._ - -class JsonSpec extends FlatSpec { - "blah" should "blah" in { - val keyboard = ReplyKeyboardRemove() - val message = SendMessage(100000, "Please enter command.", reply_markup = Some(keyboard)) - println(message.asJson.pretty(Printer.spaces2.copy(dropNullValues = true))) - } -}