From 1ddefef6a6b8001e9cd8d40702215a5b1cba6785 Mon Sep 17 00:00:00 2001 From: Pavel Kachalouski Date: Sun, 28 Oct 2018 16:15:47 +0100 Subject: [PATCH] First working version with czech post checks --- build.sbt | 2 +- .../xeppaka/bot/CheckCzechPostDelivery.scala | 156 ------------ .../eu/xeppaka/bot/CheckDeliveryDialog.scala | 181 +++++++++++--- .../xeppaka/bot/CzechPostDeliveryCheck.scala | 225 ++++++++++++++++++ .../scala/eu/xeppaka/bot/DialogManager.scala | 2 +- .../src/main/scala/eu/xeppaka/bot/Main.scala | 7 +- .../scala/eu/xeppaka/bot/TelegramBot.scala | 4 +- 7 files changed, 384 insertions(+), 193 deletions(-) delete mode 100644 telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala create mode 100644 telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala diff --git a/build.sbt b/build.sbt index 4be51bc..819586c 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ lazy val commonSettings = Seq( organization := "com.example", scalaVersion := "2.12.7", version := "0.1.0-SNAPSHOT", - mainClass := Some("eu.xeppaka.bot.TelegramBotServer") + mainClass := Some("eu.xeppaka.bot.Main") ) inThisBuild(commonSettings) diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala deleted file mode 100644 index e5db461..0000000 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala +++ /dev/null @@ -1,156 +0,0 @@ -package eu.xeppaka.bot - -import java.security.cert.X509Certificate - -import akka.actor.ActorSystem -import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} -import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} -import akka.http.scaladsl.UseHttp2.Negotiated -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{Accept, `User-Agent`} -import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.http.scaladsl.{Http, HttpsConnectionContext} -import akka.persistence.typed.scaladsl.PersistentBehaviors.{CommandHandler, EventHandler} -import akka.persistence.typed.scaladsl.{Effect, PersistentBehaviors} -import akka.stream.ActorMaterializer -import com.typesafe.sslconfig.akka.AkkaSSLConfig -import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ -import io.circe.generic.auto._ -import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} - -import scala.collection.immutable -import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -object Entities { - case class Attributes( - parcelType: String, - weight: Double, - currency: String, - ) - - case class State( - id: String, - date: String, - text: String, - postcode: Option[String], - postoffice: Option[String], - idIcon: Option[String], - publicAccess: Int, - latitude: Option[Double], - longitude: Option[Double], - timeDeliveryAttempt: Option[String] - ) - - case class States(state: Seq[State]) - - case class ParcelHistory(id: String, attributes: Attributes, states: States) -} - -object CheckCzechPostDelivery { - sealed trait Command - sealed trait Event - case class State( - attributes: Option[Entities.Attributes] = None, - states: Set[Entities.State] = Set.empty - ) - - private case object CheckParcel extends Command - private case class ParcelHistoryData(data: Entities.ParcelHistory) extends Command - case class DeliveryStateChanged(state: String) - - case class HistoryStateAdded(state: Entities.State) extends Event - case class AttributesChanged(attributes: Entities.Attributes) extends Event - - private val trustfulSslContext: SSLContext = { - object NoCheckX509TrustManager extends X509TrustManager { - override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = () - override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = () - override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]() - } - - val context = SSLContext.getInstance("TLS") - context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null) - context - } - - def behavior(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx => - Behaviors.withTimers(scheduler => checkParcel(chatId, parcelId, stateReporter, scheduler)) - } - - private def checkParcel(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx => - implicit val actorSystem: ActorSystem = ctx.system.toUntyped - implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - implicit val materializer: ActorMaterializer = ActorMaterializer() - val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$parcelId&language=en") - val http = Http() - val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) - val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose - .withAcceptAnyCertificate(true) - .withDisableHostnameVerification(true))) - val originalCtx = http.createClientHttpsContext(badSslConfig) - val sslContext = new HttpsConnectionContext( - trustfulSslContext, - originalCtx.sslConfig, - originalCtx.enabledCipherSuites, - originalCtx.enabledProtocols, - originalCtx.clientAuth, - originalCtx.sslParameters, - Negotiated - ) - val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) - val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) - - scheduler.startPeriodicTimer("check-delivery-state", CheckParcel, 5.seconds) - - val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { - cmd match { - case CheckParcel => - http - .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) - .transform { - case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) - case response: Failure[HttpResponse] => response - } - .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) - .andThen { - case Success(parcelHistory) => ctx.self ! ParcelHistoryData(parcelHistory.head) - case Failure(exception) => - ctx.log.error(exception, "Error checking parcel history.") - } - - Effect.none - case ParcelHistoryData(parcelHistory) => - val attributesEvent = (if (state.attributes.isEmpty) - Some(parcelHistory.attributes) - else - state.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) - .map(AttributesChanged.apply).to[collection.immutable.Seq] - - val newStates = parcelHistory.states.state.toSet -- state.states - val stateEvents: Seq[Event] = newStates.map(HistoryStateAdded.apply).to[collection.immutable.Seq] - - Effect - .persist(attributesEvent ++ stateEvents) - .thenRun(_ => stateReporter ! DeliveryStateChanged(newStates.toString())) - } - } - - val eventHandler: EventHandler[State, Event] = (state, evt) => { - evt match { - case HistoryStateAdded(newState) => state.copy(states = state.states + newState) - case AttributesChanged(newAttributes) => state.copy(attributes = Some(newAttributes)) - } - } - - PersistentBehaviors.receive[Command, Event, State]( - persistenceId = parcelId, - emptyState = State(), - commandHandler = commandHandler, - eventHandler = eventHandler - ) - } -} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala index 67e406f..85e6a09 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -1,62 +1,183 @@ package eu.xeppaka.bot -import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.scaladsl.{Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.http.scaladsl.{Http, HttpExt} -import akka.util.ByteString +import akka.util.{ByteString, Timeout} import eu.xeppaka.bot.TelegramEntities.{Message, SendMessage} -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ import scala.util.{Failure, Success} object CheckDeliveryDialog { sealed trait Command sealed trait CommandResult + sealed trait DialogCommand extends Command case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command case object ProcessMessageSuccess extends CommandResult case class ProcessMessageFailure(exception: Throwable) extends CommandResult + case object AddParcel extends DialogCommand + case object RemoveParcel extends DialogCommand + case object ListParcels extends DialogCommand + case object Help extends DialogCommand + + object DialogCommand { + def apply(msg: String, replyTo: ActorRef[CommandResult]): Option[DialogCommand] = msg match { + case "/add" => Some(AddParcel) + case "/remove" => Some(RemoveParcel) + case "/list" => Some(ListParcels) + case "/help" => Some(Help) + case _ => None + } + } + // internal messages private case class DeliveryStateChanged(state: String) extends Command def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default()) val http = Http()(ctx.system.toUntyped) - val deliveryStateAdapter: ActorRef[CheckCzechPostDelivery.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) + val stashBuffer = StashBuffer[Command](100) + val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) + val czechPostDeliveryCheck = ctx.spawnAnonymous(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)) - Behaviors.receiveMessage { + def initial: Behavior[Command] = Behaviors.receiveMessage { case ProcessMessage(msg, replyTo) => - sendMessage(http, botUri, chatId, msg.text.get) - .onComplete { - case Success(response) => - if (response.status.isSuccess()) { - replyTo ! ProcessMessageSuccess - } else { - replyTo ! ProcessMessageFailure(new RuntimeException(s"Error sending response. HTTP response code: ${response.status.value}.")) - } - case Failure(exception) => replyTo ! ProcessMessageFailure(exception) - } + val command = DialogCommand(msg.text.getOrElse("unknown message"), replyTo) + replyTo ! ProcessMessageSuccess - ctx.spawnAnonymous(CheckCzechPostDelivery.behavior(chatId.toString, "RR541190869CZ", deliveryStateAdapter)) - Behaviors.same + if (command.isDefined) { + ctx.self ! command.get + Behaviors.same + } else { + sendMessage("This command is unsupported.", initial, initial) + } + case AddParcel => + sendMessage("Please enter parcel ID.", waitParcelId(parcelId => addParcel(parcelId)), initial) + case RemoveParcel => + sendMessage("Please enter parcel ID.", waitParcelId(parcelId => removeParcel(parcelId)), initial) + case ListParcels => sendMessage("This command is not supported yet.", initial, initial) + case Help => + sendMessage("Supported commands: /add, /remove, /list, /help", initial, initial) case DeliveryStateChanged(state) => - sendMessage(http, botUri, chatId, state) - Behaviors.same + sendMessage(state, initial, initial) + case _ => + Behaviors.unhandled } - } - private def sendMessage(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = { - import io.circe._ - import io.circe.generic.auto._ - import io.circe.syntax._ + def addParcel(parcelId: String): Behavior[Command] = Behaviors.setup { ctx => + case object AddParcelSuccess extends Command + case class AddParcelFailure(exception: Throwable) extends Command + implicit val timeout: Timeout = 5.seconds - val sendMessage = SendMessage(chatId, text) - val printer = Printer.noSpaces.copy(dropNullValues = true) - val json = printer.pretty(sendMessage.asJson) - val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) - http.singleRequest(request) + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.AddParcel(parcelId, ref)) { + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) + case Failure(exception) => AddParcelFailure(exception) + } + + Behaviors.receiveMessage { + case AddParcelSuccess => + sendMessage(s"Parcel $parcelId was added to the watch list.", initial, initial) + case AddParcelFailure(exception) => + exception match { + case CzechPostDeliveryCheck.DuplicateParcelId(_) => + sendMessage(s"Parcel $parcelId is in the watch list already.", initial, initial) + case _ => + ctx.log.error(exception, "action=add_parcel result=failure") + sendMessage(s"Adding parcel failed. Please try again.", initial, initial) + } + case otherMessage => + stashBuffer.stash(otherMessage) + Behaviors.same + } + } + + def removeParcel(parcelId: String): Behavior[Command] = Behaviors.setup { ctx => + case object RemoveParcelSuccess extends Command + case class RemoveParcelFailure(exception: Throwable) extends Command + implicit val timeout: Timeout = 5.seconds + + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) + case Failure(exception) => RemoveParcelFailure(exception) + } + + Behaviors.receiveMessage { + case RemoveParcelSuccess => + sendMessage(s"Parcel $parcelId was removed from the watch list.", initial, initial) + case RemoveParcelFailure(exception) => + exception match { + case CzechPostDeliveryCheck.ParcelIdNotFound(_) => + sendMessage(s"Parcel $parcelId is not found in the list of the watched parcels.", initial, initial) + case _ => + ctx.log.error(exception, "action=add_parcel result=failure") + sendMessage(s"Remove of the parcel failed. Please try again.", initial, initial) + } + case otherMessage => + stashBuffer.stash(otherMessage) + Behaviors.same + } + } + + def waitParcelId(onSuccess: String => Behavior[Command]): Behavior[Command] = Behaviors.setup[Command] { ctx => + Behaviors.receiveMessage { + case ProcessMessage(msg, replyTo) => + if (msg.text.isDefined) { + val parcelId = msg.text.get + replyTo ! ProcessMessageSuccess + onSuccess(parcelId) + } else { + replyTo ! ProcessMessageSuccess + waitParcelId(onSuccess) + } + case otherMsg => + stashBuffer.stash(otherMsg) + Behaviors.same + } + } + + def sendMessage(text: String, onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.setup[Command] { ctx => + import io.circe._ + import io.circe.generic.auto._ + import io.circe.syntax._ + + case object SendMessageSuccess extends Command + case class SendMessageFailure(exception: Throwable) extends Command + + val sendMessage = SendMessage(chatId, text, Some("Markdown")) + val printer = Printer.noSpaces.copy(dropNullValues = true) + val json = printer.pretty(sendMessage.asJson) + val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) + http + .singleRequest(request) + .onComplete { + case Success(response) => if (response.status.isSuccess()) { + ctx.self ! SendMessageSuccess + } else { + ctx.self ! SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")) + } + case Failure(exception) => ctx.self ! SendMessageFailure(exception) + } + + Behaviors.receiveMessage { + case SendMessageSuccess => + stashBuffer.unstashAll(ctx, onSuccess) + case SendMessageFailure(exception) => + ctx.log.error(exception, "action=send_message result=failure") + stashBuffer.unstashAll(ctx, onFailure) + case otherMsg => + stashBuffer.stash(otherMsg) + Behaviors.same + } + } + + initial } } diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala new file mode 100644 index 0000000..318ae9e --- /dev/null +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala @@ -0,0 +1,225 @@ +package eu.xeppaka.bot + +import java.security.cert.X509Certificate + +import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} +import akka.http.scaladsl.UseHttp2.Negotiated +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.{Accept, `User-Agent`} +import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.http.scaladsl.{Http, HttpsConnectionContext} +import akka.persistence.typed.scaladsl.PersistentBehaviors.{CommandHandler, EventHandler} +import akka.persistence.typed.scaladsl.{Effect, PersistentBehaviors} +import akka.stream.ActorMaterializer +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ +import io.circe.generic.auto._ +import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} + +import scala.collection.immutable +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +object Entities { + case class Attributes( + parcelType: String, + weight: Double, + currency: String, + ) + + case class State( + id: String, + date: String, + text: String, + postcode: Option[String], + postoffice: Option[String], + idIcon: Option[String], + publicAccess: Int, + latitude: Option[Double], + longitude: Option[Double], + timeDeliveryAttempt: Option[String] + ) { + def prettyPrint: String = + s"$date\n$text" + } + + case class States(state: Seq[State]) + + case class ParcelHistory(id: String, attributes: Attributes, states: States) +} + +object CzechPostDeliveryCheck { + sealed trait Command + sealed trait CommandResult + sealed trait Event + case class ParcelState(attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) { + def prettyPrint: String = { + val statesString = states + .map(state => s"${state.prettyPrint}\n===========================\n") + .mkString + + s"""|*New state(s):* + |=========================== + |$statesString""".stripMargin + } + } + case class State(parcelStates: Map[String, ParcelState] = Map.empty) + + case class AddParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command + case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command + + case object CommandResultSuccess extends CommandResult + case class CommandResultFailure(exception: Throwable) extends CommandResult + + case class ParcelIdNotFound(parcelId: String) extends Exception + case class DuplicateParcelId(parcelId: String) extends Exception + + // internal commands + private case object CheckParcels extends Command + private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command + case class DeliveryStateChanged(state: String) + + case class ParcelAdded(parcelId: String) extends Event + case class ParcelRemoved(parcelId: String) extends Event + case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event + case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event + + private val trustfulSslContext: SSLContext = { + object NoCheckX509TrustManager extends X509TrustManager { + override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]() + } + + val context = SSLContext.getInstance("TLS") + context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null) + context + } + + def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx => + Behaviors.withTimers(scheduler => checkParcel(chatId, stateReporter, scheduler)) + } + + private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx => + implicit val actorSystem: ActorSystem = ctx.system.toUntyped + implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + implicit val materializer: ActorMaterializer = ActorMaterializer() + val http = Http() + val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose + .withAcceptAnyCertificate(true) + .withDisableHostnameVerification(true))) + val originalCtx = http.createClientHttpsContext(badSslConfig) + val sslContext = new HttpsConnectionContext( + trustfulSslContext, + originalCtx.sslConfig, + originalCtx.enabledCipherSuites, + originalCtx.enabledProtocols, + originalCtx.clientAuth, + originalCtx.sslParameters, + Negotiated + ) + val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) + val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) + + scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes) + + val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { + cmd match { + case AddParcel(parcelId, replyTo) => + if (state.parcelStates.keySet.contains(parcelId)) { + Effect + .none + .thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelId))) + } else { + Effect + .persist(ParcelAdded(parcelId)) + .thenRun(_ => { + replyTo ! CommandResultSuccess + ctx.self ! CheckParcels + }) + } + case RemoveParcel(parcelId, replyTo) => + if (state.parcelStates.keySet.contains(parcelId)) { + Effect + .persist(ParcelRemoved(parcelId)) + .thenRun(_ => replyTo ! CommandResultSuccess) + } else { + Effect + .none + .thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelId))) + } + + case CheckParcels => + val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) + + for (ids <- parcelIds) { + val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=en") + val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) + + http + .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) + .transform { + case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) + case response: Failure[HttpResponse] => response + } + .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) + .andThen { + case Success(parcelHistories) => + parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) + case Failure(exception) => + ctx.log.error(exception, "Error checking parcel history.") + } + } + + Effect.none + case ParcelHistoryRetrieved(parcelHistory) => + val parcelId = parcelHistory.id + val parcelState = state.parcelStates(parcelId) + val attributesChangedEvent = (if (parcelState.attributes.isEmpty) + Some(parcelHistory.attributes) + else + parcelState.attributes + .flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) + .map(attributes => ParcelAttributesChanged(parcelId, attributes)).to[collection.immutable.Seq] + + val newStates = parcelHistory.states.state.toSet -- parcelState.states + val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).to[collection.immutable.Seq] + + Effect + .persist(attributesChangedEvent ++ stateEvents) + .thenRun(_ => { + if (newStates.nonEmpty) { + stateReporter ! DeliveryStateChanged(ParcelState(None, newStates).prettyPrint) + } + }) + } + } + + val eventHandler: EventHandler[State, Event] = (state, evt) => { + evt match { + case ParcelAdded(parcelId) => state.copy(parcelStates = state.parcelStates + (parcelId -> ParcelState())) + case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId) + case ParcelHistoryStateAdded(parcelId, newState) => + val parcelState = state.parcelStates(parcelId) + val newParcelState = parcelState.copy(states = parcelState.states + newState) + state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) + case ParcelAttributesChanged(parcelId, newAttributes) => + val parcelState = state.parcelStates(parcelId) + val newParcelState = parcelState.copy(attributes = Some(newAttributes)) + state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState)) + } + } + + PersistentBehaviors.receive[Command, Event, State]( + persistenceId = s"$chatId-czechpost", + emptyState = State(), + commandHandler = commandHandler, + eventHandler = eventHandler + ) + } +} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala index 203b916..d64cb4c 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala @@ -56,7 +56,7 @@ object DialogManager { ctx.log.info("action=ask_dialog id={}", dialogKey.id) - implicit val timeout: Timeout = 5.seconds + implicit val timeout: Timeout = 20.seconds ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo) case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo) diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala index 2b66da7..6dcf49e 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/Main.scala @@ -14,8 +14,9 @@ import scala.io.StdIn object Main { def main(args: Array[String]): Unit = { - val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" - val telegramBot = ActorSystem(TelegramBot.behavior(botId, "lenovo", 8443), "telegram-bot") + //val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot + val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot + val telegramBot = ActorSystem(TelegramBot.behavior(botId, "0.0.0.0", 8443), "telegram-bot") implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default()) implicit val scheduler: Scheduler = telegramBot.scheduler @@ -30,6 +31,6 @@ object Main { .andThen { case _ => Http().shutdownAllConnectionPools() } .andThen { case _ => telegramBot.terminate() } - Await.ready(terminateFuture, 5.seconds) + Await.ready(terminateFuture, 20.seconds) } } diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala index f2fadce..ca50334 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/TelegramBot.scala @@ -44,7 +44,7 @@ object TelegramBot { val botUri = BotUri(botId) val http: HttpExt = Http() val hookId = UUID.randomUUID().toString - val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId") + val webhookUri = Uri(s"https://xeppaka.eu:8443/$hookId") val httpsContext = createHttpsConnectionContext val stashBuffer = StashBuffer[Command](10) val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri)) @@ -198,7 +198,7 @@ object TelegramBot { import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ import io.circe.generic.auto._ - implicit val timeout: Timeout = 10.seconds + implicit val timeout: Timeout = 30.seconds path(hookId) { post {