Cats experiments

This commit is contained in:
Pavel Kachalouski
2023-07-30 21:01:22 +02:00
parent cb49d36849
commit 0c7be3abe6
27 changed files with 1048 additions and 962 deletions

3
.scalafmt.conf Normal file
View File

@@ -0,0 +1,3 @@
version = 3.7.11
maxColumn = 140
runner.dialect = scala3

View File

@@ -1,7 +1,7 @@
import Dependencies._ import Dependencies._
import Versions._ import Versions._
lazy val commonSettings = Seq(organization := "eu.xeppaka", scalaVersion := "2.13.4", mainClass := Some("eu.xeppaka.bot.Main")) lazy val commonSettings = Seq(organization := "eu.xeppaka", scalaVersion := "3.3.0", mainClass := Some("tech.xeppaka.bot.Main"))
inThisBuild(commonSettings) inThisBuild(commonSettings)
@@ -9,21 +9,14 @@ lazy val `telegram-bot-delivery` = (project in file("."))
.settings( .settings(
name := "telegram-bot-delivery", name := "telegram-bot-delivery",
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
akkaTyped, catsCore,
akkaSerializationJackson, catsEffect,
akkaClusterShardingTyped, sttpClient,
akkaHttp, tapirHttp4sServer,
akkaHttpJackson,
akkaStream,
akkaPersistence,
akkaPersistenceCassandra,
akkaPersistenceQuery,
akkaTestkitTyped % Test,
scalaTest % Test, scalaTest % Test,
slibTelegram, slibTelegram,
logback logback
), ),
dependencyOverrides ++= Seq("com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion),
dockerBaseImage := "openjdk:11", dockerBaseImage := "openjdk:11",
dockerExposedPorts := Seq(8443), dockerExposedPorts := Seq(8443),
dockerRepository := Some("registry.xeppaka.eu:443"), dockerRepository := Some("registry.xeppaka.eu:443"),

View File

@@ -4,26 +4,20 @@ import Dependencies.Versions._
object Dependencies { object Dependencies {
object Versions { object Versions {
val akkaVersion = "2.6.10" val catsEffectVersion = "3.5.0"
val akkaHttpVersion = "10.2.2" val catsVersion = "2.9.0"
val akkaHttpJacksonVersion = "1.35.2" val sttpClientVersion = "3.8.16"
val akkaPersistenceCassandraVersion = "1.0.4" val logbackVersion = "1.4.8"
val scalaTestVersion = "3.2.2" val scalaTestVersion = "3.2.16"
val slibTelegramVersion = "0.1.0" val slibTelegramVersion = "0.1.0"
val logbackVersion = "1.2.3" val tapirVersion = "1.6.4"
} }
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion val catsCore = "org.typelevel" %% "cats-core" % catsVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion val catsEffect = "org.typelevel" %% "cats-effect" % catsEffectVersion
val akkaSerializationJackson = "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion val sttpClient = "com.softwaremill.sttp.client3" %% "core" % sttpClientVersion
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val akkaHttpJackson = "de.heikoseeberger" %% "akka-http-jackson" % akkaHttpJacksonVersion
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion
val akkaTestkitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion
val slibTelegram = "eu.xeppaka" %% "slib-telegram" % slibTelegramVersion
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
val slibTelegram = "tech.xeppaka" %% "slib-telegram" % slibTelegramVersion
val tapirHttp4sServer = "com.softwaremill.sttp.tapir" %% "tapir-http4s-server" % tapirVersion
} }

View File

@@ -1 +1 @@
sbt.version=1.4.5 sbt.version=1.9.3

View File

@@ -1,2 +1,3 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.0") addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13") addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")

View File

@@ -1,11 +1,12 @@
<configuration debug="true" scan="true" > <?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder> <encoder>
<pattern>%date [%level] %logger: %message%n%xException</pattern> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder> </encoder>
</appender> </appender>
<root level="DEBUG">
<root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
</root> </root>
</configuration> </configuration>

View File

@@ -1,15 +0,0 @@
package eu.xeppaka.bot
import akka.http.scaladsl.model.Uri
case class BotUri(botId: String) {
private val baseUri = Uri(s"https://api.telegram.org/bot$botId")
val botUri: Uri = baseUri
val getMe: Uri = baseUri.withPath(baseUri.path / "getMe")
val setWebhook: Uri = baseUri.withPath(baseUri.path / "setWebhook")
val deleteWebhook: Uri = baseUri.withPath(baseUri.path / "deleteWebhook")
val getWebhookInfo: Uri = baseUri.withPath(baseUri.path / "getWebhookInfo")
val sendMessage: Uri = baseUri.withPath(baseUri.path / "sendMessage")
val editMessageReplyMarkup: Uri = baseUri.withPath(baseUri.path / "editMessageReplyMarkup")
}

View File

