Upgrade versions, use jackson for json serialization

This commit is contained in:
Pavel Kachalouski
2020-12-27 16:03:35 +01:00
parent d960d6e6f2
commit 4702d3a650
13 changed files with 302 additions and 265 deletions

View File

@@ -1 +0,0 @@
-mem 2048

View File

@@ -1,10 +1,7 @@
import Dependencies._ import Dependencies._
import Versions._
lazy val commonSettings = Seq( lazy val commonSettings = Seq(organization := "eu.xeppaka", scalaVersion := "2.13.4", mainClass := Some("eu.xeppaka.bot.Main"))
organization := "com.example",
scalaVersion := "2.13.1",
mainClass := Some("eu.xeppaka.bot.Main")
)
inThisBuild(commonSettings) inThisBuild(commonSettings)
@@ -12,29 +9,27 @@ lazy val `telegram-bot-delivery` = (project in file("."))
.settings( .settings(
name := "telegram-bot-delivery", name := "telegram-bot-delivery",
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
scalaTest % Test, scalaTest % Test,
akkaTyped, akkaTyped,
akkaClusterShardingTyped, akkaSerializationJackson,
akkaHttp, akkaClusterShardingTyped,
akkaStream, akkaHttp,
akkaPersistence, akkaHttpJackson,
akkaPersistenceCassandra, akkaStream,
akkaPersistenceQuery, akkaPersistence,
levelDbJni, akkaPersistenceCassandra,
circleCore, akkaPersistenceQuery,
circleGeneric, slibTelegram,
circleParser, logback
circeAkkaHttp, ),
slibTelegram, dependencyOverrides ++= Seq("com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion),
logback
),
dockerBaseImage := "openjdk:11", dockerBaseImage := "openjdk:11",
dockerExposedPorts := Seq(8443), dockerExposedPorts := Seq(8443),
dockerRepository := Some("registry.xeppaka.eu:443"), dockerRepository := Some("registry.xeppaka.eu:443"),
Docker / daemonUserUid := Some("1001"), Docker / daemonUserUid := Some("1001"),
Docker / daemonUser := "telegram-bot", Docker / daemonUser := "telegram-bot",
Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery", Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery",
version := "1.1.1" version := "1.1.2"
) )
.enablePlugins(JavaServerAppPackaging) .enablePlugins(JavaServerAppPackaging)
.enablePlugins(DockerPlugin) .enablePlugins(DockerPlugin)

32
msg.json Normal file
View File

@@ -0,0 +1,32 @@
{
"chat_id": 77544923,
"text": "*List of your watched parcels:*\n(empty)",
"parse_mode": "Markdown",
"disable_web_page_preview": null,
"disable_notification": null,
"reply_to_message_id": null,
"reply_markup": {
"keyboard": [
[
{
"text": "/add",
"request_contact": null,
"request_location": null
},
{
"text": "/list",
"request_contact": null,
"request_location": null
},
{
"text": "/remove",
"request_contact": null,
"request_location": null
}
]
],
"resize_keyboard": true,
"one_time_keyboard": true,
"selective": null
}
}

View File

