From 37533b7c908e91c98c02065736fd44c0192dc3fa Mon Sep 17 00:00:00 2001 From: Pavel Kachalouski Date: Tue, 23 Apr 2019 13:38:15 +0200 Subject: [PATCH] Initial commit --- .gitignore | 78 ++++++++++ .idea/encodings.xml | 4 + .idea/hydra.xml | 9 ++ .idea/misc.xml | 6 + .idea/modules.xml | 9 ++ .idea/modules/scala-learn-1-build.iml | 139 ++++++++++++++++++ .idea/modules/scala-learn-1.iml | 68 +++++++++ .idea/sbt.xml | 18 +++ .idea/scala_compiler.xml | 6 + .idea/vcs.xml | 6 + application.conf | 2 + build.sbt | 28 ++++ project/build.properties | 1 + project/plugins.sbt | 3 + project/protoc.sbt | 3 + src/main/protobuf/Messages.proto | 16 ++ src/main/resources/application.conf | 5 + .../scala/eu/xeppaka/scalalearn/Actor1.scala | 47 ++++++ .../scala/eu/xeppaka/scalalearn/Actor2.scala | 26 ++++ .../eu/xeppaka/scalalearn/ActorTest1.scala | 59 ++++++++ .../eu/xeppaka/scalalearn/CacheUser.scala | 33 +++++ .../xeppaka/scalalearn/CardCacheActor.scala | 19 +++ .../eu/xeppaka/scalalearn/CardCacheApi.scala | 8 + .../scala/eu/xeppaka/scalalearn/Class23.scala | 27 ++++ .../scala/eu/xeppaka/scalalearn/Main.scala | 80 ++++++++++ .../scala/eu/xeppaka/scalalearn/Main1.scala | 41 ++++++ .../scala/eu/xeppaka/scalalearn/Main10.scala | 14 ++ .../scala/eu/xeppaka/scalalearn/Main11.scala | 44 ++++++ .../scala/eu/xeppaka/scalalearn/Main12.scala | 28 ++++ .../scala/eu/xeppaka/scalalearn/Main13.scala | 28 ++++ .../scala/eu/xeppaka/scalalearn/Main2.scala | 35 +++++ .../scala/eu/xeppaka/scalalearn/Main3.scala | 29 ++++ .../scala/eu/xeppaka/scalalearn/Main4.scala | 24 +++ .../scala/eu/xeppaka/scalalearn/Main5.scala | 103 +++++++++++++ .../scala/eu/xeppaka/scalalearn/Main6.scala | 24 +++ .../scala/eu/xeppaka/scalalearn/Main7.scala | 53 +++++++ .../scala/eu/xeppaka/scalalearn/Main8.scala | 40 +++++ .../scala/eu/xeppaka/scalalearn/Main9.scala | 39 +++++ .../eu/xeppaka/scalalearn/TestMirror.scala | 6 + .../eu/xeppaka/scalalearn/TestStub.scala | 26 ++++ 40 files changed, 1234 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/encodings.xml create mode 100644 .idea/hydra.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/modules/scala-learn-1-build.iml create mode 100644 .idea/modules/scala-learn-1.iml create mode 100644 .idea/sbt.xml create mode 100644 .idea/scala_compiler.xml create mode 100644 .idea/vcs.xml create mode 100644 application.conf create mode 100644 build.sbt create mode 100644 project/build.properties create mode 100644 project/plugins.sbt create mode 100644 project/protoc.sbt create mode 100644 src/main/protobuf/Messages.proto create mode 100644 src/main/resources/application.conf create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Actor1.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Actor2.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/ActorTest1.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/CacheUser.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/CardCacheActor.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/CardCacheApi.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Class23.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main1.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main10.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main11.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main12.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main13.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main2.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main3.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main4.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main5.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main6.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main7.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main8.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/Main9.scala create mode 100644 src/main/scala/eu/xeppaka/scalalearn/TestMirror.scala create mode 100644 src/test/scala/eu/xeppaka/scalalearn/TestStub.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5c33e64 --- /dev/null +++ b/.gitignore @@ -0,0 +1,78 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests +### Scala template +*.class +*.log +### SBT template +# Simple Build Tool +# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control + +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ +.history +.cache +.lib/ diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..15a15b2 --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/hydra.xml b/.idea/hydra.xml new file mode 100644 index 0000000..66eeb9a --- /dev/null +++ b/.idea/hydra.xml @@ -0,0 +1,9 @@ + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..44ae3bd --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..8840c5d --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules/scala-learn-1-build.iml b/.idea/modules/scala-learn-1-build.iml new file mode 100644 index 0000000..412ccb8 --- /dev/null +++ b/.idea/modules/scala-learn-1-build.iml @@ -0,0 +1,139 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules/scala-learn-1.iml b/.idea/modules/scala-learn-1.iml new file mode 100644 index 0000000..c6ee340 --- /dev/null +++ b/.idea/modules/scala-learn-1.iml @@ -0,0 +1,68 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/sbt.xml b/.idea/sbt.xml new file mode 100644 index 0000000..f169f36 --- /dev/null +++ b/.idea/sbt.xml @@ -0,0 +1,18 @@ + + + + + + \ No newline at end of file diff --git a/.idea/scala_compiler.xml b/.idea/scala_compiler.xml new file mode 100644 index 0000000..1627e8a --- /dev/null +++ b/.idea/scala_compiler.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/application.conf b/application.conf new file mode 100644 index 0000000..fa1bdab --- /dev/null +++ b/application.conf @@ -0,0 +1,2 @@ +field1.field2 = 11 +field1.field3 = 15 \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..5bdae1f --- /dev/null +++ b/build.sbt @@ -0,0 +1,28 @@ +import scala.sys.process.Process + +scalaVersion := "2.12.6" + +mainClass in Compile := Some("eu.xeppaka.scalalearn.Main") +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % "2.5.22", + "com.typesafe.akka" %% "akka-stream" % "2.5.22", + "com.typesafe" % "config" % "1.3.3", + "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", + "io.kamon" %% "kamon-core" % "1.1.3", + "io.kamon" %% "kamon-scala-future" % "1.0.0", + "io.kamon" %% "kamon-akka-2.5" % "1.1.2", + "io.kamon" %% "kamon-akka-http-2.5" % "1.1.0", + "io.kamon" %% "kamon-akka-remote-2.5" % "1.1.0", + "io.kamon" %% "kamon-logback" % "1.0.3", + "org.aspectj" % "aspectjweaver" % "1.9.1", + "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf", + "org.scalatest" %% "scalatest" % "3.0.5" % Test, + "org.scalamock" %% "scalamock" % "4.1.0" % Test +) + +fork := true +runMain / javaOptions += "-J-Xmx512M" +PB.runProtoc in Compile := (args => Process("/run/current-system/sw/bin/protoc", args)!) +PB.targets in Compile := Seq( + scalapb.gen() -> (sourceManaged in Compile).value +) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..5620cc5 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.1 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..639dd79 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,3 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7") +addSbtPlugin("io.kamon" % "sbt-aspectj-runner" % "1.1.1") +addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") diff --git a/project/protoc.sbt b/project/protoc.sbt new file mode 100644 index 0000000..a916452 --- /dev/null +++ b/project/protoc.sbt @@ -0,0 +1,3 @@ +addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") + +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4" diff --git a/src/main/protobuf/Messages.proto b/src/main/protobuf/Messages.proto new file mode 100644 index 0000000..859475c --- /dev/null +++ b/src/main/protobuf/Messages.proto @@ -0,0 +1,16 @@ +syntax = "proto2"; +package eu.xeppaka.scalalearn; + +import "scalapb/scalapb.proto"; + +option (scalapb.options) = { + flat_package: true +}; + +enum Status { + OPEN = 0; + CLOSED = 1; + BLOCKED_IN = 2; + BLOCKED_OUT = 3; + BLOCKED_ALL = 4; +} diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..06bd007 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,5 @@ +var1 = { + val1 = 1 + val2 = 2 +} +include "test.conf" diff --git a/src/main/scala/eu/xeppaka/scalalearn/Actor1.scala b/src/main/scala/eu/xeppaka/scalalearn/Actor1.scala new file mode 100644 index 0000000..005db29 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Actor1.scala @@ -0,0 +1,47 @@ +package eu.xeppaka.scalalearn + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.Timeout +import eu.xeppaka.scalalearn.Actor1.{FinishFailure, FinishSuccess, Print, Start} +import eu.xeppaka.scalalearn.Actor2._ + +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +object Actor1 { + case object Start + case object FinishSuccess + case object FinishFailure + case object Print + + def props(actor2: ActorRef): Props = Props(new Actor1(actor2)) +} + +class Actor1(actor2: ActorRef) extends Actor with ActorLogging { + import context.dispatcher + + implicit val timeout: Timeout = 2.seconds + implicit val materializer: Materializer = ActorMaterializer() + + override def receive: Receive = { + case Start => + Source + .repeat(Request) + .take(100) + .ask[Response.type](actor2) + .toMat(Sink.foreach(response => log.info(response.toString)))(Keep.right) + .run() + .onComplete { + case Success(_) => self ! FinishSuccess + case Failure(_) => self ! FinishFailure + } + case FinishSuccess => + log.info("actor1 finish success") + case FinishFailure => + log.info("actor1 finish failure") + case Print => + log.info("print done") + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Actor2.scala b/src/main/scala/eu/xeppaka/scalalearn/Actor2.scala new file mode 100644 index 0000000..17a4902 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Actor2.scala @@ -0,0 +1,26 @@ +package eu.xeppaka.scalalearn + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import eu.xeppaka.scalalearn.Actor2.{Request, Response, SendResponse} + +import scala.concurrent.duration._ + +object Actor2 { + case object Request + case object Response + private case class SendResponse(destination: ActorRef) + + def props(): Props = Props(new Actor2()) +} + +class Actor2 extends Actor with ActorLogging { + import context.dispatcher + + override def receive: Receive = { + case Request => + log.info("actor2 request") + context.system.scheduler.scheduleOnce(10.millis, self, SendResponse(sender())) + case SendResponse(destination) => + destination ! Response + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/ActorTest1.scala b/src/main/scala/eu/xeppaka/scalalearn/ActorTest1.scala new file mode 100644 index 0000000..43c51d6 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/ActorTest1.scala @@ -0,0 +1,59 @@ +package eu.xeppaka.scalalearn + +import akka.actor.{Actor, OneForOneStrategy, PoisonPill, SupervisorStrategy, Timers} +import ActorTest1._ + +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ +import akka.actor.SupervisorStrategy._ + +object ActorTest1 { + case object Create + case object Delete + case object Print +} + +class ActorTest1 extends Actor { + + override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case _ => Restart + } + + case class State( + exist: Boolean = false + ) + + private var state = State() + + override def preStart(): Unit = { + super.preStart() + + implicit val executionContext: ExecutionContextExecutor = context.dispatcher + //context.system.scheduler.scheduleOnce(2.seconds, self, PoisonPill) + } + + override def postStop(): Unit = { + super.postStop() + println(s"actor ${self.path.name} stopped.") + } + + val exist: Receive = { + case Create => + case Delete => state = state.copy(exist = false) + case Print => println(state) + } + + val notExist: Receive = { + case Create => state = state.copy(exist = true) + case Delete => + case Print => + println(state) + //throw new Exception("blah") + //throw new Throwable("blah") + } + + override val receive: Receive = { + case message: Any if state.exist => exist(message) + case message: Any if !state.exist => notExist(message) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/CacheUser.scala b/src/main/scala/eu/xeppaka/scalalearn/CacheUser.scala new file mode 100644 index 0000000..fb3a618 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/CacheUser.scala @@ -0,0 +1,33 @@ +package eu.xeppaka.scalalearn + +import java.util.UUID + +import akka.actor.{Actor, ActorRef, Props} +import eu.xeppaka.scalalearn.CacheUser.{CacheRandomCard, CacheRandomCard2, TestMsg, TestMsgSuccess} +import eu.xeppaka.scalalearn.CardCacheActor.{CacheCard, CacheSuccess} + +object CacheUser { + case object CacheRandomCard + case object CacheRandomCard2 + case object TestMsg + + case class TestMsgSuccess(msg: Option[String]) + case object TestMsgFailure + + def props(cardCacheApi: CardCacheApi, cache: ActorRef): Props = Props(classOf[CacheUser], cardCacheApi, cache) +} + +class CacheUser(cardCacheApi: CardCacheApi, cache: ActorRef) extends Actor { + override def receive: Receive = { + case CacheRandomCard => + println(s"name: ${self.path.name}") + cardCacheApi.cacheCard(UUID.randomUUID().toString) + case CacheRandomCard2 => + cache ! CacheCard(UUID.randomUUID().toString) + case TestMsg => + println(s"name: ${self.path.toStringWithoutAddress}") + sender() ! TestMsgSuccess(Some("data")) + case CacheSuccess => + println("cache success!") + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/CardCacheActor.scala b/src/main/scala/eu/xeppaka/scalalearn/CardCacheActor.scala new file mode 100644 index 0000000..b39680e --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/CardCacheActor.scala @@ -0,0 +1,19 @@ +package eu.xeppaka.scalalearn + +import akka.actor.Actor +import eu.xeppaka.scalalearn.CardCacheActor.{CacheCard, CacheSuccess} + +object CardCacheActor { + case class CacheCard(idCard: String) + case object CacheSuccess +} + +class CardCacheActor extends Actor { + override def receive: Receive = { + case CacheCard(idCard) => + println("cache card") + sender() ! CacheSuccess + case _ => + println("error!") + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/CardCacheApi.scala b/src/main/scala/eu/xeppaka/scalalearn/CardCacheApi.scala new file mode 100644 index 0000000..0ac62e6 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/CardCacheApi.scala @@ -0,0 +1,8 @@ +package eu.xeppaka.scalalearn + +import akka.actor.ActorRef +import eu.xeppaka.scalalearn.CardCacheActor.CacheCard + +class CardCacheApi(val cacheActor: ActorRef) { + def cacheCard(idCard: String): Unit = cacheActor ! CacheCard(idCard) +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Class23.scala b/src/main/scala/eu/xeppaka/scalalearn/Class23.scala new file mode 100644 index 0000000..ef304be --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Class23.scala @@ -0,0 +1,27 @@ +package eu.xeppaka.scalalearn + +case class Class23( + field1: String, + field2: String, + field3: String, + field4: String, + field5: String, + field6: String, + field7: String, + field8: String, + field9: String, + field10: String, + field11: String, + field12: String, + field13: String, + field14: String, + field15: String, + field16: String, + field17: String, + field18: String, + field19: String, + field20: String, + field21: String, + field22: String, + field23: String +) diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main.scala b/src/main/scala/eu/xeppaka/scalalearn/Main.scala new file mode 100644 index 0000000..7971deb --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main.scala @@ -0,0 +1,80 @@ +package eu.xeppaka.scalalearn + +import akka.actor.ActorSystem +import akka.stream.{ActorMaterializer, OverflowStrategy} +import akka.stream.scaladsl.{Keep, Sink, Source} +import org.mongodb.scala.bson.BsonValue +import org.mongodb.scala.bson.collection.immutable.Document +import org.mongodb.scala.{MongoClient, MongoClientSettings, Observer, Subscription} +import org.mongodb.scala.model.Filters.{and, equal, gt, gte} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success} + +object Main { + def main(args: Array[String]): Unit = { + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val mongoClient = MongoClient("mongodb://begencardcore:Heslo_1234@10.18.255.53:27017,10.18.255.163:27017,10.18.255.118:27017/begencardcore?replicaSet=cluster-lcs-dvp-mw") + val db = mongoClient.getDatabase("begencardcore") + val coll = db.getCollection("snapshot") + + val complete = Promise[Unit]() + val batchSize = 1000 + var docs = ListBuffer[Document]() + + val onNextDoc = (doc: Document) => { + coll + .find[Document](equal("idAccount", doc.get("idAccount").get)) + .toFuture() + .map(_.headOption) + .andThen { + case Success(value) => println(value) + case Failure(exception) => println(exception) + } + } + + val queue = Source + .queue[Document](1000, OverflowStrategy.backpressure) + .mapAsync(10)(onNextDoc) + .toMat(Sink.ignore)(Keep.left) + .run() + + coll.find().subscribe(new Observer[Document] { + var processed: Long = 0 + var subscription: Option[Subscription] = None + var processingFuture: Future[Unit] = Future.unit + + override def onSubscribe(subscription: Subscription): Unit = { + this.subscription = Some(subscription) + subscription.request(batchSize) + } + + override def onNext(result: Document): Unit = { + queue + .offer(result) + .onComplete { + case Success(offerResult) => println(offerResult) + case Failure(exception) => println("offer failed") + } + processed = processed + 1 + + if (processed == batchSize) { + processed = 0 + subscription.get.request(batchSize) + + println(s"request next $batchSize...") + } + } + + override def onError(e: Throwable): Unit = () + override def onComplete(): Unit = complete.success(()) + }) + + Await.ready(complete.future, Duration.Inf) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main1.scala b/src/main/scala/eu/xeppaka/scalalearn/Main1.scala new file mode 100644 index 0000000..3d4425c --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main1.scala @@ -0,0 +1,41 @@ +package eu.xeppaka.scalalearn + +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Flow, Sink, Source} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.io.StdIn +import scala.collection.JavaConverters._ + +object Main1 { + val mapsCount = 10000 + + def mapKey(idCard: String) = s"map-${math.abs(idCard.hashCode) % mapsCount}" + + def main(args: Array[String]): Unit = { + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val resultMap = new ConcurrentHashMap[String, Int]() + val l = 1 to 200000 + + Source + .fromIterator(() => l.iterator) + .map(i => f"$i%032d") + .groupBy(mapsCount, idCard => mapKey(idCard)) + .groupedWithin(100, 2.seconds) + .mergeSubstreams + .to(Sink.foreach(elems => elems.foreach(elem => resultMap.compute(mapKey(elem), (_, num) => num + 1)))) + .run() + + StdIn.readLine() + + resultMap.entrySet().asScala.foreach(pair => if (pair.getValue >= 25) println(pair)) + + Await.ready(actorSystem.terminate(), 3.seconds) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main10.scala b/src/main/scala/eu/xeppaka/scalalearn/Main10.scala new file mode 100644 index 0000000..619b465 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main10.scala @@ -0,0 +1,14 @@ +package eu.xeppaka.scalalearn + +import java.time.{Instant, LocalDateTime, ZoneId} +import java.time.temporal.{ChronoField, IsoFields} + +object Main10 { + def main(args: Array[String]): Unit = { + val inst = Instant.parse("2011-02-11T12:00:00.00Z") + val ldt = LocalDateTime.ofInstant(inst, ZoneId.of("UTC")) + println(ldt.getDayOfWeek) + println(ldt.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR)) + println(ldt.getMonthValue) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main11.scala b/src/main/scala/eu/xeppaka/scalalearn/Main11.scala new file mode 100644 index 0000000..4464e9d --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main11.scala @@ -0,0 +1,44 @@ +package eu.xeppaka.scalalearn + +import akka.actor.ActorSystem +import akka.stream.{ActorMaterializer, OverflowStrategy} +import akka.stream.scaladsl.{Sink, Source, Tcp} +import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.io.StdIn + +object Main11 { + def main(args: Array[String]): Unit = { + val actorSystem = ActorSystem() + val materializer = ActorMaterializer()(actorSystem) + + val queue = Source + .queue[Int](0, OverflowStrategy.dropHead) + .map(i => { + println(s"before buffer elem: $i") + i + }) + .map(i => { + println(s"after buffer elem: $i") + i + }) + .map(i => ByteString.fromString(s"$i")) + .via(Tcp()(actorSystem).outgoingConnection("10.18.255.30", 9093)) + .to(Sink.foreach(println)) + .run()(materializer) + + try { + println(Await.result(queue.offer(1), 2.seconds)) + println(Await.result(queue.offer(2), 2.seconds)) + println(Await.result(queue.offer(3), 2.seconds)) + + println("before readline") + + StdIn.readLine() + } finally { + actorSystem.terminate() + } + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main12.scala b/src/main/scala/eu/xeppaka/scalalearn/Main12.scala new file mode 100644 index 0000000..adf9693 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main12.scala @@ -0,0 +1,28 @@ +package eu.xeppaka.scalalearn + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import eu.xeppaka.scalalearn.Actor1.{Print, Start} + +import scala.io.StdIn +import scala.concurrent.duration._ + +object Main12 { + def main(args: Array[String]): Unit = { + val actorSystem = ActorSystem() + //val materializer = ActorMaterializer()(actorSystem) + + try { + val actor2 = actorSystem.actorOf(Actor2.props()) + val actor1 = actorSystem.actorOf(Actor1.props(actor2)) + + actorSystem.scheduler.schedule(1.second, 10.millis, actor1, Print)(actorSystem.dispatcher) + + actor1 ! Start + + StdIn.readLine() + } finally { + actorSystem.terminate() + } + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main13.scala b/src/main/scala/eu/xeppaka/scalalearn/Main13.scala new file mode 100644 index 0000000..03cb207 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main13.scala @@ -0,0 +1,28 @@ +package eu.xeppaka.scalalearn + +import org.mongodb.scala.bson.codecs.Macros._ +import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY +import org.bson.codecs.configuration.CodecRegistries.{fromCodecs, fromProviders, fromRegistries} +import org.mongodb.scala.MongoClient + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object Main13 { + case class SomeData(field1: String, field2: Option[String]) + + def main(args: Array[String]): Unit = { + val codecRegistry = fromRegistries(fromProviders(classOf[SomeData]), DEFAULT_CODEC_REGISTRY) + val mongo = MongoClient() + val db = mongo.getDatabase("db-0003").withCodecRegistry(codecRegistry) + val coll = db.getCollection[SomeData]("coll0001") + + Await.ready(coll.insertOne(SomeData("field1", Some("field2"))).toFuture(), 1.second) + Await.ready(coll.insertOne(SomeData("field1", None)).toFuture(), 1.second) + + val res = Await.result(coll.find().toFuture(), 2.seconds) + res.foreach(println) + + mongo.close() + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main2.scala b/src/main/scala/eu/xeppaka/scalalearn/Main2.scala new file mode 100644 index 0000000..de3d969 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main2.scala @@ -0,0 +1,35 @@ +package eu.xeppaka.scalalearn + +import akka.actor.{ActorSystem, Props} +import akka.pattern.ask +import akka.stream.ActorMaterializer +import akka.util.Timeout +import eu.xeppaka.scalalearn.CacheUser.{TestMsg, TestMsgSuccess} + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.util.Success + +object Main2 { + def main(args: Array[String]): Unit = { + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val timeout: Timeout = 1.second + implicit val ec: ExecutionContext = actorSystem.dispatcher + + val cache = actorSystem.actorOf(Props(classOf[CardCacheActor])) + val cacheApi = new CardCacheApi(cache) + val cacheUser = actorSystem.actorOf(CacheUser.props(cacheApi, cache)) + + (cacheUser ? TestMsg) + .flatMap { vvv => + println(vvv) + vvv match { + case TestMsgSuccess(Some(msg)) => + println(msg) + Future.successful("success") + } + } + .onComplete(vvv => println(vvv)) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main3.scala b/src/main/scala/eu/xeppaka/scalalearn/Main3.scala new file mode 100644 index 0000000..9d48d0e --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main3.scala @@ -0,0 +1,29 @@ +package eu.xeppaka.scalalearn + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +object Main3 { + def checkValue(value: String): Future[Option[Boolean]] = { + Future { + if (value == "true") Some(true) else Some(false) + } + } + + def main(args: Array[String]): Unit = { + val f = checkValue("test") + .map(b => b.map(!_)) + .map(_.map(b => + if (!b) { + b + } else { + throw new IllegalArgumentException() + } + ).getOrElse(throw new IllegalStateException()) + ) + + Await.ready(f, 1.second) + println(f) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main4.scala b/src/main/scala/eu/xeppaka/scalalearn/Main4.scala new file mode 100644 index 0000000..018819f --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main4.scala @@ -0,0 +1,24 @@ +package eu.xeppaka.scalalearn + +import org.mongodb.scala.MongoClient +import org.mongodb.scala.bson.collection.immutable.Document + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object Main4 { + def main(args: Array[String]): Unit = { + val mongoClient = MongoClient("mongodb://localhost:27017/?maxPoolSize=2") + val db = mongoClient.getDatabase("my-db1") + + for (i <- 1 to 100) { + val col = db.getCollection(s"col$i") + col.insertOne(Document("{field1:12}")).toFuture() + //Await.result(f1, 1.second) + } + + Thread.sleep(5000) + + //mongoClient.close() + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main5.scala b/src/main/scala/eu/xeppaka/scalalearn/Main5.scala new file mode 100644 index 0000000..102f6bc --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main5.scala @@ -0,0 +1,103 @@ +package eu.xeppaka.scalalearn + +import java.util.Currency + +import org.bson.{BsonReader, BsonWriter} +import org.bson.codecs.{Codec, DecoderContext, EncoderContext} +import org.mongodb.scala.MongoClient +import org.mongodb.scala.bson.codecs.Macros._ +import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY +import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries, fromCodecs} + +import scala.concurrent.duration._ +import scala.concurrent.Await + +object Main5 { + sealed class EventData { + def idAccount: String = ??? + } + case class Event(pid: String, sn: Long, data: EventData) + + case class Value(v: Int) + case class AccountV1(override val idAccount: String, amount: Int, v: Value) extends EventData + case class AccountV2(override val idAccount: String, amount: Int) extends EventData + + case class Class23( + field1: String = "field1", + field2: String = "field1", + field3: String = "field1", + field4: String = "field1", + field5: String = "field1", + field6: String = "field1", + field7: String = "field1", + field8: String = "field1", + field9: String = "field1", + field10: String = "field1", + field11: String = "field1", + field12: String = "field1", + field13: String = "field1", + field14: String = "field1", + field15: String = "field1", + field16: String = "field1", + field17: String = "field1", + field18: String = "field1", + field19: String = "field1", + field20: String = "field1", + field21: String = "field1", + field22: String = "field1", + field23: String = "field1", + field24: String = "field1", + field25: String = "field1", + field26: String = "field1", + field27: String = "field1", + field28: String = "field1", + field29: String = "field1", + field30: String = "field1", + field31: String = "field1" + ) extends EventData + + class ValueCodec extends Codec[Value] { + override def decode(reader: BsonReader, decoderContext: DecoderContext): Value = Value(reader.readInt32()) + override def encode(writer: BsonWriter, value: Value, encoderContext: EncoderContext): Unit = writer.writeInt32(value.v) + override def getEncoderClass: Class[Value] = classOf[Value] + } + + class CurrencyCodec extends Codec[Currency] { + override def decode(reader: BsonReader, decoderContext: DecoderContext): Currency = Currency.getInstance(reader.readString()) + override def encode(writer: BsonWriter, value: Currency, encoderContext: EncoderContext): Unit = writer.writeString(value.getCurrencyCode) + override def getEncoderClass: Class[Currency] = classOf[Currency] + } + + class Class23Codec extends Codec[Class23] { + override def encode(writer: BsonWriter, value: Class23, encoderContext: EncoderContext): Unit = { + writer.writeStartDocument() + writer.writeString("field1", value.field1) + writer.writeEndDocument() + } + override def decode(reader: BsonReader, decoderContext: DecoderContext): Class23 = { + reader.readStartDocument() + val field1 = reader.readString("field1") + reader.readEndDocument() + Class23(field1 = field1) + } + override def getEncoderClass: Class[Class23] = classOf[Class23] + } + + def main(args: Array[String]): Unit = { + val codecRegistry = fromRegistries(fromCodecs(new ValueCodec, new CurrencyCodec, new Class23Codec), fromProviders(classOf[Event], classOf[EventData]), DEFAULT_CODEC_REGISTRY) + val client = MongoClient() + val db = client.getDatabase("hierarchy-test").withCodecRegistry(codecRegistry) + val accounts = db.getCollection[Event]("accounts") + val fieldColl = db.getCollection[Class23]("fields") + + Await.result(fieldColl.insertOne(Class23()).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 1, Class23())).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 1, AccountV1("000001", 30, Value(10)))).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 2, AccountV2("000002", 20, "EUR"))).toFuture(), 1.second) + + val res = Await.result(accounts.find().toFuture(), 1.second) + res.foreach(println) + + client.close() + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main6.scala b/src/main/scala/eu/xeppaka/scalalearn/Main6.scala new file mode 100644 index 0000000..e646d1a --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main6.scala @@ -0,0 +1,24 @@ +package eu.xeppaka.scalalearn + +import akka.actor.{ActorSystem, PoisonPill, Props} +import akka.stream.ActorMaterializer +import eu.xeppaka.scalalearn.ActorTest1.Print + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.io.StdIn + +object Main6 { + def main(args: Array[String]): Unit = { + val actorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + + val actor = actorSystem.actorOf(Props(new ActorTest1)) + actor ! Print + //actor ! PoisonPill + actor ! Print + + StdIn.readLine() + Await.ready(actorSystem.terminate(), 5.seconds) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main7.scala b/src/main/scala/eu/xeppaka/scalalearn/Main7.scala new file mode 100644 index 0000000..6e2e77d --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main7.scala @@ -0,0 +1,53 @@ +package eu.xeppaka.scalalearn + +import org.bson.codecs.configuration.{CodecProvider, CodecRegistry} +import org.mongodb.scala.bson.codecs.Macros._ +import org.bson.codecs.configuration.CodecRegistries.{fromCodecs, fromProviders, fromRegistries} +import org.bson.codecs.{Codec, DecoderContext, EncoderContext} +import org.bson.{BsonReader, BsonWriter} +import org.mongodb.scala.MongoClient +import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY +import scalapb.{GeneratedEnum, GeneratedEnumCompanion} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.reflect._ + +object Main7 { + class ProtoEnumCodecProvider extends CodecProvider { + override def get[T](clazz: Class[T], registry: CodecRegistry): Codec[T] = { + val ass = classOf[GeneratedEnum].isAssignableFrom(clazz) + if (ass) { + new ProtoEnumCodec(clazz.asInstanceOf[Class[GeneratedEnum]]).asInstanceOf[Codec[T]] + } else { + null + } + } + } + + class ProtoEnumCodec(clazz: Class[GeneratedEnum]) extends Codec[GeneratedEnum] { + override def decode(reader: BsonReader, decoderContext: DecoderContext): GeneratedEnum = { + val method = clazz.getDeclaredMethod("fromName", classOf[String]) + method.invoke(clazz, reader.readString()).asInstanceOf[Option[GeneratedEnum]].get + } + override def encode(writer: BsonWriter, value: GeneratedEnum, encoderContext: EncoderContext): Unit = writer.writeString(value.name) + override def getEncoderClass: Class[GeneratedEnum] = clazz + } + + case class UseEnum( + status1: Status, + status2: Status + ) + + def main(args: Array[String]): Unit = { + val codecRegistry = fromRegistries(fromProviders(classOf[UseEnum], new ProtoEnumCodecProvider), DEFAULT_CODEC_REGISTRY) + val client = MongoClient() + val db = client.getDatabase("classtag-test").withCodecRegistry(codecRegistry) + val status = db.getCollection[UseEnum]("statuses") + + //Await.result(status.insertOne(UseEnum(Status.OPEN, Status.BLOCKED_ALL)).toFuture(), 1.second) + //Await.result(status.insertOne(Status.BLOCKED_ALL).toFuture(), 1.second) + val res = Await.result(status.find().toFuture(), 1.second) + println(res.head.status1) + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main8.scala b/src/main/scala/eu/xeppaka/scalalearn/Main8.scala new file mode 100644 index 0000000..4f6ff33 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main8.scala @@ -0,0 +1,40 @@ +package eu.xeppaka.scalalearn + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Sink, Source} +import akka.stream.{ActorMaterializer, Materializer} + +import scala.concurrent.Future +import scala.io.StdIn +import scala.util.Random +import scala.concurrent.duration._ + +object Main8 { + def main(args: Array[String]): Unit = { + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val materializer: Materializer = ActorMaterializer() + + Source + .repeat(NotUsed) + .throttle(10, 1.second) + .mapAsync(1) { _ => + Future.successful(Random.nextInt()) + } + .mapAsync(1) { _ => + Future.successful(Random.nextInt()) + } + .flatMapConcat { i => + if (i % 2 == 0) { + Source.empty + } else { + Source.single(i) + } + } + .to(Sink.foreach(println)) + .run() + + StdIn.readLine() + actorSystem.terminate() + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/Main9.scala b/src/main/scala/eu/xeppaka/scalalearn/Main9.scala new file mode 100644 index 0000000..1c0c250 --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/Main9.scala @@ -0,0 +1,39 @@ +package eu.xeppaka.scalalearn + +import java.util.Currency + +import org.bson.codecs.configuration.CodecRegistries.{fromCodecs, fromProviders, fromRegistries} +import org.bson.codecs.{Codec, DecoderContext, EncoderContext} +import org.bson.{BsonReader, BsonWriter} +import org.mongodb.scala.MongoClient +import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY +import org.mongodb.scala.bson.codecs.Macros._ + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object Main9 { + case class Id(id: String) + case class Value(data: Int, text: String) + case class Data(values: Map[String, Value]) + + def main(args: Array[String]): Unit = { + val codecRegistry = fromRegistries(fromProviders(classOf[Id], classOf[Value], classOf[Data]), DEFAULT_CODEC_REGISTRY) + val client = MongoClient() + val db = client.getDatabase("db-0002").withCodecRegistry(codecRegistry) + val coll = db.getCollection[Data]("coll-0001") + + val d = Data(Map("111" -> Value(1, "111"), "222" -> Value(2, "222"))) + //Await.result(coll.insertOne(d).toFuture(), 2.seconds) + +// Await.result(fieldColl.insertOne(Class23()).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 1, Class23())).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 1, AccountV1("000001", 30, Value(10)))).toFuture(), 1.second) +// Await.result(accounts.insertOne(Event("001", 2, AccountV2("000002", 20, "EUR"))).toFuture(), 1.second) + + val res = Await.result(coll.find().toFuture(), 1.second) + res.foreach(println) + + client.close() + } +} diff --git a/src/main/scala/eu/xeppaka/scalalearn/TestMirror.scala b/src/main/scala/eu/xeppaka/scalalearn/TestMirror.scala new file mode 100644 index 0000000..8c3a21b --- /dev/null +++ b/src/main/scala/eu/xeppaka/scalalearn/TestMirror.scala @@ -0,0 +1,6 @@ +package eu.xeppaka.scalalearn + +trait TestMirror { + def print(s: String) + def print2(s: String) +} diff --git a/src/test/scala/eu/xeppaka/scalalearn/TestStub.scala b/src/test/scala/eu/xeppaka/scalalearn/TestStub.scala new file mode 100644 index 0000000..f7306ec --- /dev/null +++ b/src/test/scala/eu/xeppaka/scalalearn/TestStub.scala @@ -0,0 +1,26 @@ +package eu.xeppaka.scalalearn + +import org.scalamock.scalatest.MockFactory +import org.scalatest.{BeforeAndAfterEach, FlatSpec, OneInstancePerTest} + +class TestStub extends FlatSpec with BeforeAndAfterEach with MockFactory with OneInstancePerTest { + var testMirror: TestMirror = _ + + override def beforeEach(): Unit = { + testMirror = stub[TestMirror] + (testMirror.print(_: String)).when(*).onCall { s: String => println(s"stub call: $s") } + } + + "blah1" should "blah" in { + testMirror.print("123") + testMirror.print2("123") + (testMirror.print(_: String)).verify(*).once() + (testMirror.print2(_: String)).verify(*).once() + } + + "blah2" should "blah" in { + testMirror.print("456") + testMirror.print("567") + (testMirror.print(_: String)).verify(*).twice() + } +}