-
Notifications
You must be signed in to change notification settings - Fork 4
Using Kalium with Kafka Work In Progress
The first thing you need to do is to instantiate a Kalium instance. You should probably use only one instance of Kalium per JVM runtime. Using multiple instances is possible and recommended in cases where there more than on queue clusters to interact with.
Instantiate Kalium using the a Kalium.Builder
:
Kalium kalium = Kalium.Builder()
.setQueue(new KaliumKafkaQueueAdapter("localhost:9092"))
.build();
Once a kalium instance was instantiated, it needs to be started in order to post and react to objects. Start Kalium by calling
kalium.start();
Stop Kalium by calling
kalium.stop();
Once a kalium was started, you can begin posting objects to be processed by methods who react to an object of the same type. Posting an object does not define how that object is going to be consumed. An object can be processed by more than one methods, simultaneously and in arbitrary order. When posting object, kalium will take care of serialization of that object. Currently, objects will be serialized as a JSON object. As such, it's highly recommended to use POJOs that are pretty simple and relatively "flat".
Here's a simple example. Imagine we're building an e-commerce site where we need to process payments. To process payments, we're using kalium. We want to process payments that are created by a user on our website. Let's assume we're using object of type Payment
which capture some information about the issued payment.
To post an object use the following syntax:
Payment payment = new Payment(fromBuyer,toSeller,amount);
kalium.post(myObject);
Now that we posted an object let's define a reaction for such an object.
A reaction is a method that is invoked upon arrival of an object of a type that the method accepts as input. Kalium provides two ways to define a reaction: a lambda expression and a reaction object. The former invokes a lambda expression upon arrival of an object while the latter, allow invocation of more than one method per arrived object of the same type.
By default, only one instance of a reaction id will be triggered for a posted object. This feature allows adding more and more kalium instances while guranteeing that a posted object will be process by only one reaction. This allows an application to scale easily.
Simply by using .on(...)
method you can define a reaction to an object of a particular type.
.on(...)
should be called before starting kalium. Here's a simple example:
Kalium kalium = Kalium.Builder()
.setQueue(new KaliumKafkaQueueAdapter("localhost:9092"))
.build();
kalium.on(Payment.class, payment -> {
// Do something with the payment, e.g. call Stripe to make the actual payment
payment.processed = true;
kalium.post(payment);
},"PAYMENT_PROCESSOR");
kalium.start()
In the example above, upon arrival of an object of type Payment.class
, a lambda expression payment -> {...}
is invoked. The last parameter is used to set a reaction id. In this case, the id is set to "PAYMENT_PROCESSOR"
.
A reaction object is an object defined by class with methods that are annotated with @On
annotation. For example, the following reaction class has two methods that are annotated with @On
public class PaymentProcessor {
private Kalium kalium;
public PaymentProcessor(Kalium kalium) {
this.kalium = kalium;
}
@On
public void processPayment(Payment payment) {
// Do something with the payment, e.g. call Stripe to make the actual payment
payment.processed = true;
kalium.post(payment);
}
@On
public void archivePayment(Payment payment) {
// store the payment somewhere
}
}
Add the reaction object to kalium and start reacting to Payment
objects using the following lines:
...
kalium.addReactor(new PaymentProcessor(kalium));
kalium.start()
...
By adding a reaction to a kalium instance, your app can start reacting to objects that are posted to Kalium. By design, each object will be processed by only one instance of a reactor class. For instance, if you're trying to create a scalable Payment Processing service, using the logic provided in the example above, you can launch multiple instances of Kalium that uses the same reactor class, PaymentProcessor
. In such setup, a posted object of type Payment
will be processed by only one Kalium instance.
This is achieved by leveraging Kafka's Consumer Group concept under the hood.
You can add as many different reactor instances as you like.
A Reactor Id