diff --git a/build.sbt b/build.sbt index 7546973..737de62 100644 --- a/build.sbt +++ b/build.sbt @@ -18,6 +18,7 @@ lazy val `telegram-bot-delivery` = (project in file(".")) akkaHttp, akkaStream, akkaPersistence, + akkaPersistenceCassandra, levelDbJni, circleCore, circleGeneric, @@ -31,6 +32,7 @@ lazy val `telegram-bot-delivery` = (project in file(".")) Docker / daemonUserUid := Some("1001"), Docker / daemonUser := "telegram-bot", Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery", + version := "1.0.1" ) .enablePlugins(JavaServerAppPackaging) .enablePlugins(DockerPlugin) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8994f2f..b0aff66 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,6 +6,7 @@ object Dependencies { object Versions { val akkaVersion = "2.5.26" val akkaHttpVersion = "10.1.10" + val akkaPersistenceCassandraVersion = "0.100" val levelDbJniVersion = "1.8" val circeVersion = "0.12.3" val akkaHttpCirceVersion = "1.29.1" @@ -18,6 +19,7 @@ object Dependencies { val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion + val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.100" val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion val circleCore = "io.circe" %% "circe-core" % circeVersion val circleGeneric = "io.circe" %% "circe-generic" % circeVersion diff --git a/project/build.properties b/project/build.properties index c0bab04..6adcdc7 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.2.8 +sbt.version=1.3.3 diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 2fd03d3..f8fb5ad 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -5,14 +5,11 @@ akka { persistence { journal { - plugin = "akka.persistence.journal.leveldb" - auto-start-journals = ["akka.persistence.journal.leveldb"] - leveldb.dir = "journal-check-delivery" - } - - snapshot-store { - plugin = "akka.persistence.snapshot-store.local" - auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"] + plugin = "cassandra-journal" } } } + +cassandra-journal { + contact-points = ["cassandra"] +} diff --git a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala index ceb630a..f589be7 100644 --- a/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala +++ b/src/main/scala/eu/xeppaka/bot/CheckDeliveryDialog.scala @@ -128,7 +128,7 @@ object CheckDeliveryDialog { } def listParcels: Behavior[Command] = Behaviors.setup { ctx => - case class ListParcelsSuccess(parcelsList: Set[String]) extends Command + case class ListParcelsSuccess(parcelsList: Seq[String]) extends Command case class ListParcelsFailure(exception: Throwable) extends Command implicit val timeout: Timeout = 5.seconds @@ -139,7 +139,7 @@ object CheckDeliveryDialog { Behaviors.receiveMessage { case ListParcelsSuccess(parcelsList) => - val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.toSeq.sorted.mkString("\n") else "(empty)") + val messageText = "*List of your watched parcels:*\n" + (if (parcelsList.nonEmpty) parcelsList.sorted.mkString("\n") else "(empty)") val message = SendMessage(chatId, messageText, Some("Markdown"), reply_markup = commandsKeyboard) sendMessage(message, waitCommand, waitCommand) case ListParcelsFailure(exception) => @@ -154,19 +154,19 @@ object CheckDeliveryDialog { def removeParcel(onSuccess: => Behavior[Command], onFailure: => Behavior[Command]): Behavior[Command] = Behaviors.setup { ctx => - case class ListParcelsSuccess(parcelsList: Set[String]) extends Command - case class ListParcelsFailure(exception: Throwable) extends Command + case class ListParcelIdsSuccess(parcelsList: Seq[String]) extends Command + case class ListParcelIdsFailure(exception: Throwable) extends Command implicit val timeout: Timeout = 5.seconds - ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcels(ref)) { - case Success(CzechPostDeliveryCheck.ListParcelsResult(parcelsList)) => ListParcelsSuccess(parcelsList) - case Failure(exception) => ListParcelsFailure(exception) + ctx.ask[CzechPostDeliveryCheck.Command, CzechPostDeliveryCheck.ListParcelIdsResult](czechPostDeliveryCheck)(ref => CzechPostDeliveryCheck.ListParcelIds(ref)) { + case Success(CzechPostDeliveryCheck.ListParcelIdsResult(parcelsList)) => ListParcelIdsSuccess(parcelsList) + case Failure(exception) => ListParcelIdsFailure(exception) } Behaviors.receiveMessage { - case ListParcelsSuccess(parcelsList) => + case ListParcelIdsSuccess(parcelsList) => if (parcelsList.nonEmpty) { - val keyboardButtons = parcelsList.toSeq.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq + val keyboardButtons = parcelsList.sorted.grouped(3).map(_.map(id => KeyboardButton(id))).toSeq val markup = ReplyKeyboardMarkup(keyboard = keyboardButtons, resize_keyboard = Some(true), one_time_keyboard = Some(true)) val message = SendMessage(chatId, "Please enter a parcel id to remove.", reply_markup = Some(markup)) sendMessage(message, waitTextMessage(parcelId => removeParcelId(parcelId)), onFailure) @@ -174,7 +174,7 @@ object CheckDeliveryDialog { val message = SendMessage(chatId, "You don't have watched parcels. There is nothing to remove.", reply_markup = commandsKeyboard) sendMessage(message, onSuccess, onFailure) } - case ListParcelsFailure(exception) => + case ListParcelIdsFailure(exception) => ctx.log.error(exception, "action=list_parcels result=failure chat_id={}", chatId) val message = SendMessage(chatId, "Failed to get a list of your watched parcels. Please try again later.", reply_markup = commandsKeyboard) sendMessage(message, waitCommand, waitCommand) diff --git a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala index 2250d52..cbf81e8 100644 --- a/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala +++ b/src/main/scala/eu/xeppaka/bot/CzechPostDeliveryCheck.scala @@ -60,8 +60,8 @@ object CzechPostDeliveryCheck { sealed trait Command sealed trait CommandResult sealed trait Event - case class ParcelState(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) { - def prettyPrint(parcelId: String): String = { + case class Parcel(comment: String, attributes: Option[Entities.Attributes] = None, states: Set[Entities.State] = Set.empty) { + def fullStatePrint(parcelId: String): String = { val statesString = states .toSeq .sortBy(state => czechPostDateFormat.parse(state.date)) @@ -72,13 +72,27 @@ object CzechPostDeliveryCheck { |=========================== |$statesString""".stripMargin } + + def latestStatePrint(parcelId: String): String = { + val state = latestState + s"$parcelId ($comment) - ${printDateFormat.format(czechPostDateFormat.parse(state.date))} - ${state.text}" + } + + private def latestState: Entities.State = states.toSeq.maxBy(state => czechPostDateFormat.parse(state.date)) + } + + case class State(parcelStates: Map[String, Parcel] = Map.empty) { + def latestStatesPrint: Seq[String] = parcelStates + .map { case (id, parcel) => parcel.latestStatePrint(id) } + .to(Seq) } - case class State(parcelStates: Map[String, ParcelState] = Map.empty) case class AddParcel(parcelId: String, comment: String, replyTo: ActorRef[CommandResult]) extends Command case class RemoveParcel(parcelId: String, replyTo: ActorRef[CommandResult]) extends Command case class ListParcels(replyTo: ActorRef[ListParcelsResult]) extends Command - case class ListParcelsResult(parcelsList: Set[String]) + case class ListParcelsResult(parcelsList: Seq[String]) + case class ListParcelIds(replyTo: ActorRef[ListParcelIdsResult]) extends Command + case class ListParcelIdsResult(parcelIds: Seq[String]) case object CommandResultSuccess extends CommandResult case class CommandResultFailure(exception: Throwable) extends CommandResult @@ -166,10 +180,16 @@ object CzechPostDeliveryCheck { case ListParcels(replyTo) => Effect.none .thenRun { state => - val parcelsList = state.parcelStates.keySet + val parcelsList = state.latestStatesPrint replyTo ! ListParcelsResult(parcelsList) } + case ListParcelIds(replyTo) => + Effect.none + .thenRun { state => + replyTo ! ListParcelIdsResult(state.parcelStates.keys.toSeq) + } + case CheckParcels => Effect .none @@ -223,7 +243,7 @@ object CzechPostDeliveryCheck { .persist(attributesChangedEvents ++ stateEvents) .thenRun(_ => { if (newStates.nonEmpty) { - stateReporter ! DeliveryStateChanged(ParcelState(comment, None, newStates).prettyPrint(parcelId)) + stateReporter ! DeliveryStateChanged(Parcel(comment, None, newStates).fullStatePrint(parcelId)) } }) } @@ -232,7 +252,7 @@ object CzechPostDeliveryCheck { val eventHandler: EventHandler[State, Event] = (state, evt) => { evt match { case ParcelAdded(parcelId, comment) => - state.copy(parcelStates = state.parcelStates + (parcelId -> ParcelState(comment))) + state.copy(parcelStates = state.parcelStates + (parcelId -> Parcel(comment))) case ParcelRemoved(parcelId) => state.copy(parcelStates = state.parcelStates - parcelId) case ParcelHistoryStateAdded(parcelId, newState) => val parcelState = state.parcelStates(parcelId)