@@ -4,29 +4,24 @@ import Dependencies.Versions._
object Dependencies { object Dependencies {
object Versions { object Versions {
val akkaVersion = "2.6.3" val akkaVersion = "2.6.10"
val akkaHttpVersion = "10.1.11" val akkaHttpVersion = "10.2.2"
val akkaPersistenceCassandraVersion = "0.103" val akkaHttpJacksonVersion = "1.35.2"
val levelDbJniVersion = "1.8" val akkaPersistenceCassandraVersion = "1.0.4"
val circeVersion = "0.13.0" val scalaTestVersion = "3.2.2"
val akkaHttpCirceVersion = "1.31.0"
val scalaTestVersion = "3.2.0-M4"
val slibTelegramVersion = "0.1.0" val slibTelegramVersion = "0.1.0"
val logbackVersion = "1.2.3" val logbackVersion = "1.2.3"
} }
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akkaSerializationJackson = "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val akkaHttpJackson = "de.heikoseeberger" %% "akka-http-jackson" % akkaHttpJacksonVersion
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion
val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion
val circleCore = "io.circe" %% "circe-core" % circeVersion
val circleGeneric = "io.circe" %% "circe-generic" % circeVersion
val circleParser = "io.circe" %% "circe-parser" % circeVersion
val circeAkkaHttp = "de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirceVersion
val slibTelegram = "eu.xeppaka" %% "slib-telegram" % slibTelegramVersion val slibTelegram = "eu.xeppaka" %% "slib-telegram" % slibTelegramVersion
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion val logback = "ch.qos.logback" % "logback-classic" % logbackVersion

View File

@@ -1 +1 @@
sbt.version=1.3.8 sbt.version=1.4.5

View File

@@ -1,3 +1,2 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")

View File

@@ -1,17 +0,0 @@
akka {
loglevel = "INFO"
extensions = [akka.persistence.Persistence]
persistence {
journal {
plugin = "cassandra-journal"
}
}
actor.allow-java-serialization = on
}
cassandra-journal {
contact-points = ["cassandra"]
}

View File

@@ -0,0 +1,65 @@
telegram-bot-delivery {
cassandra.keyspace = "telegram_bot_delivery"
cassandra.autocreate-offset-store = true
}
datastax-java-driver {
advanced {
reconnect-on-init = true
metadata.schema.enabled = false
metadata.token-map.enabled = false
}
basic {
contact-points = ["127.0.0.1:9042"]
load-balancing-policy.local-datacenter = "datacenter1"
}
}
akka {
loglevel = "INFO"
extensions = [akka.persistence.Persistence]
actor {
serialization-bindings {
"eu.xeppaka.bot.JsonSerializable" = jackson-json
}
}
persistence {
journal {
plugin = "akka.persistence.cassandra.journal"
auto-start-journals = ["akka.persistence.cassandra.journal"]
}
snapshot-store {
plugin = "akka.persistence.cassandra.snapshot"
auto-start-snapshot-stores = ["akka.persistence.cassandra.snapshot"]
}
cassandra {
journal {
keyspace-autocreate = true
tables-autocreate = true
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
max-message-batch-size = 30
support-all-persistence-ids = off
}
snapshot {
keyspace-autocreate = true
tables-autocreate = true
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
}
events-by-tag {
max-message-batch-size = 30
eventual-consistency-delay = 2s
pubsub-notification = on
first-time-bucket = "20200101T00:00"
}
}
}
projection.cassandra {
offset-store {
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
}
}
}

View File

@@ -3,19 +3,21 @@ package eu.xeppaka.bot
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} import akka.http.scaladsl.marshalling.Marshal
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Sink, Source} import akka.stream.scaladsl.{ Sink, Source }
import akka.util.{ByteString, Timeout} import akka.util.{ ByteString, Timeout }
import eu.xeppaka.telegram.bot.TelegramEntities._ import eu.xeppaka.telegram.bot.TelegramEntities._
import io.circe.Printer
import scala.concurrent.ExecutionContext import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{Failure, Success} import scala.util.{ Failure, Success }
object CheckDeliveryDialog { object CheckDeliveryDialog {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
sealed trait Command sealed trait Command
sealed trait CommandResult sealed trait CommandResult
sealed trait DialogCommand extends Command sealed trait DialogCommand extends Command
@@ -31,16 +33,14 @@ object CheckDeliveryDialog {
object DialogCommand { object DialogCommand {
def parse(text: String): DialogCommand = text match { def parse(text: String): DialogCommand = text match {
case "/add" => AddParcel case "/add" => AddParcel
case "/remove" => RemoveParcel case "/remove" => RemoveParcel
case "/list" => ListParcels case "/list" => ListParcels
case "/help" => Help case "/help" => Help
case _ => Help case _ => Help
} }
} }
// json printer
private val printer = Printer.noSpaces.copy(dropNullValues = true)
// internal messages // internal messages
private case class DeliveryStateChanged(state: String) extends Command private case class DeliveryStateChanged(state: String) extends Command
private val helpMessage = private val helpMessage =
@@ -50,11 +50,9 @@ object CheckDeliveryDialog {
|/list - list watched parcels |/list - list watched parcels
|/remove - remove parcel from a watching list |/remove - remove parcel from a watching list
""".stripMargin """.stripMargin
private val commandsKeyboard = Some(ReplyKeyboardMarkup( private val commandsKeyboard = Some(
Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), ReplyKeyboardMarkup(Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), resize_keyboard = Some(true), one_time_keyboard = Some(true))
resize_keyboard = Some(true), )
one_time_keyboard = Some(true)
))
private val removeKeyboard = Some(ReplyKeyboardRemove()) private val removeKeyboard = Some(ReplyKeyboardRemove())
@@ -102,9 +100,9 @@ object CheckDeliveryDialog {
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
case Failure(exception) => AddParcelFailure(exception) case Failure(exception) => AddParcelFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
@@ -136,7 +134,7 @@ object CheckDeliveryDialog {
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
case Failure(exception) => ListParcelsFailure(exception) case Failure(exception) => ListParcelsFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
@@ -163,7 +161,7 @@ object CheckDeliveryDialog {
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList) case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList)
case Failure(exception) => ListParcelIdsFailure(exception) case Failure(exception) => ListParcelIdsFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
@@ -195,9 +193,9 @@ object CheckDeliveryDialog {
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
case Failure(exception) => RemoveParcelFailure(exception) case Failure(exception) => RemoveParcelFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
@@ -250,34 +248,31 @@ object CheckDeliveryDialog {
def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup[Command] { ctx => Behaviors.setup[Command] { ctx =>
import io.circe.generic.auto._
import io.circe.syntax._
case object SendMessageSuccess extends Command case object SendMessageSuccess extends Command
case class SendMessageFailure(exception: Throwable) extends Command case class SendMessageFailure(exception: Throwable) extends Command
val json = printer.print(message.asJson) ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, message)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json) println(message)
println(Await.result(Marshal(message).to[HttpEntity], 2.seconds).asInstanceOf[HttpEntity.Strict].data.utf8String)
Source Source
.single(request) .future(Marshal(message).to[RequestEntity])
.initialDelay(2.seconds * (attempt - 1)) .initialDelay(2.seconds * (attempt - 1))
.map(requestEntity => HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = requestEntity))
.mapAsync(1) { request => .mapAsync(1) { request =>
http http.singleRequest(request).transform {
.singleRequest(request) case Success(response) =>
.transform { if (response.status.isSuccess()) {
case Success(response) => Success(SendMessageSuccess)
if (response.status.isSuccess()) { } else {
Success(SendMessageSuccess) Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
} else { }
Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) case Failure(exception) =>
} ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception)
case Failure(exception) => Success(SendMessageFailure(exception))
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception) }
Success(SendMessageFailure(exception))
}
} }
.to(Sink.foreach(ctx.self ! _)) .to(Sink.foreach(ctx.self ! _))
.run() .run()

View File

@@ -1,38 +1,31 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.security.cert.X509Certificate
import java.text.SimpleDateFormat
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector} import akka.actor.typed.{ ActorRef, Behavior, DispatcherSelector }
import akka.http.scaladsl.UseHttp2.Negotiated
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, `User-Agent`} import akka.http.scaladsl.model.headers.{ `User-Agent`, Accept }
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{Http, HttpsConnectionContext} import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} import akka.persistence.typed.scaladsl.EventSourcedBehavior.{ CommandHandler, EventHandler }
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo }
import com.typesafe.sslconfig.akka.AkkaSSLConfig import com.typesafe.sslconfig.akka.AkkaSSLConfig
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import java.security.cert.X509Certificate
import java.text.SimpleDateFormat
import javax.net.ssl.{ KeyManager, SSLContext, X509TrustManager }
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{Failure, Success} import scala.util.{ Failure, Success }
object Entities { object Entities {
case class Attributes( case class Attributes(parcelType: String, weight: Double, currency: String)
parcelType: String,
weight: Double,
currency: String,
)
case class State( case class State(
id: String, id: String,
@@ -53,17 +46,27 @@ object Entities {
} }
object CzechPostDeliveryCheck { object CzechPostDeliveryCheck {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd") private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd")
private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy") private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy")
private val entityType = "czechpost" private val entityType = "czechpost"
sealed trait Command sealed trait Command extends JsonSerializable
sealed trait CommandResult sealed trait CommandResult extends JsonSerializable
sealed trait Event @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[ParcelAttributesChanged], name = "parcel_attributes_changed"),
new JsonSubTypes.Type(value = classOf[ParcelAdded], name = "parcel_added"),
new JsonSubTypes.Type(value = classOf[ParcelRemoved], name = "parcel_removed"),
new JsonSubTypes.Type(value = classOf[ParcelHistoryStateAdded], name = "parcel_history_state_added")
)
)
sealed trait Event extends JsonSerializable
case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) { case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
def fullStatePrint(parcelId: String): String = { def fullStatePrint(parcelId: String): String = {
val statesString = states val statesString = states.toSeq
.toSeq
.sortBy(state => czechPostDateFormat.parse(state.date)) .sortBy(state => czechPostDateFormat.parse(state.date))
.map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n") .map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n")
.mkString .mkString
@@ -74,18 +77,14 @@ object CzechPostDeliveryCheck {
} }
def latestStatePrint(parcelId: String): String = { def latestStatePrint(parcelId: String): String = {
latestState latestState.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}").getOrElse(s"$parcelId ($comment) - NO INFO")
.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}")
.getOrElse(s"$parcelId ($comment) - NO INFO")
} }
private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date)) private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date))
} }
case class State(parcelStates: Map[String, Parcel] = Map.empty) { case class State(parcelStates: Map[String, Parcel] = Map.empty) extends JsonSerializable {
def latestStatesPrint: Seq[String] = parcelStates def latestStatesPrint: Seq[String] = parcelStates.map { case (id, parcel) => parcel.latestStatePrint(id) }.to(Seq)
.map { case (id, parcel) => parcel.latestStatePrint(id) }
.to(Seq)
} }
case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command
@@ -106,13 +105,9 @@ object CzechPostDeliveryCheck {
private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command
case class DeliveryStateChanged(state: String) case class DeliveryStateChanged(state: String)
@SerialVersionUID(1L)
case class ParcelAdded(parcelId: String, comment: String) extends Event case class ParcelAdded(parcelId: String, comment: String) extends Event
@SerialVersionUID(1L)
case class ParcelRemoved(parcelId: String) extends Event case class ParcelRemoved(parcelId: String) extends Event
@SerialVersionUID(1L)
case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event
@SerialVersionUID(1L)
case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event
private val trustfulSslContext: SSLContext = { private val trustfulSslContext: SSLContext = {
@@ -134,32 +129,22 @@ object CzechPostDeliveryCheck {
implicit val actorSystem: ActorSystem = ctx.system.toClassic implicit val actorSystem: ActorSystem = ctx.system.toClassic
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val http = Http() val http = Http()
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withAcceptAnyCertificate(true).withDisableHostnameVerification(true)))
.withAcceptAnyCertificate(true)
.withDisableHostnameVerification(true)))
val originalCtx = http.createClientHttpsContext(badSslConfig) val originalCtx = http.createClientHttpsContext(badSslConfig)
val sslContext = new HttpsConnectionContext( val sslContext = ConnectionContext.httpsClient(trustfulSslContext)
trustfulSslContext,
originalCtx.sslConfig,
originalCtx.enabledCipherSuites,
originalCtx.enabledProtocols,
originalCtx.clientAuth,
originalCtx.sslParameters,
Negotiated
)
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes) scheduler.startTimerAtFixedRate("check-delivery-state", CheckParcels, 5.minutes)
val log = ctx.log
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match { cmd match {
case AddParcel(parcelId, comment, replyTo) => case AddParcel(parcelId, comment, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.keySet.contains(parcelIdUpper)) { if (state.parcelStates.keySet.contains(parcelIdUpper)) {
Effect Effect.none.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
.none
.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
} else { } else {
Effect Effect
.persist(ParcelAdded(parcelIdUpper, comment)) .persist(ParcelAdded(parcelIdUpper, comment))
@@ -171,75 +156,64 @@ object CzechPostDeliveryCheck {
case RemoveParcel(parcelId, replyTo) => case RemoveParcel(parcelId, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.contains(parcelIdUpper)) { if (state.parcelStates.contains(parcelIdUpper)) {
Effect Effect.persist(ParcelRemoved(parcelIdUpper)).thenRun(_ => replyTo ! CommandResultSuccess)
.persist(ParcelRemoved(parcelIdUpper))
.thenRun(_ => replyTo ! CommandResultSuccess)
} else { } else {
Effect Effect.none.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
.none
.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
} }
case ListParcels(replyTo) => case ListParcels(replyTo) =>
Effect.none Effect.none.thenRun { state =>
.thenRun { state => val parcelsList = state.latestStatesPrint
val parcelsList = state.latestStatesPrint replyTo ! ListParcelsResult(parcelsList)
replyTo ! ListParcelsResult(parcelsList) }
}
case ListParcelIds(replyTo) => case ListParcelIds(replyTo) =>
Effect.none Effect.none.thenRun { state =>
.thenRun { state => replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) }
}
case CheckParcels => case CheckParcels =>
Effect Effect.none.thenRun { _ =>
.none log.info("action=check_parcel_state chat_id={}", chatId)
.thenRun { _ => val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
ctx.log.info("action=check_parcel_state chat_id={}", chatId)
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
for (ids <- parcelIds) { for (ids <- parcelIds) {
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
http http
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings) .singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
.transform { .transform {
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
case response: Failure[HttpResponse] => response case response: Failure[HttpResponse] => response
} }
.flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]]) .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
.andThen { .andThen {
case Success(parcelHistories) => case Success(parcelHistories) =>
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
case Failure(exception) => case Failure(exception) =>
ctx.log.error("Error checking parcel history.", exception) log.error("Error checking parcel history.", exception)
} }
.andThen { .andThen {
case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) case Success(_) => log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
case Failure(exception) => ctx.log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception) case Failure(exception) => log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception)
} }
}
} }
}
case ParcelHistoryRetrieved(parcelHistory) => case ParcelHistoryRetrieved(parcelHistory) =>
val parcelId = parcelHistory.id val parcelId = parcelHistory.id
val parcelState = state.parcelStates(parcelId) val parcelState = state.parcelStates(parcelId)
val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty) val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty)
Some(parcelHistory.attributes) Some(parcelHistory.attributes)
else else
parcelState.attributes parcelState.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.map(attributes => ParcelAttributesChanged(parcelId, attributes)) .map(attributes => ParcelAttributesChanged(parcelId, attributes))
.toSeq .toSeq
val newStates = parcelHistory.states.state.toSet -- parcelState.states val newStates = parcelHistory.states.state.toSet -- parcelState.states
val stateEvents: Seq[Event] = newStates val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).toSeq
.map(state => ParcelHistoryStateAdded(parcelId, state))
.toSeq
val comment = state.parcelStates(parcelId).comment val comment = state.parcelStates(parcelId).comment
Effect Effect
@@ -268,12 +242,7 @@ object CzechPostDeliveryCheck {
} }
} }
EventSourcedBehavior[Command, Event, State]( EventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId(entityType, chatId), emptyState = State(), commandHandler = commandHandler, eventHandler = eventHandler)
persistenceId = PersistenceId(entityType, chatId),
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
} }
} }
} }

