Compare commits

..

1 Commits

Author SHA1 Message Date
Pavel Kachalouski
d540c171b1 Fixing docker publishing
Some checks failed
continuous-integration/drone/push Build is failing
2019-05-18 23:42:02 +02:00
18 changed files with 934 additions and 683 deletions

View File

@@ -14,10 +14,8 @@ steps:
- name: docker:publish - name: docker:publish
image: plugins/docker image: plugins/docker
settings: settings:
registry: registry.xeppaka.eu registry: registry.xeppaka.eu:443
repo: registry.xeppaka.eu/telegram-bot-delivery tags: 1.0.0
tags:
- '1.0.0'
username: username:
from_secret: docker_username from_secret: docker_username
password: password:

1
.sbtopts Normal file
View File

@@ -0,0 +1 @@
-mem 2048

View File

@@ -1,7 +1,11 @@
import Dependencies._ import Dependencies._
import Versions._
lazy val commonSettings = Seq(organization := "eu.xeppaka", scalaVersion := "2.13.4", mainClass := Some("eu.xeppaka.bot.Main")) lazy val commonSettings = Seq(
organization := "com.example",
scalaVersion := "2.12.8",
version := "1.0.0",
mainClass := Some("eu.xeppaka.bot.Main")
)
inThisBuild(commonSettings) inThisBuild(commonSettings)
@@ -9,28 +13,24 @@ lazy val `telegram-bot-delivery` = (project in file("."))
.settings( .settings(
name := "telegram-bot-delivery", name := "telegram-bot-delivery",
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
scalaTest % Test,
akka,
akkaTyped, akkaTyped,
akkaSerializationJackson,
akkaClusterShardingTyped,
akkaHttp, akkaHttp,
akkaHttpJackson,
akkaStream, akkaStream,
akkaPersistence, akkaPersistence,
akkaPersistenceCassandra, levelDbJni,
akkaPersistenceQuery, circleCore,
akkaTestkitTyped % Test, circleGeneric,
scalaTest % Test, circleParser,
slibTelegram, circeAkkaHttp
logback
), ),
dependencyOverrides ++= Seq("com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion), dockerBaseImage := "openjdk:13-jdk-oracle",
dockerBaseImage := "openjdk:11",
dockerExposedPorts := Seq(8443), dockerExposedPorts := Seq(8443),
dockerRepository := Some("registry.xeppaka.eu:443"), dockerRepository := Some("registry.xeppaka.eu:443"),
Docker / daemonUserUid := Some("1001"), Docker / daemonUserUid := Some("1001"),
Docker / daemonUser := "telegram-bot", Docker / daemonUser := "telegram-bot",
Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery", Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery",
version := "1.1.3"
) )
.enablePlugins(JavaServerAppPackaging) .enablePlugins(JavaServerAppPackaging)
.enablePlugins(DockerPlugin) .enablePlugins(DockerPlugin)

View File

@@ -1,32 +0,0 @@
{
"chat_id": 77544923,
"text": "*List of your watched parcels:*\n(empty)",
"parse_mode": "Markdown",
"disable_web_page_preview": null,
"disable_notification": null,
"reply_to_message_id": null,
"reply_markup": {
"keyboard": [
[
{
"text": "/add",
"request_contact": null,
"request_location": null
},
{
"text": "/list",
"request_contact": null,
"request_location": null
},
{
"text": "/remove",
"request_contact": null,
"request_location": null
}
]
],
"resize_keyboard": true,
"one_time_keyboard": true,
"selective": null
}
}

View File

@@ -4,26 +4,23 @@ import Dependencies.Versions._
object Dependencies { object Dependencies {
object Versions { object Versions {
val akkaVersion = "2.6.10" val akkaVersion = "2.5.22"
val akkaHttpVersion = "10.2.2" val akkaHttpVersion = "10.1.8"
val akkaHttpJacksonVersion = "1.35.2" val levelDbJniVersion = "1.8"
val akkaPersistenceCassandraVersion = "1.0.4" val circeVersion = "0.11.1"
val scalaTestVersion = "3.2.2" val akkaHttpCirceVersion = "1.23.0"
val slibTelegramVersion = "0.1.0" val scalaTestVersion = "3.0.5"
val logbackVersion = "1.2.3"
} }
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion lazy val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion lazy val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
val akkaSerializationJackson = "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion lazy val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion lazy val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val akkaHttpJackson = "de.heikoseeberger" %% "akka-http-jackson" % akkaHttpJacksonVersion lazy val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion lazy val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion
val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion lazy val circleCore = "io.circe" %% "circe-core" % circeVersion
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion lazy val circleGeneric = "io.circe" %% "circe-generic" % circeVersion
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion lazy val circleParser = "io.circe" %% "circe-parser" % circeVersion
val akkaTestkitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion lazy val circeAkkaHttp = "de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirceVersion
val slibTelegram = "eu.xeppaka" %% "slib-telegram" % slibTelegramVersion lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
} }

View File

@@ -1 +1 @@
sbt.version=1.4.5 sbt.version=1.2.8

View File

@@ -1,2 +1,2 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.21")

View File

@@ -0,0 +1,18 @@
akka {
loglevel = "INFO"
extensions = [akka.persistence.Persistence]
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
auto-start-journals = ["akka.persistence.journal.leveldb"]
leveldb.dir = "journal-check-delivery"
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
}
}
}

View File

@@ -1,65 +0,0 @@
telegram-bot-delivery {
cassandra.keyspace = "telegram_bot_delivery"
cassandra.autocreate-offset-store = true
}
datastax-java-driver {
advanced {
reconnect-on-init = true
metadata.schema.enabled = false
metadata.token-map.enabled = false
}
basic {
contact-points = ["127.0.0.1:9042"]
load-balancing-policy.local-datacenter = "datacenter1"
}
}
akka {
loglevel = "INFO"
extensions = [akka.persistence.Persistence]
actor {
serialization-bindings {
"eu.xeppaka.bot.JsonSerializable" = jackson-json
}
}
persistence {
journal {
plugin = "akka.persistence.cassandra.journal"
auto-start-journals = ["akka.persistence.cassandra.journal"]
}
snapshot-store {
plugin = "akka.persistence.cassandra.snapshot"
auto-start-snapshot-stores = ["akka.persistence.cassandra.snapshot"]
}
cassandra {
journal {
keyspace-autocreate = true
tables-autocreate = true
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
max-message-batch-size = 30
support-all-persistence-ids = off
}
snapshot {
keyspace-autocreate = true
tables-autocreate = true
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
}
events-by-tag {
max-message-batch-size = 30
eventual-consistency-delay = 2s
pubsub-notification = on
first-time-bucket = "20200101T00:00"
}
}
}
projection.cassandra {
offset-store {
keyspace = ${telegram-bot-delivery.cassandra.keyspace}
}
}
}

View File

