Added persistence for active chat ids which recovers actors during restart

This commit is contained in:
Pavel Kachalouski
2018-12-16 12:27:42 +01:00
parent 14ff0a20c6
commit 526af6f0e1
7 changed files with 144 additions and 139 deletions

View File

@@ -15,39 +15,39 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="sbt: com.chuusai:shapeless_2.12:2.3.3:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang:scala-reflect:2.12.8:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang:scala-library:2.12.8:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe:ssl-config-core_2.12:0.3.6:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:macro-compat_2.12:1.1.1:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scalatest:scalatest_2.12:3.0.5:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scalactic:scalactic_2.12:3.0.5:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scala-lang.modules:scala-xml_2.12:1.0.6:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang.modules:scala-java8-compat_2.12:0.8.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.reactivestreams:reactive-streams:1.0.2:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe:config:1.3.3:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-http-core_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-http_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-parsing_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-core_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-generic_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-jawn_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-numbers_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-parser_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang.modules:scala-parser-combinators_2.12:1.1.1:jar" level="project" />
<orderEntry type="library" name="sbt: org.spire-math:jawn-parser_2.12:0.13.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-core_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-kernel_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-macros_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:machinist_2.12:0.6.5:jar" level="project" />
<orderEntry type="library" name="sbt: de.heikoseeberger:akka-http-circe_2.12:1.22.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.fusesource.leveldbjni:leveldbjni-all:1.8:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor-testkit-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-persistence-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-persistence_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-protobuf_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-stream_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-testkit_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-stream_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-protobuf_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-persistence_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-persistence-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-actor-testkit-typed_2.12:2.5.19:jar" level="project" />
<orderEntry type="library" name="sbt: org.fusesource.leveldbjni:leveldbjni-all:1.8:jar" level="project" />
<orderEntry type="library" name="sbt: de.heikoseeberger:akka-http-circe_2.12:1.22.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:machinist_2.12:0.6.5:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-macros_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-kernel_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:cats-core_2.12:1.4.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.spire-math:jawn-parser_2.12:0.13.0:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang.modules:scala-parser-combinators_2.12:1.1.1:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-parser_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-numbers_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-jawn_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-generic_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: io.circe:circe-core_2.12:0.10.0:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-parsing_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-http_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe.akka:akka-http-core_2.12:10.1.5:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe:config:1.3.3:jar" level="project" />
<orderEntry type="library" name="sbt: org.reactivestreams:reactive-streams:1.0.2:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang.modules:scala-java8-compat_2.12:0.8.0:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scala-lang.modules:scala-xml_2.12:1.0.6:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scalactic:scalactic_2.12:3.0.5:jar" level="project" />
<orderEntry type="library" scope="TEST" name="sbt: org.scalatest:scalatest_2.12:3.0.5:jar" level="project" />
<orderEntry type="library" name="sbt: org.typelevel:macro-compat_2.12:1.1.1:jar" level="project" />
<orderEntry type="library" name="sbt: com.typesafe:ssl-config-core_2.12:0.3.6:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang:scala-library:2.12.8:jar" level="project" />
<orderEntry type="library" name="sbt: org.scala-lang:scala-reflect:2.12.8:jar" level="project" />
</component>
</module>

View File

@@ -25,10 +25,3 @@ lazy val `telegram-bot` = (project in file("telegram-bot"))
circeAkkaHttp
)
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}

View File

