Dockerized telegram bot

This commit is contained in:
Pavel Kachalouski
2019-05-18 01:37:37 +02:00
parent 9d30525c8a
commit afe113c2db
17 changed files with 97 additions and 75 deletions

View File

@@ -3,14 +3,15 @@ import Dependencies._
lazy val commonSettings = Seq( lazy val commonSettings = Seq(
organization := "com.example", organization := "com.example",
scalaVersion := "2.12.8", scalaVersion := "2.12.8",
version := "0.1.0-SNAPSHOT", version := "1.0.0",
mainClass := Some("eu.xeppaka.bot.Main") mainClass := Some("eu.xeppaka.bot.Main")
) )
inThisBuild(commonSettings) inThisBuild(commonSettings)
lazy val `telegram-bot` = (project in file("telegram-bot")) lazy val `telegram-bot-delivery` = (project in file("."))
.settings( .settings(
name := "telegram-bot-delivery",
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
scalaTest % Test, scalaTest % Test,
akka, akka,
@@ -23,6 +24,10 @@ lazy val `telegram-bot` = (project in file("telegram-bot"))
circleGeneric, circleGeneric,
circleParser, circleParser,
circeAkkaHttp circeAkkaHttp
) ),
Docker / defaultLinuxInstallLocation := "/opt/telegram-bot-delivery",
Docker / dockerExposedPorts := Seq(8443),
Docker / dockerRepository := Some("registry.xeppaka.eu:443")
) )
.enablePlugins(JavaServerAppPackaging)
.enablePlugins(DockerPlugin) .enablePlugins(DockerPlugin)

View File

@@ -1,16 +1,26 @@
import sbt._ import sbt._
import Dependencies.Versions._
object Dependencies { object Dependencies {
lazy val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.19" object Versions {
lazy val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % "2.5.19" val akkaVersion = "2.5.22"
lazy val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.19" val akkaHttpVersion = "10.1.8"
lazy val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.5" val levelDbJniVersion = "1.8"
lazy val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % "2.5.19" val circeVersion = "0.11.1"
lazy val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8" val akkaHttpCirceVersion = "1.23.0"
//lazy val vkapi = "com.vk.api" % "sdk" % "0.5.12" val scalaTestVersion = "3.0.5"
lazy val circleCore = "io.circe" %% "circe-core" % "0.10.0" }
lazy val circleGeneric = "io.circe" %% "circe-generic" % "0.10.0"
lazy val circleParser = "io.circe" %% "circe-parser" % "0.10.0" lazy val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
lazy val circeAkkaHttp = "de.heikoseeberger" %% "akka-http-circe" % "1.22.0" lazy val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5" lazy val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
lazy val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
lazy val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
lazy val levelDbJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % levelDbJniVersion
lazy val circleCore = "io.circe" %% "circe-core" % circeVersion
lazy val circleGeneric = "io.circe" %% "circe-generic" % circeVersion
lazy val circleParser = "io.circe" %% "circe-parser" % circeVersion
lazy val circeAkkaHttp = "de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirceVersion
lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
} }

View File

@@ -1 +1 @@
sbt.version=1.2.7 sbt.version=1.2.8

View File