@@ -1,23 +1,22 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.marshalling.Marshal import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.stream.scaladsl.{ Sink, Source } import akka.stream.ActorMaterializer
import akka.util.{ ByteString, Timeout } import akka.stream.scaladsl.{Sink, Source}
import eu.xeppaka.telegram.bot.TelegramEntities._ import akka.util.{ByteString, Timeout}
import eu.xeppaka.bot.TelegramEntities._
import eu.xeppaka.bot.TelegramEntitiesDerivations._
import io.circe.Printer
import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.ExecutionContext
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{ Failure, Success } import scala.util.{Failure, Success}
object CheckDeliveryDialog { object CheckDeliveryDialog {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
sealed trait Command sealed trait Command
sealed trait CommandResult sealed trait CommandResult
sealed trait DialogCommand extends Command sealed trait DialogCommand extends Command
@@ -37,10 +36,13 @@ object CheckDeliveryDialog {
case "/remove" => RemoveParcel case "/remove" => RemoveParcel
case "/list" => ListParcels case "/list" => ListParcels
case "/help" => Help case "/help" => Help
case "/start" => Help
case _ => Help case _ => Help
} }
} }
// json printer
private val printer = Printer.noSpaces.copy(dropNullValues = true)
// internal messages // internal messages
private case class DeliveryStateChanged(state: String) extends Command private case class DeliveryStateChanged(state: String) extends Command
private val helpMessage = private val helpMessage =
@@ -50,19 +52,22 @@ object CheckDeliveryDialog {
|/list - list watched parcels |/list - list watched parcels
|/remove - remove parcel from a watching list |/remove - remove parcel from a watching list
""".stripMargin """.stripMargin
private val commandsKeyboard = Some( private val commandsKeyboard = Some(ReplyKeyboardMarkup(
ReplyKeyboardMarkup(Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), resize_keyboard = Some(true), one_time_keyboard = Some(true)) Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))),
) resize_keyboard = Some(true),
one_time_keyboard = Some(true)
private val removeKeyboard = Some(ReplyKeyboardRemove()) ))
def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
implicit val system: ActorSystem = ctx.system.toClassic implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toUntyped)
implicit val executionContext: ExecutionContext = ctx.executionContext implicit val executionContext: ExecutionContext = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val http = Http() val http = Http()(ctx.system.toUntyped)
val stashBuffer = StashBuffer[Command](100)
val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state)) val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state))
val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart)) val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart))
def initial: Behavior[Command] = sendMessage(SendMessage(chatId, "Waiting for a command...", reply_markup = commandsKeyboard), waitCommand, initial)
def waitCommand: Behavior[Command] = Behaviors.receiveMessage { def waitCommand: Behavior[Command] = Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) => case ProcessMessage(msg, replyTo) =>
val command = msg.text.map(text => DialogCommand.parse(text)) val command = msg.text.map(text => DialogCommand.parse(text))
@@ -73,33 +78,31 @@ object CheckDeliveryDialog {
Behaviors.same Behaviors.same
} else { } else {
val message = SendMessage(chatId, "This command is unsupported.") val message = SendMessage(chatId, "This command is unsupported.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
} }
case AddParcel => case AddParcel =>
val parcelIdMessage = SendMessage(chatId, "Please enter a parcel ID.", reply_markup = removeKeyboard) val message = SendMessage(chatId, "Please enter a parcel ID.")
val commentMessage = SendMessage(chatId, "Please enter a comment.", reply_markup = removeKeyboard) sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), initial)
sendMessage(parcelIdMessage, waitTextMessage(parcelId => sendMessage(commentMessage, waitTextMessage(comment => addParcel(parcelId, comment)), waitCommand)), waitCommand)
case RemoveParcel => case RemoveParcel =>
removeParcel(waitCommand, waitCommand) removeParcel(initial, initial)
case ListParcels => case ListParcels =>
listParcels listParcels
case Help => case Help =>
val message = SendMessage(chatId, helpMessage, reply_markup = commandsKeyboard) val message = SendMessage(chatId, helpMessage)
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case DeliveryStateChanged(state) => case DeliveryStateChanged(state) =>
val message = SendMessage(chatId, state, Some("Markdown")) val message = SendMessage(chatId, state, Some("Markdown"))
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case _ => case _ =>
Behaviors.unhandled Behaviors.unhandled
} }
def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def addParcel(parcelId: String): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.setup { ctx =>
case object AddParcelSuccess extends Command case object AddParcelSuccess extends Command
case class AddParcelFailure(exception: Throwable) extends Command case class AddParcelFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.AddParcel(parcelId, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception) case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
case Failure(exception) => AddParcelFailure(exception) case Failure(exception) => AddParcelFailure(exception)
@@ -107,92 +110,87 @@ object CheckDeliveryDialog {
Behaviors.receiveMessage { Behaviors.receiveMessage {
case AddParcelSuccess => case AddParcelSuccess =>
val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case AddParcelFailure(exception) => case AddParcelFailure(exception) =>
exception match { exception match {
case CzechPostDeliveryCheck.DuplicateParcelId(_) => case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case _ => case _ =>
ctx.log.error("action=add_parcel result=failure", exception) ctx.log.error(exception, "action=add_parcel result=failure")
val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Adding parcel failed. Please try again.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
} }
case otherMessage => case otherMessage =>
stashBuffer.stash(otherMessage) stashBuffer.stash(otherMessage)
Behaviors.same Behaviors.same
} }
} }
}
def listParcels: Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def listParcels: Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.setup { ctx => case class ListParcelsSuccess(parcelsList: Set[String]) extends Command
case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command
case class ListParcelsFailure(exception: Throwable) extends Command case class ListParcelsFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcels(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
case Failure(exception) => ListParcelsFailure(exception) case Failure(exception) => ListParcelsFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
case ListParcelsSuccess(parcelsList) => case ListParcelsSuccess(parcelsList) =>
val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)") val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.toSeq.sorted.mkString("\n") else "(empty)")
val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard) val message = SendMessage(chatId, messageText, Some("Markdown"))
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case ListParcelsFailure(exception) => case ListParcelsFailure(exception) =>
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception) ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId)
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case otherMessage => case otherMessage =>
stashBuffer.stash(otherMessage) stashBuffer.stash(otherMessage)
Behaviors.same Behaviors.same
} }
} }
}
def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] =
Behaviors.setup { ctx => Behaviors.setup { ctx =>
case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command case class ListParcelsSuccess(parcelsList: Set[String]) extends Command
case class ListParcelIdsFailure(exception: Throwable) extends Command case class ListParcelsFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcels(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList) case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
case Failure(exception) => ListParcelIdsFailure(exception) case Failure(exception) => ListParcelsFailure(exception)
} }
Behaviors.receiveMessage { Behaviors.receiveMessage {
case ListParcelIdsSuccess(parcelsList) => case ListParcelsSuccess(parcelsList) =>
if (parcelsList.nonEmpty) { if (parcelsList.nonEmpty) {
val keyboardButtons = parcelsList.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq val keyboardButtons = parcelsList.toSeq.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq
val markup = ReplyKeyboardMarkup(keyboard = keyboardButtons, resize_keyboard = Some(true), one_time_keyboard = Some(true)) val markup = ReplyKeyboardMarkup(keyboard = keyboardButtons, resize_keyboard = Some(true), one_time_keyboard = Some(true))
val message = SendMessage(chatId, "Please enter a parcel id to remove.", reply_markup = Some(markup)) val message = SendMessage(chatId, "Please enter a parcel id to remove.", reply_markup = Some(markup))
sendMessage(message, waitTextMessage(parcelId => removeParcelId(parcelId)), onFailure) sendMessage(message, waitParcelId(parcelId => removeParcelId(parcelId)), onFailure)
} else { } else {
val message = SendMessage(chatId, "You don't have watched parcels. There is nothing to remove.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, "You don't have watched parcels. There is nothing to remove.")
sendMessage(message, onSuccess, onFailure) sendMessage(message, onSuccess, onFailure)
} }
case ListParcelIdsFailure(exception) => case ListParcelsFailure(exception) =>
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception) ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId)
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case otherMessage => case otherMessage =>
stashBuffer.stash(otherMessage) stashBuffer.stash(otherMessage)
Behaviors.same Behaviors.same
} }
} }
}
def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.setup { ctx =>
case object RemoveParcelSuccess extends Command case object RemoveParcelSuccess extends Command
case class RemoveParcelFailure(exception: Throwable) extends Command case class RemoveParcelFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) { ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception) case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
case Failure(exception) => RemoveParcelFailure(exception) case Failure(exception) => RemoveParcelFailure(exception)
@@ -200,24 +198,23 @@ object CheckDeliveryDialog {
Behaviors.receiveMessage { Behaviors.receiveMessage {
case RemoveParcelSuccess => case RemoveParcelSuccess =>
val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case RemoveParcelFailure(exception) => case RemoveParcelFailure(exception) =>
exception match { exception match {
case CzechPostDeliveryCheck.ParcelIdNotFound(_) => case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
case _ => case _ =>
ctx.log.error("action=add_parcel result=failure", exception) ctx.log.error(exception, "action=add_parcel result=failure")
val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard) val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.")
sendMessage(message, waitCommand, waitCommand) sendMessage(message, initial, initial)
} }
case otherMessage => case otherMessage =>
stashBuffer.stash(otherMessage) stashBuffer.stash(otherMessage)
Behaviors.same Behaviors.same
} }
} }
}
// def selectPostType(onFinish: PostType => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage { // def selectPostType(onFinish: PostType => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage {
// //
@@ -226,11 +223,10 @@ object CheckDeliveryDialog {
// val button2 = KeyboardButton("button2") // val button2 = KeyboardButton("button2")
// val keyboard = ReplyKeyboardMarkup(Seq(Seq(button1, button2))) // val keyboard = ReplyKeyboardMarkup(Seq(Seq(button1, button2)))
// val message = SendMessage(chatId, "Please enter parcel ID.", reply_markup = Some(keyboard)) // val message = SendMessage(chatId, "Please enter parcel ID.", reply_markup = Some(keyboard))
// sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), waitCommand) // sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), initial)
// } // }
def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def waitParcelId(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage {
Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) => case ProcessMessage(msg, replyTo) =>
if (msg.text.isDefined) { if (msg.text.isDefined) {
val parcelId = msg.text.get val parcelId = msg.text.get
@@ -238,31 +234,32 @@ object CheckDeliveryDialog {
onFinish(parcelId) onFinish(parcelId)
} else { } else {
replyTo ! ProcessMessageSuccess replyTo ! ProcessMessageSuccess
waitTextMessage(onFinish) waitParcelId(onFinish)
} }
case otherMsg => case otherMsg =>
stashBuffer.stash(otherMsg) stashBuffer.stash(otherMsg)
Behaviors.same Behaviors.same
} }
}
def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 0): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.setup[Command] { ctx => import io.circe.generic.auto._
import io.circe.syntax._
case object SendMessageSuccess extends Command case object SendMessageSuccess extends Command
case class SendMessageFailure(exception: Throwable) extends Command case class SendMessageFailure(exception: Throwable) extends Command
ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, message) val json = printer.pretty(message.asJson)
val request = HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = HttpEntity.Strict(ContentTypes.`application/json`, ByteString(json)))
println(message) ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, json)
println(Await.result(Marshal(message).to[HttpEntity], 2.seconds).asInstanceOf[HttpEntity.Strict].data.utf8String)
Source Source
.future(Marshal(message).to[RequestEntity]) .single(request)
.initialDelay(2.seconds * (attempt - 1)) .initialDelay(2.seconds * attempt)
.map(requestEntity => HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = requestEntity))
.mapAsync(1) { request => .mapAsync(1) { request =>
http.singleRequest(request).transform { http
.singleRequest(request)
.transform {
case Success(response) => case Success(response) =>
if (response.status.isSuccess()) { if (response.status.isSuccess()) {
Success(SendMessageSuccess) Success(SendMessageSuccess)
@@ -270,7 +267,7 @@ object CheckDeliveryDialog {
Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}."))) Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
} }
case Failure(exception) => case Failure(exception) =>
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception) ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={}", chatId)
Success(SendMessageFailure(exception)) Success(SendMessageFailure(exception))
} }
} }
@@ -280,13 +277,13 @@ object CheckDeliveryDialog {
Behaviors.receiveMessage { Behaviors.receiveMessage {
case SendMessageSuccess => case SendMessageSuccess =>
ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId) ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId)
stashBuffer.unstashAll(onSuccess) stashBuffer.unstashAll(ctx, onSuccess)
case SendMessageFailure(exception) => case SendMessageFailure(exception) =>
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId attempt=$attempt", exception) ctx.log.error(exception, "action=send_message status=finished result=failure chat_id={} attempt={}", chatId, attempt)
if (attempt > 5) { if (attempt >= 5) {
ctx.log.error("action=send_message result=failure message=attempts threshold exceeded", exception) ctx.log.error(exception, "action=send_message result=failure message=attempts threshold exceeded")
stashBuffer.unstashAll(onFailure) stashBuffer.unstashAll(ctx, onFailure)
} else { } else {
sendMessage(message, onSuccess, onFailure, attempt + 1) sendMessage(message, onSuccess, onFailure, attempt + 1)
} }
@@ -295,8 +292,7 @@ object CheckDeliveryDialog {
Behaviors.same Behaviors.same
} }
} }
}
waitCommand initial
} }
} }