View File

@@ -0,0 +1,3 @@
package eu.xeppaka.bot
trait JsonSerializable

View File

@@ -1,21 +1,24 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.nio.file.Paths import java.nio.file.Paths
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy} import akka.actor.typed.{ ActorSystem, DispatcherSelector, SupervisorStrategy }
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.util.Timeout import akka.util.Timeout
import akka.{Done, actor} import akka.{ actor, Done }
import com.fasterxml.jackson.annotation.JsonInclude
import de.heikoseeberger.akkahttpjackson.JacksonSupport
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future} import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
import scala.io.StdIn import scala.io.StdIn
object Main { object Main {
JacksonSupport.defaultObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4") val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
val localPort = 8443 val localPort = 8443

View File

@@ -1,7 +1,7 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.io.InputStream import java.io.InputStream
import java.security.{KeyStore, SecureRandom} import java.security.{ KeyStore, SecureRandom }
import java.util.UUID import java.util.UUID
import akka.Done import akka.Done
@@ -11,18 +11,18 @@ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed._ import akka.actor.typed._
import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post} import akka.http.scaladsl.server.Directives.{ as, complete, entity, extractLog, onComplete, path, post }
import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext} import akka.http.scaladsl.{ ConnectionContext, Http, HttpExt, HttpsConnectionContext }
import akka.util.{ByteString, Timeout} import akka.util.{ ByteString, Timeout }
import eu.xeppaka.telegram.bot.TelegramEntities._ import eu.xeppaka.telegram.bot.TelegramEntities._
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} import javax.net.ssl.{ KeyManagerFactory, SSLContext, TrustManagerFactory }
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.io.Source import scala.io.Source
import scala.util.{Failure, Success} import scala.util.{ Failure, Success }
object TelegramBot { object TelegramBot {
sealed trait Command sealed trait Command
@@ -44,27 +44,31 @@ object TelegramBot {
val http: HttpExt = Http() val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId") val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler) val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler)
def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx => def bindServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
case class BindingSuccess(binding: Http.ServerBinding) extends Command case class BindingSuccess(binding: Http.ServerBinding) extends Command
case class BindingFailure(exception: Throwable) extends Command case class BindingFailure(exception: Throwable) extends Command
ctx.log.info("action=bind_server interface={} port={}", interface, localPort) ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
http val serverBuilder = http.newServerAt(interface, localPort)
.bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext)) val bindFuture = if (useHttpsServer) {
.onComplete { serverBuilder.enableHttps(createHttpsConnectionContext).bindFlow(routes)
case Success(binding) => ctx.self ! BindingSuccess(binding) } else {
case Failure(exception) => ctx.self ! BindingFailure(exception) serverBuilder.bindFlow(routes)
} }
bindFuture.onComplete {
case Success(binding) => ctx.self ! BindingSuccess(binding)
case Failure(exception) => ctx.self ! BindingFailure(exception)
}
Behaviors.receiveMessage[Command] { Behaviors.receiveMessage[Command] {
case BindingSuccess(binding) => case BindingSuccess(binding) =>
ctx.log.info("action=bind_server result=success") ctx.log.info("action=bind_server result=success")
settingWebhook(binding) setWebhook(binding)
case BindingFailure(exception) => case BindingFailure(exception) =>
ctx.log.error("action=bind_server result=failure", exception) ctx.log.error("action=bind_server result=failure", exception)
ctx.log.error("action=start_bot result=failure") ctx.log.error("action=start_bot result=failure")
@@ -75,18 +79,16 @@ object TelegramBot {
} }
} }
def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => def unbindServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object UnbindingSuccess extends Command case object UnbindingSuccess extends Command
case class UnbindingFailure(exception: Throwable) extends Command case class UnbindingFailure(exception: Throwable) extends Command
ctx.log.info("action=unbind_server interface={} port={}", interface, localPort) ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
binding binding.unbind().onComplete {
.unbind() case Success(Done) => ctx.self ! UnbindingSuccess
.onComplete { case Failure(exception) => ctx.self ! UnbindingFailure(exception)
case Success(Done) => ctx.self ! UnbindingSuccess }
case Failure(exception) => ctx.self ! UnbindingFailure(exception)
}
Behaviors.receiveMessage[Command] { Behaviors.receiveMessage[Command] {
case UnbindingSuccess => case UnbindingSuccess =>
@@ -101,7 +103,7 @@ object TelegramBot {
} }
} }
def settingWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => def setWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object SetWebhookSuccess extends Command case object SetWebhookSuccess extends Command
case class SetWebhookFailure(exception: Throwable) extends Command case class SetWebhookFailure(exception: Throwable) extends Command
@@ -124,18 +126,15 @@ object TelegramBot {
val formParts = immutable.Seq(urlPart, certificatePart).flatten val formParts = immutable.Seq(urlPart, certificatePart).flatten
val formData = Multipart.FormData.Strict(formParts) val formData = Multipart.FormData.Strict(formParts)
Marshal(formData) Marshal(formData).to[RequestEntity].flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))).onComplete {
.to[RequestEntity] case Success(response) =>
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) if (response.status.isSuccess())
.onComplete { ctx.self ! SetWebhookSuccess
case Success(response) => else
if (response.status.isSuccess()) ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}."))
ctx.self ! SetWebhookSuccess case Failure(exception) =>
else ctx.self ! SetWebhookFailure(exception)
ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}.")) }
case Failure(exception) =>
ctx.self ! SetWebhookFailure(exception)
}
Behaviors.receiveMessage { Behaviors.receiveMessage {
case SetWebhookSuccess => case SetWebhookSuccess =>
@@ -145,9 +144,9 @@ object TelegramBot {
if (attempt > 20) { if (attempt > 20) {
ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception) ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception)
ctx.log.error("action=start_bot result=failure") ctx.log.error("action=start_bot result=failure")
unbindingServer(binding, None) unbindServer(binding, None)
} else { } else {
settingWebhook(binding, attempt = attempt + 1) setWebhook(binding, attempt = attempt + 1)
} }
case otherCommand: Command => case otherCommand: Command =>
stashBuffer.stash(otherCommand) stashBuffer.stash(otherCommand)
@@ -163,25 +162,23 @@ object TelegramBot {
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
http http.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)).onComplete {
.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)) case Success(response) =>
.onComplete { if (response.status.isSuccess())
case Success(response) => ctx.self ! DeleteWebhookSuccess
if (response.status.isSuccess()) else
ctx.self ! DeleteWebhookSuccess ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}"))
else case Failure(exception) =>
ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}")) ctx.self ! DeleteWebhookFailure(exception)
case Failure(exception) => }
ctx.self ! DeleteWebhookFailure(exception)
}
Behaviors.receiveMessage { Behaviors.receiveMessage {
case DeleteWebhookSuccess => case DeleteWebhookSuccess =>
ctx.log.info("action=delete_webhook result=success") ctx.log.info("action=delete_webhook result=success")
unbindingServer(binding, Some(replyTo)) unbindServer(binding, Some(replyTo))
case DeleteWebhookFailure(exception) => case DeleteWebhookFailure(exception) =>
ctx.log.error("action=delete_webhook result=failure", exception) ctx.log.error("action=delete_webhook result=failure", exception)
unbindingServer(binding, Some(replyTo)) unbindServer(binding, Some(replyTo))
case _ => Behaviors.unhandled case _ => Behaviors.unhandled
} }
} }
@@ -198,14 +195,13 @@ object TelegramBot {
} }
} }
bindingServer bindServer
} }
} }
private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = { private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
implicit val timeout: Timeout = 30.seconds implicit val timeout: Timeout = 30.seconds
@@ -213,13 +209,16 @@ object TelegramBot {
post { post {
extractLog { log => extractLog { log =>
entity(as[Update]) { update => entity(as[Update]) { update =>
// log.info("update={}", update)
// complete(StatusCodes.OK)
onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) { onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
case Success(processResult) => processResult match { case Success(processResult) =>
case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) processResult match {
case DialogManager.ProcessUpdateFailure(exception) => case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
log.error(exception, "action=process_update result=failure message={}", update) case DialogManager.ProcessUpdateFailure(exception) =>
complete(HttpResponse(status = StatusCodes.InternalServerError)) log.error(exception, "action=process_update result=failure message={}", update)
} complete(HttpResponse(status = StatusCodes.InternalServerError))
}
case Failure(exception) => case Failure(exception) =>
log.error(exception, "action=process_update result=failure message={}", update) log.error(exception, "action=process_update result=failure message={}", update)
complete(HttpResponse(status = StatusCodes.InternalServerError)) complete(HttpResponse(status = StatusCodes.InternalServerError))