@@ -1,302 +0,0 @@
package eu.xeppaka.bot
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.marshalling.Marshal
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.{ ByteString, Timeout }
import eu.xeppaka.telegram.bot.TelegramEntities._
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
object CheckDeliveryDialog {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
sealed trait Command
sealed trait CommandResult
sealed trait DialogCommand extends Command
case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command
case object ProcessMessageSuccess extends CommandResult
case class ProcessMessageFailure(exception: Throwable) extends CommandResult
case object AddParcel extends DialogCommand
case object RemoveParcel extends DialogCommand
case object ListParcels extends DialogCommand
case object Help extends DialogCommand
object DialogCommand {
def parse(text: String): DialogCommand = text match {
case "/add" => AddParcel
case "/remove" => RemoveParcel
case "/list" => ListParcels
case "/help" => Help
case _ => Help
}
}
// internal messages
private case class DeliveryStateChanged(state: String) extends Command
private val helpMessage =
"""
|Supported commands:
|/add - add parcel to a list of watched parcels
|/list - list watched parcels
|/remove - remove parcel from a watching list
""".stripMargin
private val commandsKeyboard = Some(
ReplyKeyboardMarkup(Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), resize_keyboard = Some(true), one_time_keyboard = Some(true))
)
private val removeKeyboard = Some(ReplyKeyboardRemove())
def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
implicit val system: ActorSystem = ctx.system.toClassic
implicit val executionContext: ExecutionContext = ctx.executionContext
val http = Http()
val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state))
val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart))
def waitCommand: Behavior[Command] = Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) =>
val command = msg.text.map(text => DialogCommand.parse(text))
replyTo ! ProcessMessageSuccess
if (command.isDefined) {
ctx.self ! command.get
Behaviors.same
} else {
val message = SendMessage(chatId, "This command is unsupported.")
sendMessage(message, waitCommand, waitCommand)
}
case AddParcel =>
val parcelIdMessage = SendMessage(chatId, "Please enter a parcel ID.", reply_markup = removeKeyboard)
val commentMessage = SendMessage(chatId, "Please enter a comment.", reply_markup = removeKeyboard)
sendMessage(parcelIdMessage, waitTextMessage(parcelId => sendMessage(commentMessage, waitTextMessage(comment => addParcel(parcelId, comment)), waitCommand)), waitCommand)
case RemoveParcel =>
removeParcel(waitCommand, waitCommand)
case ListParcels =>
listParcels
case Help =>
val message = SendMessage(chatId, helpMessage, reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case DeliveryStateChanged(state) =>
val message = SendMessage(chatId, state, Some("Markdown"))
sendMessage(message, waitCommand, waitCommand)
case _ =>
Behaviors.unhandled
}
def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup { ctx =>
case object AddParcelSuccess extends Command
case class AddParcelFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
case Failure(exception) => AddParcelFailure(exception)
}
Behaviors.receiveMessage {
case AddParcelSuccess =>
val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case AddParcelFailure(exception) =>
exception match {
case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case _ =>
ctx.log.error("action=add_parcel result=failure", exception)
val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
}
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
}
def listParcels: Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup { ctx =>
case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command
case class ListParcelsFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
case Failure(exception) => ListParcelsFailure(exception)
}
Behaviors.receiveMessage {
case ListParcelsSuccess(parcelsList) =>
val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)")
val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case ListParcelsFailure(exception) =>
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
}
def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup { ctx =>
case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command
case class ListParcelIdsFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) {
case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList)
case Failure(exception) => ListParcelIdsFailure(exception)
}
Behaviors.receiveMessage {
case ListParcelIdsSuccess(parcelsList) =>
if (parcelsList.nonEmpty) {
val keyboardButtons = parcelsList.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq
val markup = ReplyKeyboardMarkup(keyboard = keyboardButtons, resize_keyboard = Some(true), one_time_keyboard = Some(true))
val message = SendMessage(chatId, "Please enter a parcel id to remove.", reply_markup = Some(markup))
sendMessage(message, waitTextMessage(parcelId => removeParcelId(parcelId)), onFailure)
} else {
val message = SendMessage(chatId, "You don't have watched parcels. There is nothing to remove.", reply_markup = commandsKeyboard)
sendMessage(message, onSuccess, onFailure)
}
case ListParcelIdsFailure(exception) =>
ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
}
def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup { ctx =>
case object RemoveParcelSuccess extends Command
case class RemoveParcelFailure(exception: Throwable) extends Command
implicit val timeout: Timeout = 5.seconds
ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
case Failure(exception) => RemoveParcelFailure(exception)
}
Behaviors.receiveMessage {
case RemoveParcelSuccess =>
val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case RemoveParcelFailure(exception) =>
exception match {
case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
case _ =>
ctx.log.error("action=add_parcel result=failure", exception)
val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard)
sendMessage(message, waitCommand, waitCommand)
}
case otherMessage =>
stashBuffer.stash(otherMessage)
Behaviors.same
}
}
}
// def selectPostType(onFinish: PostType => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage {
//
// case ProcessMessage(msg, replyTo) =>
// val button1 = KeyboardButton("button1")
// val button2 = KeyboardButton("button2")
// val keyboard = ReplyKeyboardMarkup(Seq(Seq(button1, button2)))
// val message = SendMessage(chatId, "Please enter parcel ID.", reply_markup = Some(keyboard))
// sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), waitCommand)
// }
def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.receiveMessage {
case ProcessMessage(msg, replyTo) =>
if (msg.text.isDefined) {
val parcelId = msg.text.get
replyTo ! ProcessMessageSuccess
onFinish(parcelId)
} else {
replyTo ! ProcessMessageSuccess
waitTextMessage(onFinish)
}
case otherMsg =>
stashBuffer.stash(otherMsg)
Behaviors.same
}
}
def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup[Command] { ctx =>
case object SendMessageSuccess extends Command
case class SendMessageFailure(exception: Throwable) extends Command
ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, message)
println(message)
println(Await.result(Marshal(message).to[HttpEntity], 2.seconds).asInstanceOf[HttpEntity.Strict].data.utf8String)
Source
.future(Marshal(message).to[RequestEntity])
.initialDelay(2.seconds * (attempt - 1))
.map(requestEntity => HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = requestEntity))
.mapAsync(1) { request =>
http.singleRequest(request).transform {
case Success(response) =>
if (response.status.isSuccess()) {
Success(SendMessageSuccess)
} else {
Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
}
case Failure(exception) =>
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception)
Success(SendMessageFailure(exception))
}
}
.to(Sink.foreach(ctx.self ! _))
.run()
Behaviors.receiveMessage {
case SendMessageSuccess =>
ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId)
stashBuffer.unstashAll(onSuccess)
case SendMessageFailure(exception) =>
ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId attempt=$attempt", exception)
if (attempt > 5) {
ctx.log.error("action=send_message result=failure message=attempts threshold exceeded", exception)
stashBuffer.unstashAll(onFailure)
} else {
sendMessage(message, onSuccess, onFailure, attempt + 1)
}
case otherMsg =>
stashBuffer.stash(otherMsg)
Behaviors.same
}
}
}
waitCommand
}
}

View File

@@ -1,248 +0,0 @@
package eu.xeppaka.bot
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, Behavior, DispatcherSelector }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{ `User-Agent`, Accept }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{ CommandHandler, EventHandler }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo }
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import java.security.cert.X509Certificate
import java.text.SimpleDateFormat
import javax.net.ssl.{ KeyManager, SSLContext, X509TrustManager }
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
object Entities {
case class Attributes(parcelType: String, weight: Double, currency: String)
case class State(
id: String,
date: String,
text: String,
postcode: Option[String],
postoffice: Option[String],
idIcon: Option[Int],
publicAccess: Int,
latitude: Option[Double],
longitude: Option[Double],
timeDeliveryAttempt: Option[String]
)
case class States(state: Seq[State])
case class ParcelHistory(id: String, attributes: Attributes, states: States)
}
object CzechPostDeliveryCheck {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd")
private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy")
private val entityType = "czechpost"
sealed trait Command extends JsonSerializable
sealed trait CommandResult extends JsonSerializable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[ParcelAttributesChanged], name = "parcel_attributes_changed"),
new JsonSubTypes.Type(value = classOf[ParcelAdded], name = "parcel_added"),
new JsonSubTypes.Type(value = classOf[ParcelRemoved], name = "parcel_removed"),
new JsonSubTypes.Type(value = classOf[ParcelHistoryStateAdded], name = "parcel_history_state_added")
)
)
sealed trait Event extends JsonSerializable
case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
def fullStatePrint(parcelId: String): String = {
val statesString = states.toSeq
.sortBy(state => czechPostDateFormat.parse(state.date))
.map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n")
.mkString
s"""|*New state(s) of the parcel $parcelId ($comment):*
|===========================
|$statesString""".stripMargin
}
def latestStatePrint(parcelId: String): String = {
latestState.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}").getOrElse(s"$parcelId ($comment) - NO INFO")
}
private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date))
}
case class State(parcelStates: Map[String, Parcel] = Map.empty) extends JsonSerializable {
def latestStatesPrint: Seq[String] = parcelStates.map { case (id, parcel) => parcel.latestStatePrint(id) }.to(Seq)
}
case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command
case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
case class ListParcels(replyTo: ActorRef[ListParcelsResult]) extends Command
case class ListParcelsResult(parcelsList: Seq[String])
case class ListParcelIds(replyTo: ActorRef[ListParcelIdsResult]) extends Command
case class ListParcelIdsResult(parcelIds: Seq[String])
case object CommandResultSuccess extends CommandResult
case class CommandResultFailure(exception: Throwable) extends CommandResult
case class ParcelIdNotFound(parcelId: String) extends Exception
case class DuplicateParcelId(parcelId: String) extends Exception
// internal commands
private case object CheckParcels extends Command
private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command
case class DeliveryStateChanged(state: String)
case class ParcelAdded(parcelId: String, comment: String) extends Event
case class ParcelRemoved(parcelId: String) extends Event
case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event
case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event
private val trustfulSslContext: SSLContext = {
object NoCheckX509TrustManager extends X509TrustManager {
override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
}
val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
context
}
def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = checkParcel(chatId, stateReporter)
private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.withTimers { scheduler =>
Behaviors.setup { ctx =>
implicit val actorSystem: ActorSystem = ctx.system.toClassic
implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val http = Http()
val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withAcceptAnyCertificate(true).withDisableHostnameVerification(true)))
val originalCtx = http.createClientHttpsContext(badSslConfig)
val sslContext = ConnectionContext.httpsClient(trustfulSslContext)
val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
scheduler.startTimerAtFixedRate("check-delivery-state", CheckParcels, 5.minutes)
val log = ctx.log
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match {
case AddParcel(parcelId, comment, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.keySet.contains(parcelIdUpper)) {
Effect.none.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
} else {
Effect
.persist(ParcelAdded(parcelIdUpper, comment))
.thenRun(_ => {
replyTo ! CommandResultSuccess
ctx.self ! CheckParcels
})
}
case RemoveParcel(parcelId, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.contains(parcelIdUpper)) {
Effect.persist(ParcelRemoved(parcelIdUpper)).thenRun(_ => replyTo ! CommandResultSuccess)
} else {
Effect.none.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
}
case ListParcels(replyTo) =>
Effect.none.thenRun { state =>
val parcelsList = state.latestStatesPrint
replyTo ! ListParcelsResult(parcelsList)
}
case ListParcelIds(replyTo) =>
Effect.none.thenRun { state =>
replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
}
case CheckParcels =>
Effect.none.thenRun { _ =>
log.info("action=check_parcel_state chat_id={}", chatId)
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
for (ids <- parcelIds) {
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
http
.singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
.transform {
case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
case response: Failure[HttpResponse] => response
}
.flatMap(response => Unmarshal(response).to[Seq[Entities.ParcelHistory]])
.andThen {
case Success(parcelHistories) =>
parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
case Failure(exception) =>
log.error("Error checking parcel history.", exception)
}
.andThen {
case Success(_) => log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
case Failure(exception) => log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception)
}
}
}
case ParcelHistoryRetrieved(parcelHistory) =>
val parcelId = parcelHistory.id
val parcelState = state.parcelStates(parcelId)
val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty)
Some(parcelHistory.attributes)
else
parcelState.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
.map(attributes => ParcelAttributesChanged(parcelId, attributes))
.toSeq
val newStates = parcelHistory.states.state.toSet -- parcelState.states
val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).toSeq
val comment = state.parcelStates(parcelId).comment
Effect
.persist(attributesChangedEvents ++ stateEvents)
.thenRun(_ => {
if (newStates.nonEmpty) {
stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId))
}
})
}
}
val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match {
case ParcelAdded(parcelId, comment) =>
state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment)))
case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
case ParcelHistoryStateAdded(parcelId, newState) =>
val parcelState = state.parcelStates(parcelId)
val newParcelState = parcelState.copy(states = parcelState.states + newState)
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
case ParcelAttributesChanged(parcelId, newAttributes) =>
val parcelState = state.parcelStates(parcelId)
val newParcelState = parcelState.copy(attributes = Some(newAttributes))
state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
}
}
EventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId(entityType, chatId), emptyState = State(), commandHandler = commandHandler, eventHandler = eventHandler)
}
}
}