View File

@@ -1,31 +1,39 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, Behavior, DispatcherSelector }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{ `User-Agent`, Accept }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{ CommandHandler, EventHandler }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo }
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import javax.net.ssl.{ KeyManager, SSLContext, X509TrustManager }
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.http.scaladsl.UseHttp2.Negotiated
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, `User-Agent`}
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.stream.ActorMaterializer
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{ Failure, Success } import scala.util.{Failure, Success}
object Entities { object Entities {
case class Attributes(parcelType: String, weight: Double, currency: String) case class Attributes(
parcelType: String,
weight: Double,
currency: String,
)
case class State( case class State(
id: String, id: String,
@@ -46,53 +54,31 @@ object Entities {
} }
object CzechPostDeliveryCheck { object CzechPostDeliveryCheck {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd") private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd")
private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy") private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy")
private val entityType = "czechpost"
sealed trait Command extends JsonSerializable sealed trait Command
sealed trait CommandResult extends JsonSerializable sealed trait CommandResult
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") sealed trait Event
@JsonSubTypes( case class ParcelState(attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
Array( def prettyPrint(parcelId: String): String = {
new JsonSubTypes.Type(value = classOf[ParcelAttributesChanged], name = "parcel_attributes_changed"), val statesString = states
new JsonSubTypes.Type(value = classOf[ParcelAdded], name = "parcel_added"), .toSeq
new JsonSubTypes.Type(value = classOf[ParcelRemoved], name = "parcel_removed"),
new JsonSubTypes.Type(value = classOf[ParcelHistoryStateAdded], name = "parcel_history_state_added")
)
)
sealed trait Event extends JsonSerializable
case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
def fullStatePrint(parcelId: String): String = {
val statesString = states.toSeq
.sortBy(state => czechPostDateFormat.parse(state.date)) .sortBy(state => czechPostDateFormat.parse(state.date))
.map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n") .map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n")
.mkString .mkString
s"""|*New state(s) of the parcel $parcelId ($comment):* s"""|*New state(s) of the parcel $parcelId:*
|=========================== |===========================
|$statesString""".stripMargin |$statesString""".stripMargin
} }
def latestStatePrint(parcelId: String): String = {
latestState.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}").getOrElse(s"$parcelId ($comment) - NO INFO")
} }
case class State(parcelStates: Map[String, ParcelState] = Map.empty)
private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date)) case class AddParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
}
case class State(parcelStates: Map[String, Parcel] = Map.empty) extends JsonSerializable {
def latestStatesPrint: Seq[String] = parcelStates.map { case (id, parcel) => parcel.latestStatePrint(id) }.to(Seq)
}
case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command
case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
case class ListParcels(replyTo: ActorRef[ListParcelsResult]) extends Command case class ListParcels(replyTo: ActorRef[ListParcelsResult]) extends Command
case class ListParcelsResult(parcelsList: Seq[String]) case class ListParcelsResult(parcelsList: Set[String])
case class ListParcelIds(replyTo: ActorRef[ListParcelIdsResult]) extends Command
case class ListParcelIdsResult(parcelIds: Seq[String])
case object CommandResultSuccess extends CommandResult case object CommandResultSuccess extends CommandResult
case class CommandResultFailure(exception: Throwable) extends CommandResult case class CommandResultFailure(exception: Throwable) extends CommandResult
@@ -105,7 +91,7 @@ object CzechPostDeliveryCheck {
private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command
case class DeliveryStateChanged(state: String) case class DeliveryStateChanged(state: String)
case class ParcelAdded(parcelId: String, comment: String) extends Event case class ParcelAdded(parcelId: String) extends Event
case class ParcelRemoved(parcelId: String) extends Event case class ParcelRemoved(parcelId: String) extends Event
case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event
case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event
@@ -122,32 +108,44 @@ object CzechPostDeliveryCheck {
context context
} }
def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = checkParcel(chatId, stateReporter) def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers(scheduler => checkParcel(chatId, stateReporter, scheduler))
}
private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.withTimers { scheduler => private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged], scheduler: TimerScheduler[Command]): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.setup { ctx => implicit val actorSystem: ActorSystem = ctx.system.toUntyped
implicit val actorSystem: ActorSystem = ctx.system.toClassic
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
implicit val materializer: ActorMaterializer = ActorMaterializer()
val http = Http() val http = Http()
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withAcceptAnyCertificate(true).withDisableHostnameVerification(true))) val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose
.withAcceptAnyCertificate(true)
.withDisableHostnameVerification(true)))
val originalCtx = http.createClientHttpsContext(badSslConfig) val originalCtx = http.createClientHttpsContext(badSslConfig)
val sslContext = ConnectionContext.httpsClient(trustfulSslContext) val sslContext = new HttpsConnectionContext(
trustfulSslContext,
originalCtx.sslConfig,
originalCtx.enabledCipherSuites,
originalCtx.enabledProtocols,
originalCtx.clientAuth,
originalCtx.sslParameters,
Negotiated
)
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"))) val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings) val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
scheduler.startTimerAtFixedRate("check-delivery-state", CheckParcels, 5.minutes) scheduler.startPeriodicTimer("check-delivery-state", CheckParcels, 5.minutes)
val log = ctx.log
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => { val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match { cmd match {
case AddParcel(parcelId, comment, replyTo) => case AddParcel(parcelId, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.keySet.contains(parcelIdUpper)) { if (state.parcelStates.keySet.contains(parcelIdUpper)) {
Effect.none.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper))) Effect
.none
.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
} else { } else {
Effect Effect
.persist(ParcelAdded(parcelIdUpper, comment)) .persist(ParcelAdded(parcelIdUpper))
.thenRun(_ => { .thenRun(_ => {
replyTo ! CommandResultSuccess replyTo ! CommandResultSuccess
ctx.self ! CheckParcels ctx.self ! CheckParcels
@@ -156,32 +154,34 @@ object CzechPostDeliveryCheck {
case RemoveParcel(parcelId, replyTo) => case RemoveParcel(parcelId, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.contains(parcelIdUpper)) { if (state.parcelStates.contains(parcelIdUpper)) {
Effect.persist(ParcelRemoved(parcelIdUpper)).thenRun(_ => replyTo ! CommandResultSuccess) Effect
.persist(ParcelRemoved(parcelIdUpper))
.thenRun(_ => replyTo ! CommandResultSuccess)
} else { } else {
Effect.none.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper))) Effect
.none
.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
} }
case ListParcels(replyTo) => case ListParcels(replyTo) =>
Effect.none.thenRun { state => Effect.none
val parcelsList = state.latestStatesPrint .thenRun { state =>
val parcelsList = state.parcelStates.keySet
replyTo ! ListParcelsResult(parcelsList) replyTo ! ListParcelsResult(parcelsList)
} }
case ListParcelIds(replyTo) =>
Effect.none.thenRun { state =>
replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
}
case CheckParcels => case CheckParcels =>
Effect.none.thenRun { _ => Effect
log.info("action=check_parcel_state chat_id={}", chatId) .none
.thenRun { _ =>
ctx.log.info("action=check_parcel_state chat_id={}", chatId)
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id")) val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
for (ids <- parcelIds) { for (ids <- parcelIds) {
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz") val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`))) val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri) ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
http http
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings) .singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
@@ -189,38 +189,37 @@ object CzechPostDeliveryCheck {
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}.")) case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
case response: Failure[HttpResponse] => response case response: Failure[HttpResponse] => response
} }
.flatMap(response => Unmarshal(response).to[Seq[Entities.ParcelHistory]]) .flatMap(response => Unmarshal(response).to[Array[Entities.ParcelHistory]])
.andThen { .andThen {
case Success(parcelHistories) => case Success(parcelHistories) =>
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory)) parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
case Failure(exception) => case Failure(exception) =>
log.error("Error checking parcel history.", exception) ctx.log.error(exception, "Error checking parcel history.")
} }
.andThen { .andThen {
case Success(_) => log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri) case Success(_) => ctx.log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
case Failure(exception) => log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception) case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri)
} }
} }
} }
case ParcelHistoryRetrieved(parcelHistory) => case ParcelHistoryRetrieved(parcelHistory) =>
val parcelId = parcelHistory.id val parcelId = parcelHistory.id
val parcelState = state.parcelStates(parcelId) val parcelState = state.parcelStates(parcelId)
val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty) val attributesChangedEvent = (if (parcelState.attributes.isEmpty)
Some(parcelHistory.attributes) Some(parcelHistory.attributes)
else else
parcelState.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None)) parcelState.attributes
.map(attributes => ParcelAttributesChanged(parcelId, attributes)) .flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.toSeq .map(attributes => ParcelAttributesChanged(parcelId, attributes)).to[collection.immutable.Seq]
val newStates = parcelHistory.states.state.toSet -- parcelState.states val newStates = parcelHistory.states.state.toSet -- parcelState.states
val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).toSeq val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).to[collection.immutable.Seq]
val comment = state.parcelStates(parcelId).comment
Effect Effect
.persist(attributesChangedEvents ++ stateEvents) .persist(attributesChangedEvent ++ stateEvents)
.thenRun(_ => { .thenRun(_ => {
if (newStates.nonEmpty) { if (newStates.nonEmpty) {
stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId)) stateReporter ! DeliveryStateChanged(ParcelState(None, newStates).prettyPrint(parcelId))
} }
}) })
} }
@@ -228,8 +227,7 @@ object CzechPostDeliveryCheck {
val eventHandler: EventHandler[State, Event] = (state, evt) => { val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match { evt match {
case ParcelAdded(parcelId, comment) => case ParcelAdded(parcelId) => state.copy(parcelStates = state.parcelStates + (parcelId -> ParcelState()))
state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment)))
case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId) case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
case ParcelHistoryStateAdded(parcelId, newState) => case ParcelHistoryStateAdded(parcelId, newState) =>
val parcelState = state.parcelStates(parcelId) val parcelState = state.parcelStates(parcelId)
@@ -242,7 +240,11 @@ object CzechPostDeliveryCheck {
} }
} }
EventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId(entityType, chatId), emptyState = State(), commandHandler = commandHandler, eventHandler = eventHandler) EventSourcedBehavior[Command, Event, State](
} persistenceId = PersistenceId(s"$chatId-czechpost"),
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
} }
} }

