From 4702d3a650f0591ff9dc5c8d5d520f8e2fe0a0d9 Mon Sep 17 00:00:00 2001 From: Pavel Kachalouski Date: Sun, 27 Dec 2020 16:03:35 +0100 Subject: [PATCH] Upgrade versions, use jackson for json serialization --- .sbtopts | 1 - build.sbt | 39 ++-- msg.json | 32 ++++ project/Dependencies.scala | 19 +- project/build.properties | 2 +- project/plugins.sbt | 5 +- src/main/resources/application.conf | 17 -- src/main/resources/reference.conf | 65 +++++++ .../eu/xeppaka/bot/CheckDeliveryDialog.scala | 79 ++++---- .../xeppaka/bot/CzechPostDeliveryCheck.scala | 177 ++++++++---------- .../eu/xeppaka/bot/JsonSerializable.scala | 3 + src/main/scala/eu/xeppaka/bot/Main.scala | 11 +- .../scala/eu/xeppaka/bot/TelegramBot.scala | 117 ++++++------ 13 files changed, 302 insertions(+), 265 deletions(-) delete mode 100644 .sbtopts create mode 100644 msg.json delete mode 100644 src/main/resources/application.conf create mode 100644 src/main/resources/reference.conf create mode 100644 src/main/scala/eu/xeppaka/bot/JsonSerializable.scala diff --git a/.sbtopts b/.sbtopts deleted file mode 100644 index ab02e97..0000000 --- a/.sbtopts +++ /dev/null @@ -1 +0,0 @@ --mem 2048 \ No newline at end of file diff --git a/build.sbt b/build.sbt index 58223f2..4144d20 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,7 @@ import Dependencies._ +import Versions._ -lazy val commonSettings = Seq( - organization := "com.example", - scalaVersion := "2.13.1", - mainClass := Some("eu.xeppaka.bot.Main") -) +lazy val commonSettings = Seq(organization := "eu.xeppaka", scalaVersion := "2.13.4", mainClass := Some("eu.xeppaka.bot.Main")) inThisBuild(commonSettings) @@ -12,29 +9,27 @@ lazy val `telegram-bot-delivery` = (project in file(".")) .settings( name := "telegram-bot-delivery", libraryDependencies ++= Seq( - scalaTest % Test, - akkaTyped, - akkaClusterShardingTyped, - akkaHttp, - akkaStream, - akkaPersistence, - akkaPersistenceCassandra, - akkaPersistenceQuery, - levelDbJni, - circleCore, - circleGeneric, - circleParser, - circeAkkaHttp, - slibTelegram, - logback - ), + scalaTest % Test, + akkaTyped, + akkaSerializationJackson, + akkaClusterShardingTyped, + akkaHttp, + akkaHttpJackson, + akkaStream, + akkaPersistence, + akkaPersistenceCassandra, + akkaPersistenceQuery, + slibTelegram, + logback + ), + dependencyOverrides ++= Seq("com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion), dockerBaseImage := "openjdk:11", dockerExposedPorts := Seq(8443), dockerRepository := Some("registry.xeppaka.eu:443"), Docker / daemonUserUid := Some("1001"), Docker / daemonUser := "telegram-bot", Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery", - version := "1.1.1" + version := "1.1.2" ) .enablePlugins(JavaServerAppPackaging) .enablePlugins(DockerPlugin) diff --git a/msg.json b/msg.json new file mode 100644 index 0000000..2e64cf0 --- /dev/null +++ b/msg.json @@ -0,0 +1,32 @@ +{ + "chat_id": 77544923, + "text": "*List of your watched parcels:*\n(empty)", + "parse_mode": "Markdown", + "disable_web_page_preview": null, + "disable_notification": null, + "reply_to_message_id": null, + "reply_markup": { + "keyboard": [ + [ + { + "text": "/add", + "request_contact": null, + "request_location": null + }, + { + "text": "/list", + "request_contact": null, + "request_location": null + }, + { + "text": "/remove", + "request_contact": null, + "request_location": null + } + ] + ], + "resize_keyboard": true, + "one_time_keyboard": true, + "selective": null + } +} \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ed85b3e..ba3b64a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,29 +4,24 @@ import Dependencies.Versions._ object Dependencies { object Versions { - val akkaVersion = "2.6.3" - val akkaHttpVersion = "10.1.11" - val akkaPersistenceCassandraVersion = "0.103" - val levelDbJniVersion = "1.8" - val circeVersion = "0.13.0" - val akkaHttpCirceVersion = "1.31.0" - val scalaTestVersion = "3.2.0-M4" + val akkaVersion = "2.6.10" + val akkaHttpVersion = "10.2.2" + val akkaHttpJacksonVersion = "1.35.2" + val akkaPersistenceCassandraVersion = "1.0.4" + val scalaTestVersion = "3.2.2" val slibTelegramVersion = "0.1.0" val logbackVersion = "1.2.3" } val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion + val akkaSerializationJackson = "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion + val akkaHttpJackson = "de.heikoseeberger" %% "akka-http-jackson" % akkaHttpJacksonVersion val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion - val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion - val circleCore = "io.circe" %% "circe-core" % circeVersion - val circleGeneric = "io.circe" %% "circe-generic" % circeVersion - val circleParser = "io.circe" %% "circe-parser" % circeVersion - val circeAkkaHttp = "de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirceVersion val slibTelegram = "eu.xeppaka" %% "slib-telegram" % slibTelegramVersion val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion val logback = "ch.qos.logback" % "logback-classic" % logbackVersion diff --git a/project/build.properties b/project/build.properties index a919a9b..c06db1b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.8 +sbt.version=1.4.5 diff --git a/project/plugins.sbt b/project/plugins.sbt index 3137ef2..4368552 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,2 @@ -addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1") -addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12") -addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1") \ No newline at end of file +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.0") +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13") diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf deleted file mode 100644 index b484ba9..0000000 --- a/src/main/resources/application.conf +++ /dev/null @@ -1,17 +0,0 @@ -akka { - loglevel = "INFO" - - extensions = [akka.persistence.Persistence] - - persistence { - journal { - plugin = "cassandra-journal" - } - } - - actor.allow-java-serialization = on -} - -cassandra-journal { - contact-points = ["cassandra"] -} diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..97daa53 --- /dev/null +++ b/src/main/resources/reference.conf @@ -0,0 +1,65 @@ +telegram-bot-delivery { + cassandra.keyspace = "telegram_bot_delivery" + cassandra.autocreate-offset-store = true +} + +datastax-java-driver { + advanced { + reconnect-on-init = true + metadata.schema.enabled = false + metadata.token-map.enabled = false + } + basic { + contact-points = ["127.0.0.1:9042"] + load-balancing-policy.local-datacenter = "datacenter1" + } +} + +akka { + loglevel = "INFO" + extensions = [akka.persistence.Persistence] + + actor { + serialization-bindings { + "eu.xeppaka.bot.JsonSerializable" = jackson-json + } + } + + persistence { + journal { + plugin = "akka.persistence.cassandra.journal" + auto-start-journals = ["akka.persistence.cassandra.journal"] + } + snapshot-store { + plugin = "akka.persistence.cassandra.snapshot" + auto-start-snapshot-stores = ["akka.persistence.cassandra.snapshot"] + } + + cassandra { + journal { + keyspace-autocreate = true + tables-autocreate = true + keyspace = ${telegram-bot-delivery.cassandra.keyspace} + max-message-batch-size = 30 + support-all-persistence-ids = off + } + snapshot { + keyspace-autocreate = true + tables-autocreate = true + keyspace = ${telegram-bot-delivery.cassandra.keyspace} + } + events-by-tag { + max-message-batch-size = 30 + eventual-consistency-delay = 2s + pubsub-notification = on + first-time-bucket = "20200101T00:00" + } + } + } + + projection.cassandra { + offset-store { + keyspace = ${telegram-bot-delivery.cassandra.keyspace} + } + } +} diff --git a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala index 0e934b7..f11b3be 100644 --- a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala +++ b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -3,19 +3,21 @@ package eu.xeppaka.bot import akka.actor.ActorSystem import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} +import akka.http.scaladsl.marshalling.Marshal +import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.stream.scaladsl.{Sink, Source} -import akka.util.{ByteString, Timeout} +import akka.stream.scaladsl.{ Sink, Source } +import akka.util.{ ByteString, Timeout } import eu.xeppaka.telegram.bot.TelegramEntities._ -import io.circe.Printer -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } object CheckDeliveryDialog { + import de.heikoseeberger.akkahttpjackson.JacksonSupport._ + sealed trait Command sealed trait CommandResult sealed trait DialogCommand extends Command @@ -31,16 +33,14 @@ object CheckDeliveryDialog { object DialogCommand { def parse(text: String): DialogCommand = text match { - case "/add" => AddParcel + case "/add" => AddParcel case "/remove" => RemoveParcel - case "/list" => ListParcels - case "/help" => Help - case _ => Help + case "/list" => ListParcels + case "/help" => Help + case _ => Help } } - // json printer - private val printer = Printer.noSpaces.copy(dropNullValues = true) // internal messages private case class DeliveryStateChanged(state: String) extends Command private val helpMessage = @@ -50,11 +50,9 @@ object CheckDeliveryDialog { |/list - list watched parcels |/remove - remove parcel from a watching list """.stripMargin - private val commandsKeyboard = Some(ReplyKeyboardMarkup( - Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), - resize_keyboard = Some(true), - one_time_keyboard = Some(true) - )) + private val commandsKeyboard = Some( + ReplyKeyboardMarkup(Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), resize_keyboard = Some(true), one_time_keyboard = Some(true)) + ) private val removeKeyboard = Some(ReplyKeyboardRemove()) @@ -102,9 +100,9 @@ object CheckDeliveryDialog { implicit val timeout: Timeout = 5.seconds ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) { - case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) - case Failure(exception) => AddParcelFailure(exception) + case Failure(exception) => AddParcelFailure(exception) } Behaviors.receiveMessage { @@ -136,7 +134,7 @@ object CheckDeliveryDialog { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) { case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) - case Failure(exception) => ListParcelsFailure(exception) + case Failure(exception) => ListParcelsFailure(exception) } Behaviors.receiveMessage { @@ -163,7 +161,7 @@ object CheckDeliveryDialog { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList) - case Failure(exception) => ListParcelIdsFailure(exception) + case Failure(exception) => ListParcelIdsFailure(exception) } Behaviors.receiveMessage { @@ -195,9 +193,9 @@ object CheckDeliveryDialog { implicit val timeout: Timeout = 5.seconds ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { - case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess + case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) - case Failure(exception) => RemoveParcelFailure(exception) + case Failure(exception) => RemoveParcelFailure(exception) } Behaviors.receiveMessage { @@ -250,34 +248,31 @@ object CheckDeliveryDialog { def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => Behaviors.setup[Command] { ctx => - import io.circe.generic.auto._ - import io.circe.syntax._ case object SendMessageSuccess extends Command case class SendMessageFailure(exception: Throwable) extends Command - val json = printer.print(message.asJson) - val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json))) + ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, message) - ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json) + println(message) + println(Await.result(Marshal(message).to[HttpEntity], 2.seconds).asInstanceOf[HttpEntity.Strict].data.utf8String) Source - .single(request) + .future(Marshal(message).to[RequestEntity]) .initialDelay(2.seconds * (attempt - 1)) + .map(requestEntity => HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = requestEntity)) .mapAsync(1) { request => - http - .singleRequest(request) - .transform { - case Success(response) => - if (response.status.isSuccess()) { - Success(SendMessageSuccess) - } else { - Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) - } - case Failure(exception) => - ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception) - Success(SendMessageFailure(exception)) - } + http.singleRequest(request).transform { + case Success(response) => + if (response.status.isSuccess()) { + Success(SendMessageSuccess) + } else { + Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) + } + case Failure(exception) => + ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception) + Success(SendMessageFailure(exception)) + } } .to(Sink.foreach(ctx.self ! _)) .run() diff --git a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala index c570b29..28dcd79 100644 --- a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala +++ b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala @@ -1,38 +1,31 @@ package eu.xeppaka.bot -import java.security.cert.X509Certificate -import java.text.SimpleDateFormat - import akka.actor.ActorSystem import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} -import akka.http.scaladsl.UseHttp2.Negotiated +import akka.actor.typed.{ ActorRef, Behavior, DispatcherSelector } import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{Accept, `User-Agent`} -import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import akka.http.scaladsl.model.headers.{ `User-Agent`, Accept } +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings } import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.http.scaladsl.{Http, HttpsConnectionContext} +import akka.http.scaladsl.{ ConnectionContext, Http } import akka.persistence.typed.PersistenceId -import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} -import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} +import akka.persistence.typed.scaladsl.EventSourcedBehavior.{ CommandHandler, EventHandler } +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } +import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo } import com.typesafe.sslconfig.akka.AkkaSSLConfig -import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ -import io.circe.generic.auto._ -import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} +import java.security.cert.X509Certificate +import java.text.SimpleDateFormat +import javax.net.ssl.{ KeyManager, SSLContext, X509TrustManager } import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } object Entities { - case class Attributes( - parcelType: String, - weight: Double, - currency: String, - ) + case class Attributes(parcelType: String, weight: Double, currency: String) case class State( id: String, @@ -53,17 +46,27 @@ object Entities { } object CzechPostDeliveryCheck { + import de.heikoseeberger.akkahttpjackson.JacksonSupport._ + private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd") private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy") private val entityType = "czechpost" - sealed trait Command - sealed trait CommandResult - sealed trait Event + sealed trait Command extends JsonSerializable + sealed trait CommandResult extends JsonSerializable + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes( + Array( + new JsonSubTypes.Type(value = classOf[ParcelAttributesChanged], name = "parcel_attributes_changed"), + new JsonSubTypes.Type(value = classOf[ParcelAdded], name = "parcel_added"), + new JsonSubTypes.Type(value = classOf[ParcelRemoved], name = "parcel_removed"), + new JsonSubTypes.Type(value = classOf[ParcelHistoryStateAdded], name = "parcel_history_state_added") + ) + ) + sealed trait Event extends JsonSerializable case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) { def fullStatePrint(parcelId: String): String = { - val statesString = states - .toSeq + val statesString = states.toSeq .sortBy(state => czechPostDateFormat.parse(state.date)) .map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n") .mkString @@ -74,18 +77,14 @@ object CzechPostDeliveryCheck { } def latestStatePrint(parcelId: String): String = { - latestState - .map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}") - .getOrElse(s"$parcelId ($comment) - NO INFO") + latestState.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}").getOrElse(s"$parcelId ($comment) - NO INFO") } private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date)) } - case class State(parcelStates: Map[String, Parcel] = Map.empty) { - def latestStatesPrint: Seq[String] = parcelStates - .map { case (id, parcel) => parcel.latestStatePrint(id) } - .to(Seq) + case class State(parcelStates: Map[String, Parcel] = Map.empty) extends JsonSerializable { + def latestStatesPrint: Seq[String] = parcelStates.map { case (id, parcel) => parcel.latestStatePrint(id) }.to(Seq) } case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command @@ -106,13 +105,9 @@ object CzechPostDeliveryCheck { private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command case class DeliveryStateChanged(state: String) - @SerialVersionUID(1L) case class ParcelAdded(parcelId: String, comment: String) extends Event - @SerialVersionUID(1L) case class ParcelRemoved(parcelId: String) extends Event - @SerialVersionUID(1L) case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event - @SerialVersionUID(1L) case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event private val trustfulSslContext: SSLContext = { @@ -134,32 +129,22 @@ object CzechPostDeliveryCheck { implicit val actorSystem: ActorSystem = ctx.system.toClassic implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) val http = Http() - val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose - .withAcceptAnyCertificate(true) - .withDisableHostnameVerification(true))) + val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withAcceptAnyCertificate(true).withDisableHostnameVerification(true))) val originalCtx = http.createClientHttpsContext(badSslConfig) - val sslContext = new HttpsConnectionContext( - trustfulSslContext, - originalCtx.sslConfig, - originalCtx.enabledCipherSuites, - originalCtx.enabledProtocols, - originalCtx.clientAuth, - originalCtx.sslParameters, - Negotiated - ) + val sslContext = ConnectionContext.httpsClient(trustfulSslContext) val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) - scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes) + scheduler.startTimerAtFixedRate("check-delivery-state", CheckParcels, 5.minutes) + + val log = ctx.log val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { cmd match { case AddParcel(parcelId, comment, replyTo) => val parcelIdUpper = parcelId.toUpperCase if (state.parcelStates.keySet.contains(parcelIdUpper)) { - Effect - .none - .thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper))) + Effect.none.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper))) } else { Effect .persist(ParcelAdded(parcelIdUpper, comment)) @@ -171,75 +156,64 @@ object CzechPostDeliveryCheck { case RemoveParcel(parcelId, replyTo) => val parcelIdUpper = parcelId.toUpperCase if (state.parcelStates.contains(parcelIdUpper)) { - Effect - .persist(ParcelRemoved(parcelIdUpper)) - .thenRun(_ => replyTo ! CommandResultSuccess) + Effect.persist(ParcelRemoved(parcelIdUpper)).thenRun(_ => replyTo ! CommandResultSuccess) } else { - Effect - .none - .thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper))) + Effect.none.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper))) } case ListParcels(replyTo) => - Effect.none - .thenRun { state => - val parcelsList = state.latestStatesPrint - replyTo ! ListParcelsResult(parcelsList) - } + Effect.none.thenRun { state => + val parcelsList = state.latestStatesPrint + replyTo ! ListParcelsResult(parcelsList) + } case ListParcelIds(replyTo) => - Effect.none - .thenRun { state => - replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) - } + Effect.none.thenRun { state => + replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) + } case CheckParcels => - Effect - .none - .thenRun { _ => - ctx.log.info("action=check_parcel_state chat_id={}", chatId) - val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) + Effect.none.thenRun { _ => + log.info("action=check_parcel_state chat_id={}", chatId) + val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) - for (ids <- parcelIds) { - val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") - val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) + for (ids <- parcelIds) { + val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") + val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) - ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) + log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) - http - .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) - .transform { - case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) - case response: Failure[HttpResponse] => response - } - .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) - .andThen { - case Success(parcelHistories) => - parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) - case Failure(exception) => - ctx.log.error("Error checking parcel history.", exception) - } - .andThen { - case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) - case Failure(exception) => ctx.log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception) - } - } + http + .singleRequest(request, connectionContext = sslContext, settings = connectionSettings) + .transform { + case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) + case response: Failure[HttpResponse] => response + } + .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) + .andThen { + case Success(parcelHistories) => + parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) + case Failure(exception) => + log.error("Error checking parcel history.", exception) + } + .andThen { + case Success(_) => log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) + case Failure(exception) => log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception) + } } + } case ParcelHistoryRetrieved(parcelHistory) => val parcelId = parcelHistory.id val parcelState = state.parcelStates(parcelId) val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty) Some(parcelHistory.attributes) else - parcelState.attributes - .flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) + parcelState.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) .map(attributes => ParcelAttributesChanged(parcelId, attributes)) .toSeq val newStates = parcelHistory.states.state.toSet -- parcelState.states - val stateEvents: Seq[Event] = newStates - .map(state => ParcelHistoryStateAdded(parcelId, state)) - .toSeq + val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).toSeq val comment = state.parcelStates(parcelId).comment Effect @@ -268,12 +242,7 @@ object CzechPostDeliveryCheck { } } - EventSourcedBehavior[Command, Event, State]( - persistenceId = PersistenceId(entityType, chatId), - emptyState = State(), - commandHandler = commandHandler, - eventHandler = eventHandler - ) + EventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId(entityType, chatId), emptyState = State(), commandHandler = commandHandler, eventHandler = eventHandler) } } } diff --git a/src/main/scala/eu/xeppaka/bot/JsonSerializable.scala b/src/main/scala/eu/xeppaka/bot/JsonSerializable.scala new file mode 100644 index 0000000..4b0f354 --- /dev/null +++ b/src/main/scala/eu/xeppaka/bot/JsonSerializable.scala @@ -0,0 +1,3 @@ +package eu.xeppaka.bot + +trait JsonSerializable diff --git a/src/main/scala/eu/xeppaka/bot/Main.scala b/src/main/scala/eu/xeppaka/bot/Main.scala index b236d51..778d9f2 100644 --- a/src/main/scala/eu/xeppaka/bot/Main.scala +++ b/src/main/scala/eu/xeppaka/bot/Main.scala @@ -1,21 +1,24 @@ package eu.xeppaka.bot import java.nio.file.Paths - import akka.actor.Scheduler import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy} +import akka.actor.typed.{ ActorSystem, DispatcherSelector, SupervisorStrategy } import akka.http.scaladsl.Http import akka.util.Timeout -import akka.{Done, actor} +import akka.{ actor, Done } +import com.fasterxml.jackson.annotation.JsonInclude +import de.heikoseeberger.akkahttpjackson.JacksonSupport import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContextExecutor, Future} +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } import scala.io.StdIn object Main { + JacksonSupport.defaultObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + def main(args: Array[String]): Unit = { val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4") val localPort = 8443 diff --git a/src/main/scala/eu/xeppaka/bot/TelegramBot.scala b/src/main/scala/eu/xeppaka/bot/TelegramBot.scala index 31df1bf..bde41f9 100644 --- a/src/main/scala/eu/xeppaka/bot/TelegramBot.scala +++ b/src/main/scala/eu/xeppaka/bot/TelegramBot.scala @@ -1,7 +1,7 @@ package eu.xeppaka.bot import java.io.InputStream -import java.security.{KeyStore, SecureRandom} +import java.security.{ KeyStore, SecureRandom } import java.util.UUID import akka.Done @@ -11,18 +11,18 @@ import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post} +import akka.http.scaladsl.server.Directives.{ as, complete, entity, extractLog, onComplete, path, post } import akka.http.scaladsl.server.Route -import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} -import akka.util.{ByteString, Timeout} +import akka.http.scaladsl.{ ConnectionContext, Http, HttpExt, HttpsConnectionContext } +import akka.util.{ ByteString, Timeout } import eu.xeppaka.telegram.bot.TelegramEntities._ -import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} +import javax.net.ssl.{ KeyManagerFactory, SSLContext, TrustManagerFactory } import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ import scala.io.Source -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } object TelegramBot { sealed trait Command @@ -44,27 +44,31 @@ object TelegramBot { val http: HttpExt = Http() val hookId = UUID.randomUUID().toString val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId") - val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler) - def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => + def bindServer: Behavior[Command] = Behaviors.setup[Command] { ctx => case class BindingSuccess(binding: Http.ServerBinding) extends Command case class BindingFailure(exception: Throwable) extends Command ctx.log.info("action=bind_server interface={} port={}", interface, localPort) - http - .bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext)) - .onComplete { - case Success(binding) => ctx.self ! BindingSuccess(binding) - case Failure(exception) => ctx.self ! BindingFailure(exception) - } + val serverBuilder = http.newServerAt(interface, localPort) + val bindFuture = if (useHttpsServer) { + serverBuilder.enableHttps(createHttpsConnectionContext).bindFlow(routes) + } else { + serverBuilder.bindFlow(routes) + } + + bindFuture.onComplete { + case Success(binding) => ctx.self ! BindingSuccess(binding) + case Failure(exception) => ctx.self ! BindingFailure(exception) + } Behaviors.receiveMessage[Command] { case BindingSuccess(binding) => ctx.log.info("action=bind_server result=success") - settingWebhook(binding) + setWebhook(binding) case BindingFailure(exception) => ctx.log.error("action=bind_server result=failure", exception) ctx.log.error("action=start_bot result=failure") @@ -75,18 +79,16 @@ object TelegramBot { } } - def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => + def unbindServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => case object UnbindingSuccess extends Command case class UnbindingFailure(exception: Throwable) extends Command ctx.log.info("action=unbind_server interface={} port={}", interface, localPort) - binding - .unbind() - .onComplete { - case Success(Done) => ctx.self ! UnbindingSuccess - case Failure(exception) => ctx.self ! UnbindingFailure(exception) - } + binding.unbind().onComplete { + case Success(Done) => ctx.self ! UnbindingSuccess + case Failure(exception) => ctx.self ! UnbindingFailure(exception) + } Behaviors.receiveMessage[Command] { case UnbindingSuccess => @@ -101,7 +103,7 @@ object TelegramBot { } } - def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => + def setWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => case object SetWebhookSuccess extends Command case class SetWebhookFailure(exception: Throwable) extends Command @@ -124,18 +126,15 @@ object TelegramBot { val formParts = immutable.Seq(urlPart, certificatePart).flatten val formData = Multipart.FormData.Strict(formParts) - Marshal(formData) - .to[RequestEntity] - .flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) - .onComplete { - case Success(response) => - if (response.status.isSuccess()) - ctx.self ! SetWebhookSuccess - else - ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}.")) - case Failure(exception) => - ctx.self ! SetWebhookFailure(exception) - } + Marshal(formData).to[RequestEntity].flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))).onComplete { + case Success(response) => + if (response.status.isSuccess()) + ctx.self ! SetWebhookSuccess + else + ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}.")) + case Failure(exception) => + ctx.self ! SetWebhookFailure(exception) + } Behaviors.receiveMessage { case SetWebhookSuccess => @@ -145,9 +144,9 @@ object TelegramBot { if (attempt > 20) { ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception) ctx.log.error("action=start_bot result=failure") - unbindingServer(binding, None) + unbindServer(binding, None) } else { - settingWebhook(binding, attempt = attempt + 1) + setWebhook(binding, attempt = attempt + 1) } case otherCommand: Command => stashBuffer.stash(otherCommand) @@ -163,25 +162,23 @@ object TelegramBot { implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) - http - .singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)) - .onComplete { - case Success(response) => - if (response.status.isSuccess()) - ctx.self ! DeleteWebhookSuccess - else - ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}")) - case Failure(exception) => - ctx.self ! DeleteWebhookFailure(exception) - } + http.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)).onComplete { + case Success(response) => + if (response.status.isSuccess()) + ctx.self ! DeleteWebhookSuccess + else + ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}")) + case Failure(exception) => + ctx.self ! DeleteWebhookFailure(exception) + } Behaviors.receiveMessage { case DeleteWebhookSuccess => ctx.log.info("action=delete_webhook result=success") - unbindingServer(binding, Some(replyTo)) + unbindServer(binding, Some(replyTo)) case DeleteWebhookFailure(exception) => ctx.log.error("action=delete_webhook result=failure", exception) - unbindingServer(binding, Some(replyTo)) + unbindServer(binding, Some(replyTo)) case _ => Behaviors.unhandled } } @@ -198,14 +195,13 @@ object TelegramBot { } } - bindingServer + bindServer } } private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = { + import de.heikoseeberger.akkahttpjackson.JacksonSupport._ import akka.actor.typed.scaladsl.AskPattern._ - import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ - import io.circe.generic.auto._ implicit val timeout: Timeout = 30.seconds @@ -213,13 +209,16 @@ object TelegramBot { post { extractLog { log => entity(as[Update]) { update => +// log.info("update={}", update) +// complete(StatusCodes.OK) onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) { - case Success(processResult) => processResult match { - case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) - case DialogManager.ProcessUpdateFailure(exception) => - log.error(exception, "action=process_update result=failure message={}", update) - complete(HttpResponse(status = StatusCodes.InternalServerError)) - } + case Success(processResult) => + processResult match { + case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) + case DialogManager.ProcessUpdateFailure(exception) => + log.error(exception, "action=process_update result=failure message={}", update) + complete(HttpResponse(status = StatusCodes.InternalServerError)) + } case Failure(exception) => log.error(exception, "action=process_update result=failure message={}", update) complete(HttpResponse(status = StatusCodes.InternalServerError))