View File

@@ -1,54 +0,0 @@
package eu.xeppaka.bot
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.util.Timeout
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
import eu.xeppaka.telegram.bot.TelegramEntities._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object DialogManager {
sealed trait Command
sealed trait CommandResult
case class ProcessUpdate(update: Update, replyTo: ActorRef[CommandResult]) extends Command
case object ProcessUpdateSuccess extends CommandResult
case class ProcessUpdateFailure(exception: Throwable) extends CommandResult
// internal messages
private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.receiveMessagePartial {
case ProcessUpdate(update, replyTo) =>
if (update.message.isDefined) {
val chatId = update.message.get.chat.id
ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)
val msg = update.message.get
val dialogActor = ctx.child(chatId.toString).getOrElse(ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), chatId.toString)).unsafeUpcast[CheckDeliveryDialog.Command]
ctx.log.info("action=ask_dialog id={}", chatId)
implicit val timeout: Timeout = 5.seconds
ctx.ask[CheckDeliveryDialog.Command, CheckDeliveryDialog.CommandResult](dialogActor, replyTo => CheckDeliveryDialog.ProcessMessage(msg, replyTo)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
}
} else {
ctx.log.debug("action=process_update result=success message=update message is empty")
}
Behaviors.same
case DialogResponseSuccess(dialogId, replyTo) =>
ctx.log.info("action=ask_dialog id={} result=success", dialogId)
replyTo ! ProcessUpdateSuccess
Behaviors.same
case DialogResponseFailure(dialogId, exception, replyTo) =>
ctx.log.error(s"action=ask_dialog id=$dialogId result=failure", exception)
replyTo ! ProcessUpdateFailure(exception)
Behaviors.same
}
}
}

View File

@@ -1,49 +0,0 @@
package eu.xeppaka.bot
import java.nio.file.Paths
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorSystem, DispatcherSelector, SupervisorStrategy }
import akka.http.scaladsl.Http
import akka.util.Timeout
import akka.{ actor, Done }
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.DeserializationFeature
import de.heikoseeberger.akkahttpjackson.JacksonSupport
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
import scala.io.StdIn
object Main {
JacksonSupport.defaultObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
JacksonSupport.defaultObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def main(args: Array[String]): Unit = {
val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
val localPort = 8443
val hookDomain = System.getProperty("hookDomain", "xeppaka.eu")
val hookPort = System.getProperty("hookPort", "8443").toInt
val useHttpsServer = System.getProperty("useHttpsServer", "true").toBoolean
val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", localPort, hookDomain, hookPort, useHttpsServer)).onFailure(SupervisorStrategy.restart)
val telegramBot = ActorSystem(botBehavior, "telegram-bot-delivery")
// implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
// implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
// implicit val scheduler: Scheduler = telegramBot.scheduler
// implicit val timeout: Timeout = 10.seconds
// println("Press enter to finish bot...")
// StdIn.readLine()
//
// val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref))
//
// val terminateFuture = stopFuture
// .andThen { case _ => Http().shutdownAllConnectionPools() }
// .andThen { case _ => telegramBot.terminate() }
//
// Await.ready(terminateFuture, 20.seconds)
}
}

View File

