First working version with czech post checks

This commit is contained in:
Pavel Kachalouski
2018-10-28 16:15:47 +01:00
parent 197c808cfd
commit 1ddefef6a6
7 changed files with 384 additions and 193 deletions

View File

@@ -4,7 +4,7 @@ lazy val commonSettings = Seq(
organization := "com.example", organization := "com.example",
scalaVersion := "2.12.7", scalaVersion := "2.12.7",
version := "0.1.0-SNAPSHOT", version := "0.1.0-SNAPSHOT",
mainClass := Some("eu.xeppaka.bot.TelegramBotServer") mainClass := Some("eu.xeppaka.bot.Main")
) )
inThisBuild(commonSettings) inThisBuild(commonSettings)

View File

@@ -1,156 +0,0 @@
package eu.xeppaka.bot
import java.security.cert.X509Certificate
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.UseHttp2.Negotiated
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, `User-Agent`}
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.persistence.typed.scaladsl.PersistentBehaviors.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, PersistentBehaviors}
import akka.stream.ActorMaterializer
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object Entities {
case class Attributes(
parcelType: String,
weight: Double,
currency: String,
)
case class State(
id: String,
date: String,
text: String,
postcode: Option[String],
postoffice: Option[String],
idIcon: Option[String],
publicAccess: Int,
latitude: Option[Double],
longitude: Option[Double],
timeDeliveryAttempt: Option[String]
)
case class States(state: Seq[State])
case class ParcelHistory(id: String, attributes: Attributes, states: States)
}
object CheckCzechPostDelivery {
sealed trait Command
sealed trait Event
case class State(
attributes: Option[Entities.Attributes] = None,
states: Set[Entities.State] = Set.empty
)
private case object CheckParcel extends Command
private case class ParcelHistoryData(data: Entities.ParcelHistory) extends Command
case class DeliveryStateChanged(state: String)
case class HistoryStateAdded(state: Entities.State) extends Event
case class AttributesChanged(attributes: Entities.Attributes) extends Event
private val trustfulSslContext: SSLContext = {
object NoCheckX509TrustManager extends X509TrustManager {
override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
}
val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
context
}
def behavior(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers(scheduler => checkParcel(chatId, parcelId, stateReporter, scheduler))
}
private def checkParcel(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx =>
implicit val actorSystem: ActorSystem = ctx.system.toUntyped
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
implicit val materializer: ActorMaterializer = ActorMaterializer()
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$parcelId&language=en")
val http = Http()
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose
.withAcceptAnyCertificate(true)
.withDisableHostnameVerification(true)))
val originalCtx = http.createClientHttpsContext(badSslConfig)
val sslContext = new HttpsConnectionContext(
trustfulSslContext,
originalCtx.sslConfig,
originalCtx.enabledCipherSuites,
originalCtx.enabledProtocols,
originalCtx.clientAuth,
originalCtx.sslParameters,
Negotiated
)
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
scheduler.startPeriodicTimer("check-delivery-state", CheckParcel, 5.seconds)
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match {
case CheckParcel =>
http
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
.transform {
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
case response: Failure[HttpResponse] => response
}
.flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
.andThen {
case Success(parcelHistory) => ctx.self ! ParcelHistoryData(parcelHistory.head)
case Failure(exception) =>
ctx.log.error(exception, "Error checking parcel history.")
}
Effect.none
case ParcelHistoryData(parcelHistory) =>
val attributesEvent = (if (state.attributes.isEmpty)
Some(parcelHistory.attributes)
else
state.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.map(AttributesChanged.apply).to[collection.immutable.Seq]
val newStates = parcelHistory.states.state.toSet -- state.states
val stateEvents: Seq[Event] = newStates.map(HistoryStateAdded.apply).to[collection.immutable.Seq]
Effect
.persist(attributesEvent ++ stateEvents)
.thenRun(_ => stateReporter ! DeliveryStateChanged(newStates.toString()))
}
}
val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match {
case HistoryStateAdded(newState) => state.copy(states = state.states + newState)
case AttributesChanged(newAttributes) => state.copy(attributes = Some(newAttributes))
}
}
PersistentBehaviors.receive[Command, Event, State](
persistenceId = parcelId,
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
}
}

View File

