diff --git a/.idea/modules/delivery-checker-bot-build.iml b/.idea/modules/delivery-checker-bot-build.iml deleted file mode 100644 index 19ecb3a..0000000 --- a/.idea/modules/delivery-checker-bot-build.iml +++ /dev/null @@ -1,133 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/delivery-checker-bot.iml b/.idea/modules/delivery-checker-bot.iml deleted file mode 100644 index 899bff0..0000000 --- a/.idea/modules/delivery-checker-bot.iml +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/telegram-bot.iml b/.idea/modules/telegram-bot.iml index 747efe4..859c125 100644 --- a/.idea/modules/telegram-bot.iml +++ b/.idea/modules/telegram-bot.iml @@ -14,6 +14,7 @@ + @@ -43,5 +44,9 @@ + + + + \ No newline at end of file diff --git a/build.sbt b/build.sbt index 1c35b5a..4be51bc 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,8 @@ lazy val `telegram-bot` = (project in file("telegram-bot")) akkaTyped, akkaHttp, akkaStream, + akkaPersistence, + levelDbJni, circleCore, circleGeneric, circleParser, @@ -24,20 +26,6 @@ lazy val `telegram-bot` = (project in file("telegram-bot")) ) ) -lazy val `delivery-checker-bot` = (project in file("delivery-checker-bot")) - .settings( - libraryDependencies ++= Seq( - scalaTest % Test, - akka, - akkaTyped, - akkaHttp, - akkaStream, - circleCore, - circleGeneric, - circleParser - ) - ) - assemblyMergeStrategy in assembly := { case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first case x => diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ab1c894..f0df78a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,6 +5,8 @@ object Dependencies { lazy val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % "2.5.17" lazy val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.17" lazy val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.5" + lazy val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % "2.5.17" + lazy val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8" //lazy val vkapi = "com.vk.api" % "sdk" % "0.5.12" lazy val circleCore = "io.circe" %% "circe-core" % "0.10.0" lazy val circleGeneric = "io.circe" %% "circe-generic" % "0.10.0" diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala new file mode 100644 index 0000000..e5db461 --- /dev/null +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckCzechPostDelivery.scala @@ -0,0 +1,156 @@ +package eu.xeppaka.bot + +import java.security.cert.X509Certificate + +import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} +import akka.http.scaladsl.UseHttp2.Negotiated +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.{Accept, `User-Agent`} +import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.http.scaladsl.{Http, HttpsConnectionContext} +import akka.persistence.typed.scaladsl.PersistentBehaviors.{CommandHandler, EventHandler} +import akka.persistence.typed.scaladsl.{Effect, PersistentBehaviors} +import akka.stream.ActorMaterializer +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ +import io.circe.generic.auto._ +import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} + +import scala.collection.immutable +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +object Entities { + case class Attributes( + parcelType: String, + weight: Double, + currency: String, + ) + + case class State( + id: String, + date: String, + text: String, + postcode: Option[String], + postoffice: Option[String], + idIcon: Option[String], + publicAccess: Int, + latitude: Option[Double], + longitude: Option[Double], + timeDeliveryAttempt: Option[String] + ) + + case class States(state: Seq[State]) + + case class ParcelHistory(id: String, attributes: Attributes, states: States) +} + +object CheckCzechPostDelivery { + sealed trait Command + sealed trait Event + case class State( + attributes: Option[Entities.Attributes] = None, + states: Set[Entities.State] = Set.empty + ) + + private case object CheckParcel extends Command + private case class ParcelHistoryData(data: Entities.ParcelHistory) extends Command + case class DeliveryStateChanged(state: String) + + case class HistoryStateAdded(state: Entities.State) extends Event + case class AttributesChanged(attributes: Entities.Attributes) extends Event + + private val trustfulSslContext: SSLContext = { + object NoCheckX509TrustManager extends X509TrustManager { + override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]() + } + + val context = SSLContext.getInstance("TLS") + context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null) + context + } + + def behavior(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx => + Behaviors.withTimers(scheduler => checkParcel(chatId, parcelId, stateReporter, scheduler)) + } + + private def checkParcel(chatId: String, parcelId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx => + implicit val actorSystem: ActorSystem = ctx.system.toUntyped + implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) + implicit val materializer: ActorMaterializer = ActorMaterializer() + val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$parcelId&language=en") + val http = Http() + val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) + val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose + .withAcceptAnyCertificate(true) + .withDisableHostnameVerification(true))) + val originalCtx = http.createClientHttpsContext(badSslConfig) + val sslContext = new HttpsConnectionContext( + trustfulSslContext, + originalCtx.sslConfig, + originalCtx.enabledCipherSuites, + originalCtx.enabledProtocols, + originalCtx.clientAuth, + originalCtx.sslParameters, + Negotiated + ) + val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) + val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) + + scheduler.startPeriodicTimer("check-delivery-state", CheckParcel, 5.seconds) + + val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { + cmd match { + case CheckParcel => + http + .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) + .transform { + case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) + case response: Failure[HttpResponse] => response + } + .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) + .andThen { + case Success(parcelHistory) => ctx.self ! ParcelHistoryData(parcelHistory.head) + case Failure(exception) => + ctx.log.error(exception, "Error checking parcel history.") + } + + Effect.none + case ParcelHistoryData(parcelHistory) => + val attributesEvent = (if (state.attributes.isEmpty) + Some(parcelHistory.attributes) + else + state.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) + .map(AttributesChanged.apply).to[collection.immutable.Seq] + + val newStates = parcelHistory.states.state.toSet -- state.states + val stateEvents: Seq[Event] = newStates.map(HistoryStateAdded.apply).to[collection.immutable.Seq] + + Effect + .persist(attributesEvent ++ stateEvents) + .thenRun(_ => stateReporter ! DeliveryStateChanged(newStates.toString())) + } + } + + val eventHandler: EventHandler[State, Event] = (state, evt) => { + evt match { + case HistoryStateAdded(newState) => state.copy(states = state.states + newState) + case AttributesChanged(newAttributes) => state.copy(attributes = Some(newAttributes)) + } + } + + PersistentBehaviors.receive[Command, Event, State]( + persistenceId = parcelId, + emptyState = State(), + commandHandler = commandHandler, + eventHandler = eventHandler + ) + } +} diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala index cffd99e..67e406f 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -19,13 +19,17 @@ object CheckDeliveryDialog { case object ProcessMessageSuccess extends CommandResult case class ProcessMessageFailure(exception: Throwable) extends CommandResult + // internal messages + private case class DeliveryStateChanged(state: String) extends Command + def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default()) val http = Http()(ctx.system.toUntyped) + val deliveryStateAdapter: ActorRef[CheckCzechPostDelivery.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) Behaviors.receiveMessage { case ProcessMessage(msg, replyTo) => - sendResponse(http, botUri, chatId, msg.text.get) + sendMessage(http, botUri, chatId, msg.text.get) .onComplete { case Success(response) => if (response.status.isSuccess()) { @@ -35,11 +39,16 @@ object CheckDeliveryDialog { } case Failure(exception) => replyTo ! ProcessMessageFailure(exception) } + + ctx.spawnAnonymous(CheckCzechPostDelivery.behavior(chatId.toString, "RR541190869CZ", deliveryStateAdapter)) + Behaviors.same + case DeliveryStateChanged(state) => + sendMessage(http, botUri, chatId, state) Behaviors.same } } - private def sendResponse(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = { + private def sendMessage(http: HttpExt, botUri: BotUri, chatId: Long, text: String): Future[HttpResponse] = { import io.circe._ import io.circe.generic.auto._ import io.circe.syntax._ diff --git a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala index 2f4bca8..203b916 100644 --- a/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala +++ b/telegram-bot/src/main/scala/eu/xeppaka/bot/DialogManager.scala @@ -32,10 +32,10 @@ object DialogManager { if (update.message.isDefined) { val chatId = update.message.get.chat.id val dialogKey = ServiceKey[CheckDeliveryDialog.Command](chatId.toString) - val receptionistMapper: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo)) + val receptionistAdapter: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo)) ctx.log.debug(s"action=find_dialog id=${chatId.toString}") - ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistMapper) + ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistAdapter) } Behaviors.same