@@ -1,252 +0,0 @@
package eu.xeppaka.bot
import java.io.InputStream
import java.security.{ KeyStore, SecureRandom }
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives.{ as, complete, entity, extractLog, onComplete, path, post }
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ ConnectionContext, Http, HttpExt, HttpsConnectionContext }
import akka.util.{ ByteString, Timeout }
import eu.xeppaka.telegram.bot.TelegramEntities._
import javax.net.ssl.{ KeyManagerFactory, SSLContext, TrustManagerFactory }
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.io.Source
import scala.util.{ Failure, Success }
object TelegramBot {
sealed trait Command
sealed trait CommandResult
sealed trait StopResult extends CommandResult
case class Stop(replyTo: ActorRef[Done]) extends Command
case object GetBotInfo
case object GetWebhookInfo
def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot")
implicit val untypedSystem: ActorSystem = ctx.system.toClassic
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val botUri = BotUri(botId)
val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler)
def bindServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
case class BindingSuccess(binding: Http.ServerBinding) extends Command
case class BindingFailure(exception: Throwable) extends Command
ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
val serverBuilder = http.newServerAt(interface, localPort)
val bindFuture = if (useHttpsServer) {
serverBuilder.enableHttps(createHttpsConnectionContext).bindFlow(routes)
} else {
serverBuilder.bindFlow(routes)
}
bindFuture.onComplete {
case Success(binding) => ctx.self ! BindingSuccess(binding)
case Failure(exception) => ctx.self ! BindingFailure(exception)
}
Behaviors.receiveMessage[Command] {
case BindingSuccess(binding) =>
ctx.log.info("action=bind_server result=success")
setWebhook(binding)
case BindingFailure(exception) =>
ctx.log.error("action=bind_server result=failure", exception)
ctx.log.error("action=start_bot result=failure")
Behaviors.stopped
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
}
}
def unbindServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object UnbindingSuccess extends Command
case class UnbindingFailure(exception: Throwable) extends Command
ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
binding.unbind().onComplete {
case Success(Done) => ctx.self ! UnbindingSuccess
case Failure(exception) => ctx.self ! UnbindingFailure(exception)
}
Behaviors.receiveMessage[Command] {
case UnbindingSuccess =>
ctx.log.info("action=unbind_server result=success")
replyTo.foreach(_ ! Done)
Behaviors.stopped
case UnbindingFailure(exception) =>
ctx.log.error("action=unbind_server result=failure", exception)
replyTo.foreach(_ ! Done)
Behaviors.stopped
case _ => Behaviors.unhandled
}
}
def setWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object SetWebhookSuccess extends Command
case class SetWebhookFailure(exception: Throwable) extends Command
ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri)
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity))
val certificatePart = if (useHttpsServer) {
val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString)
val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate)
Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem")))
} else {
None
}
val formParts = immutable.Seq(urlPart, certificatePart).flatten
val formData = Multipart.FormData.Strict(formParts)
Marshal(formData).to[RequestEntity].flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))).onComplete {
case Success(response) =>
if (response.status.isSuccess())
ctx.self ! SetWebhookSuccess
else
ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}."))
case Failure(exception) =>
ctx.self ! SetWebhookFailure(exception)
}
Behaviors.receiveMessage {
case SetWebhookSuccess =>
ctx.log.info("action=set_webhook result=success")
stashBuffer.unstashAll(started(binding))
case SetWebhookFailure(exception) =>
if (attempt > 20) {
ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception)
ctx.log.error("action=start_bot result=failure")
unbindServer(binding, None)
} else {
setWebhook(binding, attempt = attempt + 1)
}
case otherCommand: Command =>
stashBuffer.stash(otherCommand)
Behaviors.same
}
}
def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
case object DeleteWebhookSuccess extends Command
case class DeleteWebhookFailure(exception: Throwable) extends Command
ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri)
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
http.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)).onComplete {
case Success(response) =>
if (response.status.isSuccess())
ctx.self ! DeleteWebhookSuccess
else
ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}"))
case Failure(exception) =>
ctx.self ! DeleteWebhookFailure(exception)
}
Behaviors.receiveMessage {
case DeleteWebhookSuccess =>
ctx.log.info("action=delete_webhook result=success")
unbindServer(binding, Some(replyTo))
case DeleteWebhookFailure(exception) =>
ctx.log.error("action=delete_webhook result=failure", exception)
unbindServer(binding, Some(replyTo))
case _ => Behaviors.unhandled
}
}
def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot result=success")
Behaviors.receiveMessage[Command] {
case Stop(replyTo) =>
ctx.log.info("action=stop_bot")
deletingWebhook(binding, replyTo)
case _ =>
Behaviors.unhandled
}
}
bindServer
}
}
private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
import de.heikoseeberger.akkahttpjackson.JacksonSupport._
import akka.actor.typed.scaladsl.AskPattern._
implicit val timeout: Timeout = 30.seconds
path(hookId) {
post {
extractLog { log =>
entity(as[Update]) { update =>
// log.info("update={}", update)
// complete(StatusCodes.OK)
onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
case Success(processResult) =>
processResult match {
case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
case DialogManager.ProcessUpdateFailure(exception) =>
log.error(exception, "action=process_update result=failure message={}", update)
complete(HttpResponse(status = StatusCodes.InternalServerError))
}
case Failure(exception) =>
log.error(exception, "action=process_update result=failure message={}", update)
complete(HttpResponse(status = StatusCodes.InternalServerError))
}
}
}
}
}
}
private def createHttpsConnectionContext: HttpsConnectionContext = {
val password: Array[Char] = "".toCharArray // do not store passwords in code, read them from somewhere safe!
val ks: KeyStore = KeyStore.getInstance("PKCS12")
val keystore: InputStream = getClass.getResourceAsStream("/telegram-bot.p12")
require(keystore != null, "Keystore required!")
ks.load(keystore, password)
val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(ks, password)
val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
tmf.init(ks)
val sslContext: SSLContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
ConnectionContext.https(sslContext)
}
}

View File

@@ -0,0 +1,16 @@
package eu.xeppaka.bot
import sttp.client3._
import sttp.model.Uri
case class BotUri(botId: String) {
private val baseUri = uri"https://api.telegram.org/bot$botId"
val botUri: Uri = baseUri
val getMe: Uri = baseUri.addPath("getMe")
val setWebhook: Uri = baseUri.addPath("setWebhook")
val deleteWebhook: Uri = baseUri.addPath("deleteWebhook")
val getWebhookInfo: Uri = baseUri.addPath("getWebhookInfo")
val sendMessage: Uri = baseUri.addPath("sendMessage")
val editMessageReplyMarkup: Uri = baseUri.addPath("editMessageReplyMarkup")
}

View File