@@ -1,62 +1,183 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.{Http, HttpExt} import akka.util.{ByteString, Timeout}
import akka.util.ByteString
import eu.xeppaka.bot.TelegramEntities.{Message, SendMessage} import eu.xeppaka.bot.TelegramEntities.{Message, SendMessage}
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
object CheckDeliveryDialog { object CheckDeliveryDialog {
sealed trait Command sealed trait Command
sealed trait CommandResult sealed trait CommandResult
sealed trait DialogCommand extends Command
case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command
case object ProcessMessageSuccess extends CommandResult case object ProcessMessageSuccess extends CommandResult
case class ProcessMessageFailure(exception: Throwable) extends CommandResult case class ProcessMessageFailure(exception: Throwable) extends CommandResult
case object AddParcel extends DialogCommand
case object RemoveParcel extends DialogCommand
case object ListParcels extends DialogCommand
case object Help extends DialogCommand
object DialogCommand {
def apply(msg: String, replyTo: ActorRef[CommandResult]): Option[DialogCommand] = msg match {
case "/add" => Some(AddParcel)
case "/remove" => Some(RemoveParcel)
case "/list" => Some(ListParcels)
case "/help" => Some(Help)
case _ => None
}
}
// internal messages // internal messages
private case class DeliveryStateChanged(state: String) extends Command private case class DeliveryStateChanged(state: String) extends Command
def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val http = Http()(ctx.system.toUntyped) val http = Http()(ctx.system.toUntyped)
val deliveryStateAdapter: ActorRef[CheckCzechPostDelivery.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) val stashBuffer = StashBuffer[Command](100)
val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state))
val czechPostDeliveryCheck = ctx.spawnAnonymous(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter))
Behaviors.receiveMessage { def initial: Behavior[Command] = Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) => case ProcessMessage(msg, replyTo) =>
sendMessage(http, botUri, chatId, msg.text.get) val command = DialogCommand(msg.text.getOrElse("unknown message"), replyTo)
.onComplete { replyTo ! ProcessMessageSuccess
case Success(response) =>
if (response.status.isSuccess()) {
replyTo ! ProcessMessageSuccess
} else {
replyTo ! ProcessMessageFailure(new RuntimeException(s"Error sending response. HTTP response code: ${response.status.value}."))
}
case Failure(exception) => replyTo ! ProcessMessageFailure(exception)
}
ctx.spawnAnonymous(CheckCzechPostDelivery.behavior(chatId.toString, "RR541190869CZ", deliveryStateAdapter)) if (command.isDefined) {
Behaviors.same ctx.self ! command.get
Behaviors.same
} else {
sendMessage("This command is unsupported.", initial, initial)
}
case AddParcel =>
sendMessage("Please enter parcel ID.", waitParcelId(parcelId => addParcel(parcelId)), initial)
case RemoveParcel =>
sendMessage("Please enter parcel ID.", waitParcelId(parcelId => removeParcel(parcelId)), initial)
case ListParcels => sendMessage("This command is not supported yet.", initial, initial)
case Help =>
sendMessage("Supported commands: /add, /remove, /list, /help", initial, initial)
case DeliveryStateChanged(state) => case DeliveryStateChanged(state) =>
sendMessage(http, botUri, chatId, state) sendMessage(state, initial, initial)
Behaviors.same case _ =>
Behaviors.unhandled
} }
}
private def sendMessage(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = { def addParcel(parcelId: String): Behavior[Command] = Behaviors.setup { ctx =>
import io.circe._ case object AddParcelSuccess extends Command
import io.circe.generic.auto._ case class AddParcelFailure(exception: Throwable) extends Command
import io.circe.syntax._ implicit val timeout: Timeout = 5.seconds
val sendMessage = SendMessage(chatId, text) ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.AddParcel(parcelId, ref)) {
val printer = Printer.noSpaces.copy(dropNullValues = true) case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
val json = printer.pretty(sendMessage.asJson) case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) case Failure(exception) => AddParcelFailure(exception)
http.singleRequest(request) }
Behaviors.receiveMessage {
case AddParcelSuccess =>
sendMessage(s"Parcel $parcelId was added to the watch list.", initial, initial)
case AddParcelFailure(exception) =>
exception match {
case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
sendMessage(s"Parcel $parcelId is in the watch list already.", initial, initial)
case _ =>
ctx.log.error(exception, "action=add_parcel result=failure")
sendMessage(s"Adding parcel failed. Please try again.", initial, initial)
}
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
def removeParcel(parcelId: String): Behavior[Command] = Behaviors.setup { ctx =>
case object RemoveParcelSuccess extends Command
case class RemoveParcelFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
case Failure(exception) => RemoveParcelFailure(exception)
}
Behaviors.receiveMessage {
case RemoveParcelSuccess =>
sendMessage(s"Parcel $parcelId was removed from the watch list.", initial, initial)
case RemoveParcelFailure(exception) =>
exception match {
case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
sendMessage(s"Parcel $parcelId is not found in the list of the watched parcels.", initial, initial)
case _ =>
ctx.log.error(exception, "action=add_parcel result=failure")
sendMessage(s"Remove of the parcel failed. Please try again.", initial, initial)
}
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
def waitParcelId(onSuccess: String => Behavior[Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) =>
if (msg.text.isDefined) {
val parcelId = msg.text.get
replyTo ! ProcessMessageSuccess
onSuccess(parcelId)
} else {
replyTo ! ProcessMessageSuccess
waitParcelId(onSuccess)
}
case otherMsg =>
stashBuffer.stash(otherMsg)
Behaviors.same
}
}
def sendMessage(text: String, onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
import io.circe._
import io.circe.generic.auto._
import io.circe.syntax._
case object SendMessageSuccess extends Command
case class SendMessageFailure(exception: Throwable) extends Command
val sendMessage = SendMessage(chatId, text, Some("Markdown"))
val printer = Printer.noSpaces.copy(dropNullValues = true)
val json = printer.pretty(sendMessage.asJson)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
http
.singleRequest(request)
.onComplete {
case Success(response) => if (response.status.isSuccess()) {
ctx.self ! SendMessageSuccess
} else {
ctx.self ! SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))
}
case Failure(exception) => ctx.self ! SendMessageFailure(exception)
}
Behaviors.receiveMessage {
case SendMessageSuccess =>
stashBuffer.unstashAll(ctx, onSuccess)
case SendMessageFailure(exception) =>
ctx.log.error(exception, "action=send_message result=failure")
stashBuffer.unstashAll(ctx, onFailure)
case otherMsg =>
stashBuffer.stash(otherMsg)
Behaviors.same
}
}
initial
} }
} }