@@ -1,5 +1,5 @@
akka { akka {
loglevel = "DEBUG" loglevel = "INFO"
extensions = [akka.persistence.Persistence] extensions = [akka.persistence.Persistence]

View File

@@ -5,7 +5,7 @@
</encoder> </encoder>
</appender> </appender>
<root level="DEBUG"> <root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
</root> </root>
</configuration> </configuration>

View File

@@ -4,7 +4,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler} import akka.persistence.typed.scaladsl.EventSourcedBehavior.{CommandHandler, EventHandler}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} import akka.persistence.typed.scaladsl.{Effect, EffectBuilder, EventSourcedBehavior}
import akka.util.Timeout import akka.util.Timeout
import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess} import eu.xeppaka.bot.CheckDeliveryDialog.{ProcessMessageFailure, ProcessMessageSuccess}
import eu.xeppaka.bot.TelegramEntities.Update import eu.xeppaka.bot.TelegramEntities.Update
@@ -36,7 +36,7 @@ object DialogManager {
if (update.message.isDefined) { if (update.message.isDefined) {
val chatId = update.message.get.chat.id val chatId = update.message.get.chat.id
val effect: Effect[Event, State] = if (state.dialogs.contains(chatId)) { val effect: EffectBuilder[Event, State] = if (state.dialogs.contains(chatId)) {
Effect.none Effect.none
} else { } else {
Effect.persist(DialogAdded(chatId)) Effect.persist(DialogAdded(chatId))

View File

@@ -0,0 +1,44 @@
package eu.xeppaka.bot
import java.nio.file.Paths
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.util.Timeout
import akka.{Done, actor}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.io.StdIn
object Main {
def main(args: Array[String]): Unit = {
val botId = System.getProperty("botId", "570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4")
val localPort = 8443
val hookDomain = System.getProperty("hookDomain", "xeppaka.eu")
val hookPort = System.getProperty("hookPort", "8443").toInt
val useHttpsServer = System.getProperty("useHttpsServer", "true").toBoolean
val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", localPort, hookDomain, hookPort, useHttpsServer)).onFailure(SupervisorStrategy.restart)
val telegramBot = ActorSystem(botBehavior, "telegram-bot-delivery")
// implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
// implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
// implicit val scheduler: Scheduler = telegramBot.scheduler
// implicit val timeout: Timeout = 10.seconds
// println("Press enter to finish bot...")
// StdIn.readLine()
//
// val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref))
//
// val terminateFuture = stopFuture
// .andThen { case _ => Http().shutdownAllConnectionPools() }
// .andThen { case _ => telegramBot.terminate() }
//
// Await.ready(terminateFuture, 20.seconds)
}
}

View File

@@ -1,6 +1,7 @@
package eu.xeppaka.bot package eu.xeppaka.bot
import java.io.InputStream import java.io.InputStream
import java.nio.file.Path
import java.security.{KeyStore, SecureRandom} import java.security.{KeyStore, SecureRandom}
import java.util.UUID import java.util.UUID
@@ -20,7 +21,7 @@ import eu.xeppaka.bot.TelegramEntities._
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.io.Source import scala.io.Source
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
@@ -34,7 +35,7 @@ object TelegramBot {
case object GetBotInfo case object GetBotInfo
case object GetWebhookInfo case object GetWebhookInfo
def behavior(botId: String, interface: String, localPort: Int, hookPort: Int): Behavior[Command] = Behaviors.setup[Command] { ctx => def behavior(botId: String, interface: String, localPort: Int, hookDomain: String, hookPort: Int, useHttpsServer: Boolean = true): Behavior[Command] = Behaviors.setup[Command] { ctx =>
ctx.log.info("action=start_bot") ctx.log.info("action=start_bot")
implicit val untypedSystem: ActorSystem = ctx.system.toUntyped implicit val untypedSystem: ActorSystem = ctx.system.toUntyped
@@ -44,8 +45,8 @@ object TelegramBot {
val botUri = BotUri(botId) val botUri = BotUri(botId)
val http: HttpExt = Http() val http: HttpExt = Http()
val hookId = UUID.randomUUID().toString val hookId = UUID.randomUUID().toString
val webhookUri = Uri(s"https://xeppaka.eu:$hookPort/$hookId") val webhookUri = Uri(s"https://$hookDomain:$hookPort/$hookId")
val httpsContext = createHttpsConnectionContext val httpsContext = if (useHttpsServer) Some(createHttpsConnectionContext) else None
val stashBuffer = StashBuffer[Command](10) val stashBuffer = StashBuffer[Command](10)
val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart)) val dialogManager = ctx.spawnAnonymous(Behaviors.supervise(DialogManager.behavior(botUri)).onFailure(SupervisorStrategy.restart))
val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler) val routes = botRoutes(hookId, dialogManager)(untypedSystem.scheduler)
@@ -57,7 +58,7 @@ object TelegramBot {
ctx.log.info("action=bind_server interface={} port={}", interface, localPort) ctx.log.info("action=bind_server interface={} port={}", interface, localPort)
http http
.bindAndHandle(routes, interface, localPort, httpsContext) .bindAndHandle(routes, interface, localPort, httpsContext.getOrElse(http.defaultServerHttpContext))
.onComplete { .onComplete {
case Success(binding) => ctx.self ! BindingSuccess(binding) case Success(binding) => ctx.self ! BindingSuccess(binding)
case Failure(exception) => ctx.self ! BindingFailure(exception) case Failure(exception) => ctx.self ! BindingFailure(exception)
@@ -112,15 +113,21 @@ object TelegramBot {
implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default()) implicit val executionContextExecutor: ExecutionContextExecutor = ctx.system.dispatchers.lookup(DispatcherSelector.default())
val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString())) val urlEntity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(webhookUri.toString()))
val urlPart = Multipart.FormData.BodyPart.Strict("url", urlEntity) val urlPart = Some(Multipart.FormData.BodyPart.Strict("url", urlEntity))
val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString) val certificatePart = if (useHttpsServer) {
val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate) val certificate = ByteString(Source.fromResource("telegram-bot.pem").mkString)
val certificatePart = Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "telegram-bot.pem")) val certificateEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, certificate)
val setWebhookFormData = Multipart.FormData.Strict(immutable.Seq(urlPart, certificatePart)) Some(Multipart.FormData.BodyPart.Strict("certificate", certificateEntity, Map("filename" -> "cert.pem")))
} else {
None
}
Marshal(setWebhookFormData) val formParts = immutable.Seq(urlPart, certificatePart).flatten
val formData = Multipart.FormData.Strict(formParts)
Marshal(formData)
.to[RequestEntity] .to[RequestEntity]
.flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity))) .flatMap(requestEntity => http.singleRequest(HttpRequest(uri = botUri.setWebhook, method = HttpMethods.POST, entity = requestEntity)))
.onComplete { .onComplete {

View File

@@ -1,44 +0,0 @@
package eu.xeppaka.bot
import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, DispatcherSelector, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.util.Timeout
import akka.{Done, actor}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.io.StdIn
object Main {
def main(args: Array[String]): Unit = {
val isProduction = System.getProperty("isProduction", "false").toBoolean
val (botId, localPort, hookPort) = if (isProduction) {
("693134480:AAE8JRXA6j1mkOKTaxapP6A-E4LPHRuiIf8", 88, 88) // delivery bot
} else {
("570855144:AAEv7b817cuq2JJI9f2kG5B9G3zW1x-btz4", 8443, 8443) // useless bot
}
val botBehavior = Behaviors.supervise(TelegramBot.behavior(botId, "0.0.0.0", localPort, hookPort)).onFailure(SupervisorStrategy.restart)
val telegramBot = ActorSystem(botBehavior, "telegram-bot")
implicit val actorSystem: actor.ActorSystem = telegramBot.toUntyped
implicit val executionContext: ExecutionContextExecutor = telegramBot.dispatchers.lookup(DispatcherSelector.default())
implicit val scheduler: Scheduler = telegramBot.scheduler
implicit val timeout: Timeout = 10.seconds
println("Press enter to finish bot...")
StdIn.readLine()
val stopFuture: Future[Done] = telegramBot ? (ref => TelegramBot.Stop(ref))
val terminateFuture = stopFuture
.andThen { case _ => Http().shutdownAllConnectionPools() }
.andThen { case _ => telegramBot.terminate() }
Await.ready(terminateFuture, 20.seconds)
}
}