@@ -0,0 +1,302 @@
package eu.xeppaka.bot
// import akka.actor.ActorSystem
// import akka.actor.typed.scaladsl.Behaviors
// import akka.actor.typed.scaladsl.adapter._
// import akka.http.scaladsl.marshalling.Marshal
// import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
// import akka.http.scaladsl.Http
// import akka.http.scaladsl.model._
// import akka.stream.scaladsl.{Sink, Source}
// import akka.util.{ByteString, Timeout}
// import eu.xeppaka.telegram.bot.TelegramEntities._
// import scala.concurrent.{Await, ExecutionContext}
// import scala.concurrent.duration._
// import scala.util.{Failure, Success}
object CheckDeliveryDialog {
// import de.heikoseeberger.akkahttpjackson.JacksonSupport._
// sealed trait Command
// sealed trait CommandResult
// sealed trait DialogCommand extends Command
// case class ProcessMessage(msg: Message, replyTo: ActorRef[CommandResult]) extends Command
// case object ProcessMessageSuccess extends CommandResult
// case class ProcessMessageFailure(exception: Throwable) extends CommandResult
// case object AddParcel extends DialogCommand
// case object RemoveParcel extends DialogCommand
// case object ListParcels extends DialogCommand
// case object Help extends DialogCommand
// object DialogCommand {
// def parse(text: String): DialogCommand = text match {
// case "/add" => AddParcel
// case "/remove" => RemoveParcel
// case "/list" => ListParcels
// case "/help" => Help
// case _ => Help
// }
// }
// // internal messages
// private case class DeliveryStateChanged(state: String) extends Command
// private val helpMessage =
// """
// |Supported commands:
// |/add - add parcel to a list of watched parcels
// |/list - list watched parcels
// |/remove - remove parcel from a watching list
// """.stripMargin
// private val commandsKeyboard = Some(
// ReplyKeyboardMarkup(Seq(Seq(KeyboardButton("/add"), KeyboardButton("/list"), KeyboardButton("/remove"))), resize_keyboard = Some(true), one_time_keyboard = Some(true))
// )
// private val removeKeyboard = Some(ReplyKeyboardRemove())
// def behavior(chatId: Long, botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// implicit val system: ActorSystem = ctx.system.toClassic
// implicit val executionContext: ExecutionContext = ctx.executionContext
// val http = Http()
// val deliveryStateAdapter: ActorRef[CzechPostDeliveryCheck.DeliveryStateChanged] = ctx.messageAdapter(stateChanged => DeliveryStateChanged(stateChanged.state))
// val czechPostDeliveryCheck = ctx.spawnAnonymous(Behaviors.supervise(CzechPostDeliveryCheck.behavior(chatId.toString, deliveryStateAdapter)).onFailure(SupervisorStrategy.restart))
// def waitCommand: Behavior[Command] = Behaviors.receiveMessage {
// case ProcessMessage(msg, replyTo) =>
// val command = msg.text.map(text => DialogCommand.parse(text))
// replyTo ! ProcessMessageSuccess
// if (command.isDefined) {
// ctx.self ! command.get
// Behaviors.same
// } else {
// val message = SendMessage(chatId, "This command is unsupported.")
// sendMessage(message, waitCommand, waitCommand)
// }
// case AddParcel =>
// val parcelIdMessage = SendMessage(chatId, "Please enter a parcel ID.", reply_markup = removeKeyboard)
// val commentMessage = SendMessage(chatId, "Please enter a comment.", reply_markup = removeKeyboard)
// sendMessage(parcelIdMessage, waitTextMessage(parcelId => sendMessage(commentMessage, waitTextMessage(comment => addParcel(parcelId, comment)), waitCommand)), waitCommand)
// case RemoveParcel =>
// removeParcel(waitCommand, waitCommand)
// case ListParcels =>
// listParcels
// case Help =>
// val message = SendMessage(chatId, helpMessage, reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case DeliveryStateChanged(state) =>
// val message = SendMessage(chatId, state, Some("Markdown"))
// sendMessage(message, waitCommand, waitCommand)
// case _ =>
// Behaviors.unhandled
// }
// def addParcel(parcelId: String, comment: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup { ctx =>
// case object AddParcelSuccess extends Command
// case class AddParcelFailure(exception: Throwable) extends Command
// implicit val timeout: Timeout = 5.seconds
// ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.AddParcel(parcelId, comment, ref)) {
// case Success(CzechPostDeliveryCheck.CommandResultSuccess) => AddParcelSuccess
// case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => AddParcelFailure(exception)
// case Failure(exception) => AddParcelFailure(exception)
// }
// Behaviors.receiveMessage {
// case AddParcelSuccess =>
// val message = SendMessage(chatId, s"Parcel $parcelId was added to the watch list.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case AddParcelFailure(exception) =>
// exception match {
// case CzechPostDeliveryCheck.DuplicateParcelId(_) =>
// val message = SendMessage(chatId, s"Parcel $parcelId is in the watch list already.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case _ =>
// ctx.log.error("action=add_parcel result=failure", exception)
// val message = SendMessage(chatId, s"Adding parcel failed. Please try again.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// }
// case otherMessage =>
// stashBuffer.stash(otherMessage)
// Behaviors.same
// }
// }
// }
// def listParcels: Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup { ctx =>
// case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command
// case class ListParcelsFailure(exception: Throwable) extends Command
// implicit val timeout: Timeout = 5.seconds
// ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcels(ref)) {
// case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList)
// case Failure(exception) => ListParcelsFailure(exception)
// }
// Behaviors.receiveMessage {
// case ListParcelsSuccess(parcelsList) =>
// val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)")
// val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case ListParcelsFailure(exception) =>
// ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
// val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case otherMessage =>
// stashBuffer.stash(otherMessage)
// Behaviors.same
// }
// }
// }
// def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup { ctx =>
// case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command
// case class ListParcelIdsFailure(exception: Throwable) extends Command
// implicit val timeout: Timeout = 5.seconds
// ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.ListParcelIds(ref)) {
// case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList)
// case Failure(exception) => ListParcelIdsFailure(exception)
// }
// Behaviors.receiveMessage {
// case ListParcelIdsSuccess(parcelsList) =>
// if (parcelsList.nonEmpty) {
// val keyboardButtons = parcelsList.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq
// val markup = ReplyKeyboardMarkup(keyboard = keyboardButtons, resize_keyboard = Some(true), one_time_keyboard = Some(true))
// val message = SendMessage(chatId, "Please enter a parcel id to remove.", reply_markup = Some(markup))
// sendMessage(message, waitTextMessage(parcelId => removeParcelId(parcelId)), onFailure)
// } else {
// val message = SendMessage(chatId, "You don't have watched parcels. There is nothing to remove.", reply_markup = commandsKeyboard)
// sendMessage(message, onSuccess, onFailure)
// }
// case ListParcelIdsFailure(exception) =>
// ctx.log.error(s"action=list_parcels result=failure chat_id=$chatId", exception)
// val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case otherMessage =>
// stashBuffer.stash(otherMessage)
// Behaviors.same
// }
// }
// }
// def removeParcelId(parcelId: String): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup { ctx =>
// case object RemoveParcelSuccess extends Command
// case class RemoveParcelFailure(exception: Throwable) extends Command
// implicit val timeout: Timeout = 5.seconds
// ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.CommandResult](czechPostDeliveryCheck, ref => CzechPostDeliveryCheck.RemoveParcel(parcelId, ref)) {
// case Success(CzechPostDeliveryCheck.CommandResultSuccess) => RemoveParcelSuccess
// case Success(CzechPostDeliveryCheck.CommandResultFailure(exception)) => RemoveParcelFailure(exception)
// case Failure(exception) => RemoveParcelFailure(exception)
// }
// Behaviors.receiveMessage {
// case RemoveParcelSuccess =>
// val message = SendMessage(chatId, s"Parcel $parcelId was removed from the watch list.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case RemoveParcelFailure(exception) =>
// exception match {
// case CzechPostDeliveryCheck.ParcelIdNotFound(_) =>
// val message = SendMessage(chatId, s"Parcel $parcelId is not found in the list of the watched parcels.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// case _ =>
// ctx.log.error("action=add_parcel result=failure", exception)
// val message = SendMessage(chatId, s"Remove of the parcel failed. Please try again.", reply_markup = commandsKeyboard)
// sendMessage(message, waitCommand, waitCommand)
// }
// case otherMessage =>
// stashBuffer.stash(otherMessage)
// Behaviors.same
// }
// }
// }
// // def selectPostType(onFinish: PostType => Behavior[Command]): Behavior[Command] = Behaviors.receiveMessage {
// //
// // case ProcessMessage(msg, replyTo) =>
// // val button1 = KeyboardButton("button1")
// // val button2 = KeyboardButton("button2")
// // val keyboard = ReplyKeyboardMarkup(Seq(Seq(button1, button2)))
// // val message = SendMessage(chatId, "Please enter parcel ID.", reply_markup = Some(keyboard))
// // sendMessage(message, waitParcelId(parcelId => addParcel(parcelId)), waitCommand)
// // }
// def waitTextMessage(onFinish: String => Behavior[Command]): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.receiveMessage {
// case ProcessMessage(msg, replyTo) =>
// if (msg.text.isDefined) {
// val parcelId = msg.text.get
// replyTo ! ProcessMessageSuccess
// onFinish(parcelId)
// } else {
// replyTo ! ProcessMessageSuccess
// waitTextMessage(onFinish)
// }
// case otherMsg =>
// stashBuffer.stash(otherMsg)
// Behaviors.same
// }
// }
// def sendMessage(message: SendMessage, onSuccess: => Behavior[Command], onFailure: => Behavior[Command], attempt: Int = 1): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup[Command] { ctx =>
// case object SendMessageSuccess extends Command
// case class SendMessageFailure(exception: Throwable) extends Command
// ctx.log.debug("action=send_message status=started chat_id={} message={}", chatId, message)
// println(message)
// println(Await.result(Marshal(message).to[HttpEntity], 2.seconds).asInstanceOf[HttpEntity.Strict].data.utf8String)
// Source
// .future(Marshal(message).to[RequestEntity])
// .initialDelay(2.seconds * (attempt - 1))
// .map(requestEntity => HttpRequest(HttpMethods.POST, uri = botUri.sendMessage, entity = requestEntity))
// .mapAsync(1) { request =>
// http.singleRequest(request).transform {
// case Success(response) =>
// if (response.status.isSuccess()) {
// Success(SendMessageSuccess)
// } else {
// Success(SendMessageFailure(new RuntimeException(s"Error while sending message. HTTP status: ${response.status}.")))
// }
// case Failure(exception) =>
// ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId", exception)
// Success(SendMessageFailure(exception))
// }
// }
// .to(Sink.foreach(ctx.self ! _))
// .run()
// Behaviors.receiveMessage {
// case SendMessageSuccess =>
// ctx.log.debug("action=send_message status=finished result=success chat_id={}", chatId)
// stashBuffer.unstashAll(onSuccess)
// case SendMessageFailure(exception) =>
// ctx.log.error(s"action=send_message status=finished result=failure chat_id=$chatId attempt=$attempt", exception)
// if (attempt > 5) {
// ctx.log.error("action=send_message result=failure message=attempts threshold exceeded", exception)
// stashBuffer.unstashAll(onFailure)
// } else {
// sendMessage(message, onSuccess, onFailure, attempt + 1)
// }
// case otherMsg =>
// stashBuffer.stash(otherMsg)
// Behaviors.same
// }
// }
// }
// waitCommand
// }
}

View File