View File

@@ -1,10 +1,13 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior} import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, EffectBuilder, EventSourcedBehavior}
import akka.util.Timeout import akka.util.Timeout
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess} import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
import eu.xeppaka.telegram.bot.TelegramEntities._ import eu.xeppaka.bot.TelegramEntities.Update
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
@@ -21,34 +24,77 @@ object DialogManager {
private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
sealed trait Event
private case class DialogAdded(chatId: Long) extends Event
case class State(dialogs: Map[Long, ActorRef[CheckDeliveryDialog.Command]] = Map.empty)
def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx => def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.receiveMessagePartial { val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match {
case ProcessUpdate(update, replyTo) => case ProcessUpdate(update, replyTo) =>
if (update.message.isDefined) { if (update.message.isDefined) {
val chatId = update.message.get.chat.id val chatId = update.message.get.chat.id
ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)
val effect: EffectBuilder[Event, State] = if (state.dialogs.contains(chatId)) {
Effect.none
} else {
Effect.persist(DialogAdded(chatId))
}
effect
.thenRun(_ => ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get))
.thenRun { state =>
val msg = update.message.get val msg = update.message.get
val dialogActor = ctx.child(chatId.toString).getOrElse(ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), chatId.toString)).unsafeUpcast[CheckDeliveryDialog.Command] val dialogActor = state.dialogs(chatId)
ctx.log.info("action=ask_dialog id={}", chatId) ctx.log.info("action=ask_dialog id={}", chatId)
implicit val timeout: Timeout = 5.seconds implicit val timeout: Timeout = 20.seconds
ctx.ask[CheckDeliveryDialog.Command, CheckDeliveryDialog.CommandResult](dialogActor, replyTo => CheckDeliveryDialog.ProcessMessage(msg, replyTo)) { ctx.ask(dialogActor)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo) case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo) case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo) case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
} }
}
} else { } else {
Effect
.none
.thenRun { _ =>
ctx.log.debug("action=process_update result=success message=update message is empty") ctx.log.debug("action=process_update result=success message=update message is empty")
} }
Behaviors.same }
case DialogResponseSuccess(dialogId, replyTo) => case DialogResponseSuccess(dialogId, replyTo) =>
Effect
.none
.thenRun { _ =>
ctx.log.info("action=ask_dialog id={} result=success", dialogId) ctx.log.info("action=ask_dialog id={} result=success", dialogId)
replyTo ! ProcessUpdateSuccess replyTo ! ProcessUpdateSuccess
Behaviors.same
case DialogResponseFailure(dialogId, exception, replyTo) =>
ctx.log.error(s"action=ask_dialog id=$dialogId result=failure", exception)
replyTo ! ProcessUpdateFailure(exception)
Behaviors.same
} }
case DialogResponseFailure(dialogId, exception, replyTo) =>
Effect
.none
.thenRun { _ =>
ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId)
replyTo ! ProcessUpdateFailure(exception)
}
}
}
val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match {
case DialogAdded(chatId) =>
val dialogActor = ctx.spawn(Behaviors.supervise(CheckDeliveryDialog.behavior(chatId, botUri)).onFailure(SupervisorStrategy.restart), s"delivery-check-$chatId")
state.copy(dialogs = state.dialogs.updated(chatId, dialogActor))
}
}
EventSourcedBehavior(
persistenceId = PersistenceId("dialog-manager"),
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
} }
} }