View File

@@ -0,0 +1,225 @@
package eu.xeppaka.bot
import java.security.cert.X509Certificate
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.UseHttp2.Negotiated
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, `User-Agent`}
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.persistence.typed.scaladsl.PersistentBehaviors.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, PersistentBehaviors}
import akka.stream.ActorMaterializer
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object Entities {
case class Attributes(
parcelType: String,
weight: Double,
currency: String,
)
case class State(
id: String,
date: String,
text: String,
postcode: Option[String],
postoffice: Option[String],
idIcon: Option[String],
publicAccess: Int,
latitude: Option[Double],
longitude: Option[Double],
timeDeliveryAttempt: Option[String]
) {
def prettyPrint: String =
s"$date\n$text"
}
case class States(state: Seq[State])
case class ParcelHistory(id: String, attributes: Attributes, states: States)
}
object CzechPostDeliveryCheck {
sealed trait Command
sealed trait CommandResult
sealed trait Event
case class ParcelState(attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
def prettyPrint: String = {
val statesString = states
.map(state => s"${state.prettyPrint}\n===========================\n")
.mkString
s"""|*New state(s):*
|===========================
|$statesString""".stripMargin
}
}
case class State(parcelStates: Map[String, ParcelState] = Map.empty)
case class AddParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
case object CommandResultSuccess extends CommandResult
case class CommandResultFailure(exception: Throwable) extends CommandResult
case class ParcelIdNotFound(parcelId: String) extends Exception
case class DuplicateParcelId(parcelId: String) extends Exception
// internal commands
private case object CheckParcels extends Command
private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command
case class DeliveryStateChanged(state: String)
case class ParcelAdded(parcelId: String) extends Event
case class ParcelRemoved(parcelId: String) extends Event
case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event
case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event
private val trustfulSslContext: SSLContext = {
object NoCheckX509TrustManager extends X509TrustManager {
override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
}
val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
context
}
def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers(scheduler => checkParcel(chatId, stateReporter, scheduler))
}
private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx =>
implicit val actorSystem: ActorSystem = ctx.system.toUntyped
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
implicit val materializer: ActorMaterializer = ActorMaterializer()
val http = Http()
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose
.withAcceptAnyCertificate(true)
.withDisableHostnameVerification(true)))
val originalCtx = http.createClientHttpsContext(badSslConfig)
val sslContext = new HttpsConnectionContext(
trustfulSslContext,
originalCtx.sslConfig,
originalCtx.enabledCipherSuites,
originalCtx.enabledProtocols,
originalCtx.clientAuth,
originalCtx.sslParameters,
Negotiated
)
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes)
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match {
case AddParcel(parcelId, replyTo) =>
if (state.parcelStates.keySet.contains(parcelId)) {
Effect
.none
.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelId)))
} else {
Effect
.persist(ParcelAdded(parcelId))
.thenRun(_ => {
replyTo ! CommandResultSuccess
ctx.self ! CheckParcels
})
}
case RemoveParcel(parcelId, replyTo) =>
if (state.parcelStates.keySet.contains(parcelId)) {
Effect
.persist(ParcelRemoved(parcelId))
.thenRun(_ => replyTo ! CommandResultSuccess)
} else {
Effect
.none
.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelId)))
}
case CheckParcels =>
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
for (ids <- parcelIds) {
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=en")
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
http
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
.transform {
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
case response: Failure[HttpResponse] => response
}
.flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
.andThen {
case Success(parcelHistories) =>
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
case Failure(exception) =>
ctx.log.error(exception, "Error checking parcel history.")
}
}
Effect.none
case ParcelHistoryRetrieved(parcelHistory) =>
val parcelId = parcelHistory.id
val parcelState = state.parcelStates(parcelId)
val attributesChangedEvent = (if (parcelState.attributes.isEmpty)
Some(parcelHistory.attributes)
else
parcelState.attributes
.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.map(attributes => ParcelAttributesChanged(parcelId, attributes)).to[collection.immutable.Seq]
val newStates = parcelHistory.states.state.toSet -- parcelState.states
val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).to[collection.immutable.Seq]
Effect
.persist(attributesChangedEvent ++ stateEvents)
.thenRun(_ => {
if (newStates.nonEmpty) {
stateReporter ! DeliveryStateChanged(ParcelState(None, newStates).prettyPrint)
}
})
}
}
val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match {
case ParcelAdded(parcelId) => state.copy(parcelStates = state.parcelStates + (parcelId -> ParcelState()))
case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
case ParcelHistoryStateAdded(parcelId, newState) =>
val parcelState = state.parcelStates(parcelId)
val newParcelState = parcelState.copy(states = parcelState.states + newState)
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
case ParcelAttributesChanged(parcelId, newAttributes) =>
val parcelState = state.parcelStates(parcelId)
val newParcelState = parcelState.copy(attributes = Some(newAttributes))
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
}
}
PersistentBehaviors.receive[Command, Event, State](
persistenceId = s"$chatId-czechpost",
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
}
}