@@ -0,0 +1,248 @@
package eu.xeppaka.bot
// import akka.actor.ActorSystem
// import akka.actor.typed.scaladsl.Behaviors
// import akka.actor.typed.scaladsl.adapter._
// import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
// import akka.http.scaladsl.model._
// import akka.http.scaladsl.model.headers.{`User-Agent`, Accept}
// import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
// import akka.http.scaladsl.unmarshalling.Unmarshal
// import akka.http.scaladsl.{ConnectionContext, Http}
// import akka.persistence.typed.PersistenceId
// import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
// import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
// import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
// import com.typesafe.sslconfig.akka.AkkaSSLConfig
// import java.security.cert.X509Certificate
// import java.text.SimpleDateFormat
// import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
// import scala.collection.immutable
// import scala.concurrent.ExecutionContextExecutor
// import scala.concurrent.duration._
// import scala.util.{Failure, Success}
object Entities {
// case class Attributes(parcelType: String, weight: Double, currency: String)
// case class State(
// id: String,
// date: String,
// text: String,
// postcode: Option[String],
// postoffice: Option[String],
// idIcon: Option[Int],
// publicAccess: Int,
// latitude: Option[Double],
// longitude: Option[Double],
// timeDeliveryAttempt: Option[String]
// )
// case class States(state: Seq[State])
// case class ParcelHistory(id: String, attributes: Attributes, states: States)
// }
// object CzechPostDeliveryCheck {
// import de.heikoseeberger.akkahttpjackson.JacksonSupport._
// private val czechPostDateFormat = new SimpleDateFormat("yyyy-MM-dd")
// private val printDateFormat = new SimpleDateFormat("dd-MM-yyyy")
// private val entityType = "czechpost"
// sealed trait Command extends JsonSerializable
// sealed trait CommandResult extends JsonSerializable
// @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
// @JsonSubTypes(
// Array(
// new JsonSubTypes.Type(value = classOf[ParcelAttributesChanged], name = "parcel_attributes_changed"),
// new JsonSubTypes.Type(value = classOf[ParcelAdded], name = "parcel_added"),
// new JsonSubTypes.Type(value = classOf[ParcelRemoved], name = "parcel_removed"),
// new JsonSubTypes.Type(value = classOf[ParcelHistoryStateAdded], name = "parcel_history_state_added")
// )
// )
// sealed trait Event extends JsonSerializable
// case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) {
// def fullStatePrint(parcelId: String): String = {
// val statesString = states.toSeq
// .sortBy(state => czechPostDateFormat.parse(state.date))
// .map(state => s"${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}\n===========================\n")
// .mkString
// s"""|*New state(s) of the parcel $parcelId ($comment):*
// |===========================
// |$statesString""".stripMargin
// }
// def latestStatePrint(parcelId: String): String = {
// latestState.map(state => s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}").getOrElse(s"$parcelId ($comment) - NO INFO")
// }
// private def latestState: Option[Entities.State] = states.toSeq.maxByOption(state => czechPostDateFormat.parse(state.date))
// }
// case class State(parcelStates: Map[String, Parcel] = Map.empty) extends JsonSerializable {
// def latestStatesPrint: Seq[String] = parcelStates.map { case (id, parcel) => parcel.latestStatePrint(id) }.to(Seq)
// }
// case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command
// case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command
// case class ListParcels(replyTo: ActorRef[ListParcelsResult]) extends Command
// case class ListParcelsResult(parcelsList: Seq[String])
// case class ListParcelIds(replyTo: ActorRef[ListParcelIdsResult]) extends Command
// case class ListParcelIdsResult(parcelIds: Seq[String])
// case object CommandResultSuccess extends CommandResult
// case class CommandResultFailure(exception: Throwable) extends CommandResult
// case class ParcelIdNotFound(parcelId: String) extends Exception
// case class DuplicateParcelId(parcelId: String) extends Exception
// // internal commands
// private case object CheckParcels extends Command
// private case class ParcelHistoryRetrieved(parcelHistory: Entities.ParcelHistory) extends Command
// case class DeliveryStateChanged(state: String)
// case class ParcelAdded(parcelId: String, comment: String) extends Event
// case class ParcelRemoved(parcelId: String) extends Event
// case class ParcelHistoryStateAdded(parcelId: String, state: Entities.State) extends Event
// case class ParcelAttributesChanged(parcelId: String, attributes: Entities.Attributes) extends Event
// private val trustfulSslContext: SSLContext = {
// object NoCheckX509TrustManager extends X509TrustManager {
// override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
// override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
// override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
// }
// val context = SSLContext.getInstance("TLS")
// context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
// context
// }
// def behavior(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = checkParcel(chatId, stateReporter)
// private def checkParcel(chatId: String, stateReporter: ActorRef[DeliveryStateChanged]): Behavior[Command] = Behaviors.withTimers { scheduler =>
// Behaviors.setup { ctx =>
// implicit val actorSystem: ActorSystem = ctx.system.toClassic
// implicit val executionContext: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
// val http = Http()
// val badSslConfig = AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withAcceptAnyCertificate(true).withDisableHostnameVerification(true)))
// val originalCtx = http.createClientHttpsContext(badSslConfig)
// val sslContext = ConnectionContext.httpsClient(trustfulSslContext)
// val clientConnectionSettings = ClientConnectionSettings(actorSystem).withUserAgentHeader(Some(`User-Agent`("Mozilla/5.0 (X11; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0")))
// val connectionSettings = ConnectionPoolSettings(actorSystem).withConnectionSettings(clientConnectionSettings)
// scheduler.startTimerAtFixedRate("check-delivery-state", CheckParcels, 5.minutes)
// val log = ctx.log
// val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
// cmd match {
// case AddParcel(parcelId, comment, replyTo) =>
// val parcelIdUpper = parcelId.toUpperCase
// if (state.parcelStates.keySet.contains(parcelIdUpper)) {
// Effect.none.thenRun(_ => replyTo ! CommandResultFailure(DuplicateParcelId(parcelIdUpper)))
// } else {
// Effect
// .persist(ParcelAdded(parcelIdUpper, comment))
// .thenRun(_ => {
// replyTo ! CommandResultSuccess
// ctx.self ! CheckParcels
// })
// }
// case RemoveParcel(parcelId, replyTo) =>
// val parcelIdUpper = parcelId.toUpperCase
// if (state.parcelStates.contains(parcelIdUpper)) {
// Effect.persist(ParcelRemoved(parcelIdUpper)).thenRun(_ => replyTo ! CommandResultSuccess)
// } else {
// Effect.none.thenRun(_ => replyTo ! CommandResultFailure(ParcelIdNotFound(parcelIdUpper)))
// }
// case ListParcels(replyTo) =>
// Effect.none.thenRun { state =>
// val parcelsList = state.latestStatesPrint
// replyTo ! ListParcelsResult(parcelsList)
// }
// case ListParcelIds(replyTo) =>
// Effect.none.thenRun { state =>
// replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq)
// }
// case CheckParcels =>
// Effect.none.thenRun { _ =>
// log.info("action=check_parcel_state chat_id={}", chatId)
// val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
// for (ids <- parcelIds) {
// val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
// val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
// log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
// http
// .singleRequest(request, connectionContext = sslContext, settings = connectionSettings)
// .transform {
// case Success(response) => if (response.status.isSuccess()) Success(response) else Failure(new Exception(s"Check parcel returned HTTP status: ${response.status.value}."))
// case response: Failure[HttpResponse] => response
// }
// .flatMap(response => Unmarshal(response).to[Seq[Entities.ParcelHistory]])
// .andThen {
// case Success(parcelHistories) =>
// parcelHistories.foreach(parcelHistory => ctx.self ! ParcelHistoryRetrieved(parcelHistory))
// case Failure(exception) =>
// log.error("Error checking parcel history.", exception)
// }
// .andThen {
// case Success(_) => log.info("action=check_parcel_state result=success chat_id={} check_uri={}", chatId, checkUri)
// case Failure(exception) => log.error(s"action=check_parcel_state result=failure chat_id=$chatId check_uri=$checkUri", exception)
// }
// }
// }
// case ParcelHistoryRetrieved(parcelHistory) =>
// val parcelId = parcelHistory.id
// val parcelState = state.parcelStates(parcelId)
// val attributesChangedEvents: Seq[Event] = (if (parcelState.attributes.isEmpty)
// Some(parcelHistory.attributes)
// else
// parcelState.attributes.flatMap(oldAttributes => if (oldAttributes != parcelHistory.attributes) Some(parcelHistory.attributes) else None))
// .map(attributes => ParcelAttributesChanged(parcelId, attributes))
// .toSeq
// val newStates = parcelHistory.states.state.toSet -- parcelState.states
// val stateEvents: Seq[Event] = newStates.map(state => ParcelHistoryStateAdded(parcelId, state)).toSeq
// val comment = state.parcelStates(parcelId).comment
// Effect
// .persist(attributesChangedEvents ++ stateEvents)
// .thenRun(_ => {
// if (newStates.nonEmpty) {
// stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId))
// }
// })
// }
// }
// val eventHandler: EventHandler[State, Event] = (state, evt) => {
// evt match {
// case ParcelAdded(parcelId, comment) =>
// state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment)))
// case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId)
// case ParcelHistoryStateAdded(parcelId, newState) =>
// val parcelState = state.parcelStates(parcelId)
// val newParcelState = parcelState.copy(states = parcelState.states + newState)
// state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
// case ParcelAttributesChanged(parcelId, newAttributes) =>
// val parcelState = state.parcelStates(parcelId)
// val newParcelState = parcelState.copy(attributes = Some(newAttributes))
// state.copy(parcelStates = state.parcelStates.updated(parcelId, newParcelState))
// }
// }
// EventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId(entityType, chatId), emptyState = State(), commandHandler = commandHandler, eventHandler = eventHandler)
// }
// }
}