View File

@@ -1,3 +0,0 @@
package eu.xeppaka.bot
trait JsonSerializable

View File

@@ -1,26 +1,21 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.nio.file.Paths import java.nio.file.Paths
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorSystem, DispatcherSelector, SupervisorStrategy } import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.util.Timeout import akka.util.Timeout
import akka.{ actor, Done } import akka.{Done, actor}
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.DeserializationFeature
import de.heikoseeberger.akkahttpjackson.JacksonSupport
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContextExecutor, Future } import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.io.StdIn import scala.io.StdIn
object Main { object Main {
JacksonSupport.defaultObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
JacksonSupport.defaultObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4") val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
val localPort = 8443 val localPort = 8443

View File

@@ -1,28 +1,30 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.io.InputStream import java.io.InputStream
import java.security.{ KeyStore, SecureRandom } import java.nio.file.Path
import java.security.{KeyStore, SecureRandom}
import java.util.UUID import java.util.UUID
import akka.Done import akka.Done
import akka.actor.ActorSystem import akka.actor.{ActorSystem, Scheduler}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed._ import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives.{ as, complete, entity, extractLog, onComplete, path, post } import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post}
import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ ConnectionContext, Http, HttpExt, HttpsConnectionContext } import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
import akka.util.{ ByteString, Timeout } import akka.stream.ActorMaterializer
import eu.xeppaka.telegram.bot.TelegramEntities._ import akka.util.{ByteString, Timeout}
import javax.net.ssl.{ KeyManagerFactory, SSLContext, TrustManagerFactory } import eu.xeppaka.bot.TelegramEntities._
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.io.Source import scala.io.Source
import scala.util.{ Failure, Success } import scala.util.{Failure, Success}
object TelegramBot { object TelegramBot {
sealed trait Command sealed trait Command
@@ -33,34 +35,31 @@ object TelegramBot {
case object GetBotInfo case object GetBotInfo
case object GetWebhookInfo case object GetWebhookInfo
def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.withStash(100) { stashBuffer => def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot") ctx.log.info("action=start_bot")
implicit val untypedSystem: ActorSystem = ctx.system.toClassic implicit val untypedSystem: ActorSystem = ctx.system.toUntyped
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val botUri = BotUri(botId) val botUri = BotUri(botId)
val http: HttpExt = Http() val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId") val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None
val stashBuffer = StashBuffer[Command](10)
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler) val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler)
def bindServer: Behavior[Command] = Behaviors.setup[Command] { ctx => def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
case class BindingSuccess(binding: Http.ServerBinding) extends Command case class BindingSuccess(binding: Http.ServerBinding) extends Command
case class BindingFailure(exception: Throwable) extends Command case class BindingFailure(exception: Throwable) extends Command
ctx.log.info("action=bind_server interface={} port={}", interface, localPort) ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
val serverBuilder = http.newServerAt(interface, localPort) http
val bindFuture = if (useHttpsServer) { .bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext))
serverBuilder.enableHttps(createHttpsConnectionContext).bindFlow(routes) .onComplete {
} else {
serverBuilder.bindFlow(routes)
}
bindFuture.onComplete {
case Success(binding) => ctx.self ! BindingSuccess(binding) case Success(binding) => ctx.self ! BindingSuccess(binding)
case Failure(exception) => ctx.self ! BindingFailure(exception) case Failure(exception) => ctx.self ! BindingFailure(exception)
} }
@@ -68,7 +67,7 @@ object TelegramBot {
Behaviors.receiveMessage[Command] { Behaviors.receiveMessage[Command] {
case BindingSuccess(binding) => case BindingSuccess(binding) =>
ctx.log.info("action=bind_server result=success") ctx.log.info("action=bind_server result=success")
setWebhook(binding) settingWebhook(binding)
case BindingFailure(exception) => case BindingFailure(exception) =>
ctx.log.error("action=bind_server result=failure", exception) ctx.log.error("action=bind_server result=failure", exception)
ctx.log.error("action=start_bot result=failure") ctx.log.error("action=start_bot result=failure")
@@ -79,13 +78,15 @@ object TelegramBot {
} }
} }
def unbindServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx => def unbindingServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object UnbindingSuccess extends Command case object UnbindingSuccess extends Command
case class UnbindingFailure(exception: Throwable) extends Command case class UnbindingFailure(exception: Throwable) extends Command
ctx.log.info("action=unbind_server interface={} port={}", interface, localPort) ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
binding.unbind().onComplete { binding
.unbind()
.onComplete {
case Success(Done) => ctx.self ! UnbindingSuccess case Success(Done) => ctx.self ! UnbindingSuccess
case Failure(exception) => ctx.self ! UnbindingFailure(exception) case Failure(exception) => ctx.self ! UnbindingFailure(exception)
} }
@@ -103,7 +104,7 @@ object TelegramBot {
} }
} }
def setWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx => def settingWebhook(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object SetWebhookSuccess extends Command case object SetWebhookSuccess extends Command
case class SetWebhookFailure(exception: Throwable) extends Command case class SetWebhookFailure(exception: Throwable) extends Command
@@ -126,7 +127,10 @@ object TelegramBot {
val formParts = immutable.Seq(urlPart, certificatePart).flatten val formParts = immutable.Seq(urlPart, certificatePart).flatten
val formData = Multipart.FormData.Strict(formParts) val formData = Multipart.FormData.Strict(formParts)
Marshal(formData).to[RequestEntity].flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))).onComplete { Marshal(formData)
.to[RequestEntity]
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity)))
.onComplete {
case Success(response) => case Success(response) =>
if (response.status.isSuccess()) if (response.status.isSuccess())
ctx.self ! SetWebhookSuccess ctx.self ! SetWebhookSuccess
@@ -139,15 +143,11 @@ object TelegramBot {
Behaviors.receiveMessage { Behaviors.receiveMessage {
case SetWebhookSuccess => case SetWebhookSuccess =>
ctx.log.info("action=set_webhook result=success") ctx.log.info("action=set_webhook result=success")
stashBuffer.unstashAll(started(binding)) stashBuffer.unstashAll(ctx, started(binding))
case SetWebhookFailure(exception) => case SetWebhookFailure(exception) =>
if (attempt > 20) { ctx.log.error("action=set_webhook result=failure", exception)
ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception)
ctx.log.error("action=start_bot result=failure") ctx.log.error("action=start_bot result=failure")
unbindServer(binding, None) unbindingServer(binding, None)
} else {
setWebhook(binding, attempt = attempt + 1)
}
case otherCommand: Command => case otherCommand: Command =>
stashBuffer.stash(otherCommand) stashBuffer.stash(otherCommand)
Behaviors.same Behaviors.same
@@ -162,7 +162,9 @@ object TelegramBot {
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
http.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)).onComplete { http
.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST))
.onComplete {
case Success(response) => case Success(response) =>
if (response.status.isSuccess()) if (response.status.isSuccess())
ctx.self ! DeleteWebhookSuccess ctx.self ! DeleteWebhookSuccess
@@ -175,10 +177,10 @@ object TelegramBot {
Behaviors.receiveMessage { Behaviors.receiveMessage {
case DeleteWebhookSuccess => case DeleteWebhookSuccess =>
ctx.log.info("action=delete_webhook result=success") ctx.log.info("action=delete_webhook result=success")
unbindServer(binding, Some(replyTo)) unbindingServer(binding, Some(replyTo))
case DeleteWebhookFailure(exception) => case DeleteWebhookFailure(exception) =>
ctx.log.error("action=delete_webhook result=failure", exception) ctx.log.error("action=delete_webhook result=failure", exception)
unbindServer(binding, Some(replyTo)) unbindingServer(binding, Some(replyTo))
case _ => Behaviors.unhandled case _ => Behaviors.unhandled
} }
} }
@@ -187,7 +189,7 @@ object TelegramBot {
ctx.log.info("action=start_bot result=success") ctx.log.info("action=start_bot result=success")
Behaviors.receiveMessage[Command] { Behaviors.receiveMessage[Command] {
case Stop(replyTo) => case stopCommand@Stop(replyTo) =>
ctx.log.info("action=stop_bot") ctx.log.info("action=stop_bot")
deletingWebhook(binding, replyTo) deletingWebhook(binding, replyTo)
case _ => case _ =>
@@ -195,13 +197,13 @@ object TelegramBot {
} }
} }
bindServer bindingServer
}
} }
private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = { private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
implicit val timeout: Timeout = 30.seconds implicit val timeout: Timeout = 30.seconds
@@ -209,11 +211,8 @@ object TelegramBot {
post { post {
extractLog { log => extractLog { log =>
entity(as[Update]) { update => entity(as[Update]) { update =>
// log.info("update={}", update) onComplete(updatesProcessor.?[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
// complete(StatusCodes.OK) case Success(processResult) => processResult match {
onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
case Success(processResult) =>
processResult match {
case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK)) case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
case DialogManager.ProcessUpdateFailure(exception) => case DialogManager.ProcessUpdateFailure(exception) =>
log.error(exception, "action=process_update result=failure message={}", update) log.error(exception, "action=process_update result=failure message={}", update)

View File

@@ -0,0 +1,313 @@
package eu.xeppaka.bot
object TelegramEntities {
case class Response[T](ok: Boolean,
description: Option[String] = None,
error_code: Option[Int] = None,
result: T
)
case class GetMe(id: Int, is_bot: Boolean, first_name: String, username: String)
case class KeyboardButton(text: String,
request_contact: Option[Boolean] = None,
request_location: Option[Boolean] = None
)
case class InlineKeyboardButton(text: String,
url: Option[String] = None,
callback_data: Option[String] = None,
switch_inline_query: Option[String] = None,
switch_inline_query_current_chat: Option[String] = None,
callback_game: Option[String] = None,
pay: Option[Boolean] = None
)
sealed trait ReplyMarkup
case class ReplyKeyboardRemove(remove_keyboard: Boolean = true, selective: Option[Boolean] = None) extends ReplyMarkup
case class ReplyKeyboardMarkup(keyboard: Seq[Seq[KeyboardButton]],
resize_keyboard: Option[Boolean] = None,
one_time_keyboard: Option[Boolean] = None,
selective: Option[Boolean] = None
) extends ReplyMarkup
case class InlineKeyboardMarkup(inline_keyboard: Seq[Seq[InlineKeyboardButton]])
extends ReplyMarkup
case class ForceReply(force_reply: Boolean = true, selective: Option[Boolean] = None) extends ReplyMarkup
case class InlineQuery(id: String,
from: User,
location: Location,
query: String,
offset: String
)
case class Location(longitude: Float,
latitude: Float
)
case class Update(update_id: Int,
message: Option[Message] = None,
edited_message: Option[Message] = None,
channel_post: Option[Message] = None,
edited_channel_post: Option[Message] = None,
inline_query: Option[InlineQuery] = None,
chosen_inline_result: Option[ChosenInlineResult] = None,
callback_query: Option[CallbackQuery] = None,
shipping_query: Option[ShippingQuery] = None,
pre_checkout_query: Option[PreCheckoutQuery] = None
)
case class ChosenInlineResult(result_id: String,
from: User,
location: Option[Location] = None,
inline_message_id: Option[String] = None,
query: String
)
case class CallbackQuery(id: String,
from: User,
message: Option[Message] = None,
inline_message_id: Option[String] = None,
chat_instance: String,
data: Option[String] = None,
game_short_name: Option[String] = None
)
case class ShippingQuery(id: String,
from: User,
invoice_payload: String,
shipping_address: ShippingAddress
)
case class ShippingAddress(country_code: String,
state: String,
city: String,
street_line1: String,
street_line2: String,
post_code: String
)
case class PreCheckoutQuery(id: String,
from: User,
currency: String,
total_amount: Int,
invoice_payload: String,
shipping_option_id: Option[String] = None,
order_info: Option[OrderInfo] = None
)
case class OrderInfo(name: Option[String] = None,
phone_number: Option[String] = None,
email: Option[String] = None,
shipping_address: Option[ShippingAddress] = None
)
case class User(id: Int,
is_bot: Boolean,
first_name: String,
last_name: Option[String] = None,
username: Option[String] = None,
language_code: Option[String] = None
)
case class EditMessageReplyMarkup(chat_id: Option[Long],
message_id: Option[Int],
inline_message_id: Option[String],
reply_markup: Option[InlineKeyboardMarkup]
)
case class SendMessage(chat_id: Long,
text: String,
parse_mode: Option[String] = None,
disable_web_page_preview: Option[Boolean] = None,
disable_notification: Option[Boolean] = None,
reply_to_message_id: Option[Int] = None,
reply_markup: Option[ReplyMarkup] = None
)
case class Message(message_id: Int,
from: Option[User] = None,
date: Int,
chat: Chat,
forward_from: Option[User] = None,
forward_from_chat: Option[User] = None,
forward_from_message_id: Option[Int] = None,
forward_signature: Option[String] = None,
forward_date: Option[Int] = None,
reply_to_message: Option[Message] = None,
edit_date: Option[Int] = None,
media_group_id: Option[String] = None,
author_signature: Option[String] = None,
text: Option[String] = None,
entities: Option[Seq[MessageEntity]] = None,
caption_entities: Option[Seq[MessageEntity]] = None,
audio: Option[Audio] = None,
document: Option[Document] = None,
game: Option[Game] = None,
photo: Option[Seq[PhotoSize]] = None,
sticker: Option[Sticker] = None,
video: Option[Video] = None,
voice: Option[Voice] = None,
video_note: Option[VideoNote] = None,
caption: Option[String] = None,
contact: Option[Contact] = None,
location: Option[Location] = None,
venue: Option[Venue] = None,
new_chat_members: Option[Seq[User]] = None,
left_chat_member: Option[Seq[User]] = None,
new_chat_title: Option[String] = None,
new_chat_photo: Option[Seq[PhotoSize]] = None,
delete_chat_photo: Option[Boolean] = None,
group_chat_created: Option[Boolean] = None,
supergroup_chat_created: Option[Boolean] = None,
channel_chat_created: Option[Boolean] = None,
migrate_to_chat_id: Option[Int] = None,
migrate_from_chat_id: Option[Int] = None,
pinned_message: Option[Message] = None,
invoice: Option[Invoice] = None,
successful_payment: Option[SuccessfulPayment] = None,
connected_website: Option[String] = None
)
case class MessageEntity(`type`: String,
offset: Int,
length: Int,
url: Option[String] = None,
user: Option[User] = None
)
case class Contact(phone_number: String,
first_name: String,
last_name: Option[String] = None,
user_id: Option[Int] = None
)
case class Sticker(file_id: String,
width: Int,
height: Int,
thumb: Option[PhotoSize] = None,
emoji: Option[String] = None,
set_name: Option[String] = None,
mask_position: Option[String] = None,
file_size: Option[Int] = None
)
case class Video(file_id: String,
width: Int,
height: Int,
duration: Int,
thumb: Option[PhotoSize] = None,
mime_type: Option[String] = None,
file_size: Option[Int] = None
)
case class Audio(file_id: String,
duration: Int,
performer: Option[String] = None,
title: Option[String] = None,
mime_type: Option[String] = None,
file_size: Option[Int] = None
)
case class Document(file_id: String,
thumb: Option[PhotoSize] = None,
file_name: Option[String] = None,
mime_type: Option[String] = None,
file_size: Option[Int] = None
)
case class PhotoSize(file_id: String,
width: Int,
height: Int,
file_size: Option[Int] = None
)
case class Voice(file_id: String,
duration: Int,
mime_type: Option[String] = None,
file_size: Option[Int] = None
)
case class VideoNote(file_id: String,
length: Int,
duration: Int,
thumb: Option[PhotoSize] = None,
file_size: Option[Int] = None
)
case class ChatPhoto(small_file_id: String, big_file_id: String)
case class Chat(id: Long,
`type`: String,
title: Option[String] = None,
username: Option[String] = None,
first_name: Option[String] = None,
last_name: Option[String] = None,
all_members_are_administrators: Option[Boolean] = None,
photo: Option[ChatPhoto] = None,
description: Option[String] = None,
invite_link: Option[String] = None,
pinned_message: Option[Message] = None,
sticker_set_name: Option[String] = None,
can_set_sticker_set: Option[Boolean] = None
)
case class Game(title: String,
description: String,
photo: Seq[PhotoSize],
text: Option[String] = None,
text_entities: Option[Seq[MessageEntity]] = None,
animation: Option[Animation] = None
)
case class Animation(file_id: String,
thumb: Option[PhotoSize] = None,
file_name: Option[String] = None,
mime_type: Option[String] = None,
file_size: Option[Int] = None
)
case class InputFile()
case class Venue(location: Location,
title: String,
address: String,
foursquare_id: Option[String] = None
)
case class Invoice(title: String,
description: String,
start_parameter: String,
currency: String,
total_amount: Int
)
case class SuccessfulPayment(currency: String,
total_amount: Int,
invoice_payload: String,
shipping_option_id: Option[String] = None,
order_info: Option[OrderInfo] = None,
telegram_payment_charge_id: String,
provider_payment_charge_id: String
)
case class Webhook(url: String,
certificate: Option[InputFile] = None,
max_connections: Option[Int] = None,
allowed_updates: Option[Seq[String]] = None
)
case class WebhookInfo(url: String,
has_custom_certificate: Boolean,
pending_update_count: Int,
last_error_date: Option[Int] = None,
last_error_message: Option[String] = None,
max_connections: Option[Int] = None,
allowed_updates: Option[Seq[String]] = None
)
}

View File

@@ -0,0 +1,24 @@
package eu.xeppaka.bot
import cats.syntax.functor._
import eu.xeppaka.bot.TelegramEntities._
import io.circe.{Decoder, Encoder}
import io.circe.generic.auto._
import io.circe.syntax._
object TelegramEntitiesDerivations {
implicit val encodeReplyMarkup: Encoder[ReplyMarkup] = Encoder.instance {
case replyKeyboardMarkup: ReplyKeyboardMarkup => replyKeyboardMarkup.asJson
case replyKeyboardRemove: ReplyKeyboardRemove => replyKeyboardRemove.asJson
case inlineKeyboardMarkup: InlineKeyboardMarkup => inlineKeyboardMarkup.asJson
case forceReply: ForceReply => forceReply.asJson
}
implicit val decodeReplyMarkup: Decoder[ReplyMarkup] =
List[Decoder[ReplyMarkup]](
Decoder[ReplyKeyboardMarkup].widen,
Decoder[ReplyKeyboardRemove].widen,
Decoder[InlineKeyboardMarkup].widen,
Decoder[ForceReply].widen
).reduceLeft(_ or _)
}

View File

@@ -1,38 +0,0 @@
package eu.xeppaka.bot
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.util.ByteString
import com.fasterxml.jackson.databind.DeserializationFeature
import de.heikoseeberger.akkahttpjackson.JacksonSupport
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
class UnmarshalSpec extends AnyFlatSpec with BeforeAndAfterAll {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
private var testkit: ActorTestKit = _
override protected def beforeAll(): Unit = {
JacksonSupport.defaultObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
testkit = ActorTestKit()
}
override protected def afterAll(): Unit = {
testkit.shutdownTestKit()
}
"test unmarshal" should "ok" in {
implicit val system: ActorSystem[Nothing] = testkit.system
val json =
"[{\"attributes\":{\"parcelType\":\"1 \",\"weight\":0,\"currency\":\"\",\"telefonTyp\":null,\"telefonNazev\":null,\"telefonCislo\":null,\"dobirka\":0,\"kusu\":null,\"ulozeniDo\":null,\"ulozniDoba\":null,\"zemePuvodu\":null,\"zemeUrceni\":null,\"dorucovaniDate\":null,\"dorucovaniOd\":null,\"dorucovaniDo\":null},\"states\":{\"state\":[{\"id\":\"-4\",\"date\":\"2020-12-27\",\"text\":\"Pro tento druh zásilek Česká pošta informace nezobrazuje.\",\"postcode\":null,\"postoffice\":null,\"idIcon\":null,\"publicAccess\":0,\"latitude\":null,\"longitude\":null,\"timeDeliveryAttempt\":null}]},\"id\":\"123456\"}]"
val entity = HttpEntity(contentType = ContentTypes.`application/json`, data = ByteString(json))
val result = Await.result(Unmarshal(json).to[Seq[Entities.ParcelHistory]], 2.seconds)
println(result)
}
}