@@ -153,7 +153,7 @@ object CzechPostDeliveryCheck {
}
case RemoveParcel(parcelId, replyTo) =>
val parcelIdUpper = parcelId.toUpperCase
if (state.parcelStates.keySet.contains(parcelIdUpper)) {
if (state.parcelStates.contains(parcelIdUpper)) {
Effect
.persist(ParcelRemoved(parcelIdUpper))
.thenRun(_ => replyTo ! CommandResultSuccess)
@@ -164,16 +164,21 @@ object CzechPostDeliveryCheck {
}
case ListParcels(replyTo) =>
val parcelsList = state.parcelStates.keySet
Effect.none
.thenRun(_ => replyTo ! ListParcelsResult(parcelsList))
.thenRun { state =>
val parcelsList = state.parcelStates.keySet
replyTo ! ListParcelsResult(parcelsList)
}
case CheckParcels =>
Effect
.none
.thenRun { _ =>
ctx.log.info("action=check_parcel_state chat_id={}", chatId)
val parcelIds = state.parcelStates.keys.grouped(10).map(ids => ids.foldLeft("")((acc, id) => if (acc.isEmpty) id else s"$acc;$id"))
for (ids <- parcelIds) {
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=en")
val checkUri = Uri(s"https://b2c.cpost.cz/services/ParcelHistory/getDataAsJson?idParcel=$ids&language=cz")
val request = HttpRequest(uri = checkUri, headers = immutable.Seq(Accept(MediaTypes.`application/json`)))
ctx.log.info("action=check_parcel_state chat_id={} check_uri={}", chatId, checkUri)
@@ -196,8 +201,7 @@ object CzechPostDeliveryCheck {
case Failure(exception) => ctx.log.error(exception, "action=check_parcel_state result=failure chat_id={} check_uri={}", chatId, checkUri)
}
}
Effect.none
}
case ParcelHistoryRetrieved(parcelHistory) =>
val parcelId = parcelHistory.id
val parcelState = state.parcelStates(parcelId)

View File

@@ -1,11 +1,13 @@
package eu.xeppaka.bot
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.util.Timeout
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
import eu.xeppaka.bot.TelegramEntities.{Message, Update}
import eu.xeppaka.bot.TelegramEntities.Update
import scala.concurrent.duration._
import scala.util.{Failure, Success}
@@ -19,60 +21,80 @@ object DialogManager {
case class ProcessUpdateFailure(exception: Throwable) extends CommandResult
// internal messages
private case class ReceptionistListingWrapper(chatId: Long, key: ServiceKey[CheckDeliveryDialog.Command], listing: Receptionist.Listing, msg: Message, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseSuccess(dialogId: String, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseFailure(dialogId: String, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseSuccess(dialogId: Long, replyTo: ActorRef[CommandResult]) extends Command
private case class DialogResponseFailure(dialogId: Long, exception: Throwable, replyTo: ActorRef[CommandResult]) extends Command
sealed trait Event
private case class DialogAdded(chatId: Long) extends Event
case class State(dialogs: Map[Long, ActorRef[CheckDeliveryDialog.Command]] = Map.empty)
def behavior(botUri: BotUri): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.receiveMessage {
val commandHandler: CommandHandler[Command, Event, State] = (state, cmd) => {
cmd match {
case ProcessUpdate(update, replyTo) =>
ctx.log.debug("action=update_received update={}", update)
if (update.message.isDefined) {
val chatId = update.message.get.chat.id
val dialogKey = ServiceKey[CheckDeliveryDialog.Command](chatId.toString)
val receptionistAdapter: ActorRef[Receptionist.Listing] = ctx.messageAdapter(listing => ReceptionistListingWrapper(chatId, dialogKey, listing, update.message.get, replyTo))
ctx.log.debug(s"action=find_dialog id=${chatId.toString}")
ctx.system.receptionist ! Receptionist.Find(dialogKey, receptionistAdapter)
}
Behaviors.same
case ReceptionistListingWrapper(chatId, dialogKey, listing, msg, replyTo) =>
listing match {
case dialogKey.Listing(dialogs) =>
if (dialogs.isEmpty) {
ctx.log.debug("action=find_dialog id={} result=not_found", dialogKey.id)
val effect: Effect[Event, State] = if (state.dialogs.contains(chatId)) {
Effect.none
} else {
ctx.log.debug("action=find_dialog id={} result=found", dialogKey.id)
Effect.persist(DialogAdded(chatId))
}
val dialog = dialogs.headOption.getOrElse({
val dialogActor = ctx.spawn(CheckDeliveryDialog.behavior(chatId, botUri), s"delivery-check-${dialogKey.id}")
ctx.system.receptionist ! Receptionist.Register(dialogKey, dialogActor)
dialogActor
})
effect
.thenRun(_ => ctx.log.debug("action=process_update chat_id={} message={}", chatId, update.message.get))
.thenRun { state =>
val msg = update.message.get
val dialogActor = state.dialogs(chatId)
ctx.log.info("action=ask_dialog id={}", dialogKey.id)
ctx.log.info("action=ask_dialog id={}", chatId)
implicit val timeout: Timeout = 20.seconds
ctx.ask(dialog)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(dialogKey.id, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(dialogKey.id, exception, replyTo)
case Failure(exception) => DialogResponseFailure(dialogKey.id, exception, replyTo)
ctx.ask(dialogActor)((CheckDeliveryDialog.ProcessMessage.apply _).curried(msg)) {
case Success(ProcessMessageSuccess) => DialogResponseSuccess(chatId, replyTo)
case Success(ProcessMessageFailure(exception)) => DialogResponseFailure(chatId, exception, replyTo)
case Failure(exception) => DialogResponseFailure(chatId, exception, replyTo)
}
}
} else {
Effect
.none
.thenRun { _ =>
ctx.log.debug("action=process_update result=success message=update message is empty")
}
}
Behaviors.same
case DialogResponseSuccess(dialogId, replyTo) =>
Effect
.none
.thenRun { _ =>
ctx.log.info("action=ask_dialog id={} result=success", dialogId)
replyTo ! ProcessUpdateSuccess
Behaviors.same
}
case DialogResponseFailure(dialogId, exception, replyTo) =>
Effect
.none
.thenRun { _ =>
ctx.log.error(exception, "action=ask_dialog id={} result=failure", dialogId)
replyTo ! ProcessUpdateFailure(exception)
Behaviors.same
}
}
}
val eventHandler: EventHandler[State, Event] = (state, evt) => {
evt match {
case DialogAdded(chatId) =>
val dialogActor = ctx.spawn(Behaviors.supervise(CheckDeliveryDialog.behavior(chatId, botUri)).onFailure(SupervisorStrategy.restart), s"delivery-check-$chatId")
state.copy(dialogs = state.dialogs.updated(chatId, dialogActor))
}
}
EventSourcedBehavior(
persistenceId = PersistenceId("dialog-manager"),
emptyState = State(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
}
}

View File

@@ -2,8 +2,9 @@ package eu.xeppaka.bot
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector}
import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.util.Timeout
import akka.{Done, actor}
@@ -14,9 +15,10 @@ import scala.io.StdIn
object Main {
def main(args: Array[String]): Unit = {
val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot
//val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot
val telegramBot = ActorSystem(TelegramBot.behavior(botId, "0.0.0.0", 8443), "telegram-bot")
//val botId = "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4" // useless bot
val botId = "693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8" // delivery bot
val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", 88)).onFailure(SupervisorStrategy.restart)
val telegramBot = ActorSystem(botBehavior, "telegram-bot")
implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
implicit val scheduler: Scheduler = telegramBot.scheduler

View File

@@ -8,10 +8,10 @@ import akka.Done
import akka.actor.{ActorSystem, Scheduler}
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives.{as, entity, onComplete, path, post, complete, extractLog}
import akka.http.scaladsl.server.Directives.{as, complete, entity, extractLog, onComplete, path, post}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ConnectionContext, Http, HttpExt, HttpsConnectionContext}
import akka.stream.ActorMaterializer
@@ -44,10 +44,10 @@ object TelegramBot {
val botUri = BotUri(botId)
val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://xeppaka.eu:8443/$hookId")
val webhookUri = Uri(s"https://xeppaka.eu:88/$hookId")
val httpsContext = createHttpsConnectionContext
val stashBuffer = StashBuffer[Command](10)
val dialogManager = ctx.spawnAnonymous(DialogManager.behavior(botUri))
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler)
def bindingServer: Behavior[Command] = Behaviors.setup[Command] { ctx =>

View File

@@ -1,16 +0,0 @@
package eu.xeppaka.bot
import eu.xeppaka.bot.TelegramEntities._
import io.circe.Printer
import io.circe.generic.auto._
import io.circe.syntax._
import org.scalatest.FlatSpec
import TelegramEntitiesDerivations._
class JsonSpec extends FlatSpec {
"blah" should "blah" in {
val keyboard = ReplyKeyboardRemove()
val message = SendMessage(100000, "Please enter command.", reply_markup = Some(keyboard))
println(message.asJson.pretty(Printer.spaces2.copy(dropNullValues = true)))
}
}