View File

@@ -0,0 +1,54 @@
package eu.xeppaka.bot
// import akka.actor.typed.scaladsl.Behaviors
// import akka.actor.typed.{ActorRef, Behavior}
// import akka.util.Timeout
// import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
// import eu.xeppaka.telegram.bot.TelegramEntities._
// import scala.concurrent.duration._
// import scala.util.{Failure, Success}
object DialogManager {
// sealed trait Command
// sealed trait CommandResult
// case class ProcessUpdate(update: Update, replyTo: ActorRef[CommandResult]) extends Command
// case object ProcessUpdateSuccess extends CommandResult
// case class ProcessUpdateFailure(exception: Throwable) extends CommandResult
// // internal messages
// private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command
// private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
// def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// Behaviors.receiveMessagePartial {
// case ProcessUpdate(update, replyTo) =>
// if (update.message.isDefined) {
// val chatId = update.message.get.chat.id
// ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get)
// val msg = update.message.get
// val dialogActor = ctx.child(chatId.toString).getOrElse(ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), chatId.toString)).unsafeUpcast[CheckDeliveryDialog.Command]
// ctx.log.info("action=ask_dialog id={}", chatId)
// implicit val timeout: Timeout = 5.seconds
// ctx.ask[CheckDeliveryDialog.Command, CheckDeliveryDialog.CommandResult](dialogActor, replyTo => CheckDeliveryDialog.ProcessMessage(msg, replyTo)) {
// case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
// case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
// case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
// }
// } else {
// ctx.log.debug("action=process_update result=success message=update message is empty")
// }
// Behaviors.same
// case DialogResponseSuccess(dialogId, replyTo) =>
// ctx.log.info("action=ask_dialog id={} result=success", dialogId)
// replyTo ! ProcessUpdateSuccess
// Behaviors.same
// case DialogResponseFailure(dialogId, exception, replyTo) =>
// ctx.log.error(s"action=ask_dialog id=$dialogId result=failure", exception)
// replyTo ! ProcessUpdateFailure(exception)
// Behaviors.same
// }
// }
}

View File

@@ -0,0 +1,50 @@
package eu.xeppaka.bot
// import java.nio.file.Paths
// import akka.actor.Scheduler
// import akka.actor.typed.scaladsl.AskPattern._
// import akka.actor.typed.scaladsl.Behaviors
// import akka.actor.typed.scaladsl.adapter._
// import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy}
// import akka.http.scaladsl.Http
// import akka.util.Timeout
// import akka.{actor, Done}
// import com.fasterxml.jackson.annotation.JsonInclude
// import com.fasterxml.jackson.databind.DeserializationFeature
// import de.heikoseeberger.akkahttpjackson.JacksonSupport
// import scala.concurrent.duration._
// import scala.concurrent.{Await, ExecutionContextExecutor, Future}
// import scala.io.StdIn
object Main {
// JacksonSupport.defaultObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
// JacksonSupport.defaultObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def main(args: Array[String]): Unit = {
// val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
// val localPort = 8443
// val hookDomain = System.getProperty("hookDomain", "xeppaka.eu")
// val hookPort = System.getProperty("hookPort", "8443").toInt
// val useHttpsServer = System.getProperty("useHttpsServer", "true").toBoolean
// val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", localPort, hookDomain, hookPort, useHttpsServer)).onFailure(SupervisorStrategy.restart)
// val telegramBot = ActorSystem(botBehavior, "telegram-bot-delivery")
// implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
// implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
// implicit val scheduler: Scheduler = telegramBot.scheduler
// implicit val timeout: Timeout = 10.seconds
// println("Press enter to finish bot...")
// StdIn.readLine()
//
// val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref))
//
// val terminateFuture = stopFuture
// .andThen { case _ => Http().shutdownAllConnectionPools() }
// .andThen { case _ => telegramBot.terminate() }
//
// Await.ready(terminateFuture, 20.seconds)
}
}

View File

