This repository was archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 437
/
Copy pathUserEvents.scala
58 lines (51 loc) · 2.33 KB
/
UserEvents.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package sample.sharding.kafka
import akka.Done
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.kafka.cluster.sharding.KafkaClusterSharding
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Sharded per-user entity that keeps a running total of the purchases it has
 * been sent, plus the bootstrap code that registers it with Cluster Sharding.
 */
object UserEvents {

  /**
   * Builds a Kafka-aware message extractor for the first configured topic and
   * then initialises Cluster Sharding for the [[UserEvents]] entity with it.
   *
   * @param system   actor system used for logging, the execution context and sharding
   * @param settings processor configuration; supplies the topic list, the Kafka
   *                 consumer settings and the entity type key
   * @return a future completed with the sharding entry-point `ActorRef` once the
   *         message extractor is available. The future is failed with a
   *         `NoSuchElementException` when `settings.topics` is empty, and with
   *         whatever failure the message-extractor future reports otherwise.
   */
  def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Command]] = {
    import system.executionContext

    // `topics.head` would throw synchronously on an empty list, escaping the
    // Future this method promises. Surface the misconfiguration through the
    // returned Future instead, with a message that names the actual problem.
    settings.topics.headOption match {
      case None =>
        Future.failed[ActorRef[Command]](
          new NoSuchElementException(
            "ProcessorSettings.topics is empty; at least one Kafka topic is required to initialise sharding"))

      case Some(topic) =>
        KafkaClusterSharding(settings.system)
          .messageExtractorNoEnvelope(
            timeout = 10.seconds,
            topic = topic,
            // Entities are keyed by user id, so all events of one user reach the same entity.
            entityIdExtractor = (msg: Command) => msg.userId,
            settings = settings.kafkaConsumerSettings())
          .map { messageExtractor =>
            system.log.info("Message extractor created. Initializing sharding")
            ClusterSharding(system).init(
              Entity(settings.entityTypeKey)(createBehavior = _ => UserEvents())
                // The strategy is registered under the entity type key's name.
                .withAllocationStrategy(
                  new ExternalShardAllocationStrategy(system, settings.entityTypeKey.name))
                .withMessageExtractor(messageExtractor))
          }
    }
  }

  /** Protocol of the entity. Every command carries the id of the user it is addressed to. */
  sealed trait Command extends CborSerializable {
    def userId: String
  }

  /**
   * Records one purchase for `userId`.
   *
   * @param priceInPence unit price; it is multiplied by `quantity` when added to the amount spent
   * @param replyTo      receives `Done` once the purchase has been handled
   */
  final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) extends Command

  /** Asks for the current [[RunningTotal]] of `userId`; the answer is sent to `replyTo`. */
  final case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends Command

  // state
  /**
   * Accumulated state of one user.
   *
   * @param totalPurchases number of [[UserPurchase]] commands handled (not the summed quantity)
   * @param amountSpent    sum of `quantity * priceInPence` over all handled purchases
   */
  final case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable

  /** Initial behavior of the entity: an empty running total. */
  def apply(): Behavior[Command] = running(RunningTotal(0, 0))

  /**
   * Behavior holding the current total as an immutable value; each purchase
   * yields a new behavior created from the updated total.
   */
  private def running(runningTotal: RunningTotal): Behavior[Command] = {
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage[Command] {
        case UserPurchase(id, product, quantity, price, ack) =>
          ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
          // Acknowledge before switching to the behavior with the updated total.
          ack ! Done
          running(
            runningTotal.copy(
              totalPurchases = runningTotal.totalPurchases + 1,
              amountSpent = runningTotal.amountSpent + (quantity * price)))

        case GetRunningTotal(id, replyTo) =>
          ctx.log.info("user {} running total queried", id)
          replyTo ! runningTotal
          Behaviors.same
      }
    }
  }
}