View File

@@ -56,7 +56,7 @@ object DialogManager {
ctx.log.info("action=ask_dialog id={}", dialogKey.id) ctx.log.info("action=ask_dialog id={}", dialogKey.id)
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 20.seconds
ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) { ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo) case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo) case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo)

View File

@@ -14,8 +14,9 @@ import scala.io.StdIn
object Main { object Main {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" //val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot
val telegramBot = ActorSystem(TelegramBot.behavior(botId, "lenovo", 8443), "telegram-bot") val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot
val telegramBot = ActorSystem(TelegramBot.behavior(botId, "0.0.0.0", 8443), "telegram-bot")
implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
implicit val scheduler: Scheduler = telegramBot.scheduler implicit val scheduler: Scheduler = telegramBot.scheduler
@@ -30,6 +31,6 @@ object Main {
.andThen { case _ => Http().shutdownAllConnectionPools() } .andThen { case _ => Http().shutdownAllConnectionPools() }
.andThen { case _ => telegramBot.terminate() } .andThen { case _ => telegramBot.terminate() }
Await.ready(terminateFuture, 5.seconds) Await.ready(terminateFuture, 20.seconds)
} }
} }

View File

@@ -44,7 +44,7 @@ object TelegramBot {
val botUri = BotUri(botId) val botUri = BotUri(botId)
val http: HttpExt = Http() val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId") val webhookUri = Uri(s"https://xeppaka.eu:8443/$hookId")
val httpsContext = createHttpsConnectionContext val httpsContext = createHttpsConnectionContext
val stashBuffer = StashBuffer[Command](10) val stashBuffer = StashBuffer[Command](10)
val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri)) val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri))
@@ -198,7 +198,7 @@ object TelegramBot {
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._ import io.circe.generic.auto._
implicit val timeout: Timeout = 10.seconds implicit val timeout: Timeout = 30.seconds
path(hookId) { path(hookId) {
post { post {