@@ -0,0 +1,252 @@
package eu.xeppaka.bot
import java.io.InputStream
import java.security.{KeyStore, SecureRandom}
import java.util.UUID
// import akka.Done
// import akka.actor.ActorSystem
// import akka.actor.typed.scaladsl.Behaviors
// import akka.actor.typed.scaladsl.adapter._
// import akka.actor.typed._
// import akka.http.scaladsl.marshalling.Marshal
// import akka.http.scaladsl.model._
// import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post}
// import akka.http.scaladsl.server.Route
// import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
// import akka.util.{ByteString, Timeout}
// import eu.xeppaka.telegram.bot.TelegramEntities._
// import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
// import scala.collection.immutable
// import scala.concurrent.ExecutionContextExecutor
// import scala.concurrent.duration._
// import scala.io.Source
// import scala.util.{Failure, Success}
object TelegramBot {
// sealed trait Command
// sealed trait CommandResult
// sealed trait StopResult extends CommandResult
// case class Stop(replyTo: ActorRef[Done]) extends Command
// case object GetBotInfo
// case object GetWebhookInfo
// def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.withStash(100) { stashBuffer =>
// Behaviors.setup[Command] { ctx =>
// ctx.log.info("action=start_bot")
// implicit val untypedSystem: ActorSystem = ctx.system.toClassic
// implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
// val botUri = BotUri(botId)
// val http: HttpExt = Http()
// val hookId = UUID.randomUUID().toString
// val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
// val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
// val routes = botRoutes(hookId, dialogManager)(ctx.system.scheduler)
// def bindServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>
// case class BindingSuccess(binding: Http.ServerBinding) extends Command
// case class BindingFailure(exception: Throwable) extends Command
// ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
// val serverBuilder = http.newServerAt(interface, localPort)
// val bindFuture = if (useHttpsServer) {
// serverBuilder.enableHttps(createHttpsConnectionContext).bindFlow(routes)
// } else {
// serverBuilder.bindFlow(routes)
// }
// bindFuture.onComplete {
// case Success(binding) => ctx.self ! BindingSuccess(binding)
// case Failure(exception) => ctx.self ! BindingFailure(exception)
// }
// Behaviors.receiveMessage[Command] {
// case BindingSuccess(binding) =>
// ctx.log.info("action=bind_server result=success")
// setWebhook(binding)
// case BindingFailure(exception) =>
// ctx.log.error("action=bind_server result=failure", exception)
// ctx.log.error("action=start_bot result=failure")
// Behaviors.stopped
// case otherCommand: Command =>
// stashBuffer.stash(otherCommand)
// Behaviors.same
// }
// }
// def unbindServer(binding: Http.ServerBinding, replyTo: Option[ActorRef[Done]]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// case object UnbindingSuccess extends Command
// case class UnbindingFailure(exception: Throwable) extends Command
// ctx.log.info("action=unbind_server interface={} port={}", interface, localPort)
// binding.unbind().onComplete {
// case Success(Done) => ctx.self ! UnbindingSuccess
// case Failure(exception) => ctx.self ! UnbindingFailure(exception)
// }
// Behaviors.receiveMessage[Command] {
// case UnbindingSuccess =>
// ctx.log.info("action=unbind_server result=success")
// replyTo.foreach(_ ! Done)
// Behaviors.stopped
// case UnbindingFailure(exception) =>
// ctx.log.error("action=unbind_server result=failure", exception)
// replyTo.foreach(_ ! Done)
// Behaviors.stopped
// case _ => Behaviors.unhandled
// }
// }
// def setWebhook(binding: Http.ServerBinding, attempt: Int = 1): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// case object SetWebhookSuccess extends Command
// case class SetWebhookFailure(exception: Throwable) extends Command
// ctx.log.info("action=set_webhook url={} webhook={}", botUri.setWebhook, webhookUri)
// implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
// val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
// val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity))
// val certificatePart = if (useHttpsServer) {
// val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString)
// val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate)
// Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem")))
// } else {
// None
// }
// val formParts = immutable.Seq(urlPart, certificatePart).flatten
// val formData = Multipart.FormData.Strict(formParts)
// Marshal(formData).to[RequestEntity].flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))).onComplete {
// case Success(response) =>
// if (response.status.isSuccess())
// ctx.self ! SetWebhookSuccess
// else
// ctx.self ! SetWebhookFailure(new RuntimeException(s"Set webhook HTTP response status: ${response.status.value}."))
// case Failure(exception) =>
// ctx.self ! SetWebhookFailure(exception)
// }
// Behaviors.receiveMessage {
// case SetWebhookSuccess =>
// ctx.log.info("action=set_webhook result=success")
// stashBuffer.unstashAll(started(binding))
// case SetWebhookFailure(exception) =>
// if (attempt > 20) {
// ctx.log.error(s"action=set_webhook result=failure attempt=$attempt", exception)
// ctx.log.error("action=start_bot result=failure")
// unbindServer(binding, None)
// } else {
// setWebhook(binding, attempt = attempt + 1)
// }
// case otherCommand: Command =>
// stashBuffer.stash(otherCommand)
// Behaviors.same
// }
// }
// def deletingWebhook(binding: Http.ServerBinding, replyTo: ActorRef[Done]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// case object DeleteWebhookSuccess extends Command
// case class DeleteWebhookFailure(exception: Throwable) extends Command
// ctx.log.info("action=delete_webhook url={} webhook={}", botUri.deleteWebhook, webhookUri)
// implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
// http.singleRequest(HttpRequest(uri = botUri.deleteWebhook, method = HttpMethods.POST)).onComplete {
// case Success(response) =>
// if (response.status.isSuccess())
// ctx.self ! DeleteWebhookSuccess
// else
// ctx.self ! DeleteWebhookFailure(new RuntimeException(s"Delete webhook HTTP response status: ${response.status.value}"))
// case Failure(exception) =>
// ctx.self ! DeleteWebhookFailure(exception)
// }
// Behaviors.receiveMessage {
// case DeleteWebhookSuccess =>
// ctx.log.info("action=delete_webhook result=success")
// unbindServer(binding, Some(replyTo))
// case DeleteWebhookFailure(exception) =>
// ctx.log.error("action=delete_webhook result=failure", exception)
// unbindServer(binding, Some(replyTo))
// case _ => Behaviors.unhandled
// }
// }
// def started(binding: Http.ServerBinding): Behavior[Command] = Behaviors.setup[Command] { ctx =>
// ctx.log.info("action=start_bot result=success")
// Behaviors.receiveMessage[Command] {
// case Stop(replyTo) =>
// ctx.log.info("action=stop_bot")
// deletingWebhook(binding, replyTo)
// case _ =>
// Behaviors.unhandled
// }
// }
// bindServer
// }
// }
// private def botRoutes(hookId: String, updatesProcessor: ActorRef[DialogManager.ProcessUpdate])(implicit scheduler: Scheduler): Route = {
// import de.heikoseeberger.akkahttpjackson.JacksonSupport._
// import akka.actor.typed.scaladsl.AskPattern._
// implicit val timeout: Timeout = 30.seconds
// path(hookId) {
// post {
// extractLog { log =>
// entity(as[Update]) { update =>
// // log.info("update={}", update)
// // complete(StatusCodes.OK)
// onComplete(updatesProcessor.ask[DialogManager.CommandResult](ref => DialogManager.ProcessUpdate(update, ref))) {
// case Success(processResult) =>
// processResult match {
// case DialogManager.ProcessUpdateSuccess => complete(HttpResponse(status = StatusCodes.OK))
// case DialogManager.ProcessUpdateFailure(exception) =>
// log.error(exception, "action=process_update result=failure message={}", update)
// complete(HttpResponse(status = StatusCodes.InternalServerError))
// }
// case Failure(exception) =>
// log.error(exception, "action=process_update result=failure message={}", update)
// complete(HttpResponse(status = StatusCodes.InternalServerError))
// }
// }
// }
// }
// }
// }
// private def createHttpsConnectionContext: HttpsConnectionContext = {
// val password: Array[Char] = "".toCharArray // do not store passwords in code, read them from somewhere safe!
// val ks: KeyStore = KeyStore.getInstance("PKCS12")
// val keystore: InputStream = getClass.getResourceAsStream("/telegram-bot.p12")
// require(keystore != null, "Keystore required!")
// ks.load(keystore, password)
// val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
// keyManagerFactory.init(ks, password)
// val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
// tmf.init(ks)
// val sslContext: SSLContext = SSLContext.getInstance("TLS")
// sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
// ConnectionContext.https(sslContext)
// }
}

View File

@@ -0,0 +1,17 @@
package tech.xeppaka.bot.cats
import cats.data.Validated
object BotCommand {
def validateCommand(command: String): Validated[Errors.Error, BotCommand] = {
command match {
case "/add" => Validated.Valid(AddDelivery)
case "/remove" => Validated.Valid(RemoveDelivery)
case _ => Validated.Invalid(Errors.InvalidCommand(command))
}
}
}
enum BotCommand {
case AddDelivery, RemoveDelivery
}

View File

@@ -0,0 +1,6 @@
package tech.xeppaka.bot.cats
trait Delivery {
def idDelivery: IdDelivery
def states: Seq[String]
}

View File

@@ -0,0 +1,41 @@
package tech.xeppaka.bot.cats
import cats.effect.IO
object Dialogs {
enum Command {
case AddDelivery, RemoveDelivery
}
sealed trait Dialog {
def processCommand(command: Command): Either[Error, Dialog]
def processText(text: String): Either[Error, Dialog]
}
case class InitialDialog() extends Dialog {
def processCommand(command: Command): Either[Error, Dialog] = {
Right(InitialDialog())
}
def processText(text: String): Either[Error, Dialog] = {
Right(InitialDialog())
}
}
case class AddDeliveryDialog() extends Dialog {
def processCommand(command: Command): Either[Error, Dialog] = {
Right(InitialDialog())
}
def processText(text: String): Either[Error, Dialog] = {
Right(InitialDialog())
}
}
case class RemoveDeliveryDialog() extends Dialog {
def processCommand(command: Command): Either[Error, Dialog] = {
Right(InitialDialog())
}
def processText(text: String): Either[Error, Dialog] = {
Right(InitialDialog())
}
}
}

View File

@@ -0,0 +1,6 @@
package tech.xeppaka.bot.cats
object Errors {
sealed trait Error
case class InvalidCommand(command: String) extends Error
}

View File

@@ -0,0 +1,5 @@
package tech.xeppaka.bot.cats
final case class IdDelivery(id: String) extends AnyVal {
override def toString(): String = id
}

View File

@@ -0,0 +1,14 @@
package tech.xeppaka.bot.cats
import cats.effect.IOApp
import cats.effect.IO
object TelegramBot extends IOApp.Simple {
val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
def run: IO[Unit] = {
val command = "/add1"
BotCommand.validateCommand(command)
IO.unit
}
}

View File

@@ -0,0 +1,3 @@
package tech.xeppaka.bot.cats
final case class User()