From 758fbde683eca10b8029dcbe488cd1898d2add03 Mon Sep 17 00:00:00 2001
From: wieceslaw <vyache-nov@mail.ru>
Date: Thu, 20 Mar 2025 02:04:02 +0300
Subject: [PATCH 1/2] init

---
 pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pom.xml b/pom.xml
index 1655ee9..d05d409 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,7 @@
         <module>ydb-cookbook</module>
         <module>url-shortener-demo</module>
         <module>jdbc</module>
+        <module>coordination</module>
     </modules>
 
     <dependencyManagement>

From d52bdd7f192446a40328660db79f5d544263f7c2 Mon Sep 17 00:00:00 2001
From: wieceslaw <vyache-nov@mail.ru>
Date: Thu, 20 Mar 2025 02:05:03 +0300
Subject: [PATCH 2/2] feat: coordination recipes examples

---
 coordination/pom.xml                          |  20 ++
 coordination/recipes/pom.xml                  |  58 ++++
 .../coordination/recipes/example/LockApp.java |  88 ++++++
 .../coordination/recipes/example/Main.java    |  30 ++
 .../lib/election/LeaderElectionListener.java  |   5 +
 .../example/lib/election/LeaderElector.java   | 194 +++++++++++++
 .../example/lib/locks/InterProcessLock.java   |  25 ++
 .../example/lib/locks/InterProcessMutex.java  | 251 ++++++++++++++++
 .../lib/locks/LockAcquireFailedException.java |  20 ++
 .../locks/LockAlreadyAcquiredException.java   |  20 ++
 .../example/lib/locks/LockInternals.java      | 267 ++++++++++++++++++
 .../example/lib/locks/ReadWriteLock.java      |  79 ++++++
 .../recipes/example/lib/util/Listenable.java  |  19 ++
 .../example/lib/util/ListenableProvider.java  |  29 ++
 .../lib/util/SessionListenerWrapper.java      |  54 ++++
 .../example/lib/watch/Participant.java        |  61 ++++
 .../lib/watch/SemaphoreWatchAdapter.java      | 178 ++++++++++++
 .../recipes/src/main/resources/log4j2.xml     |  27 ++
 18 files changed, 1425 insertions(+)
 create mode 100644 coordination/pom.xml
 create mode 100644 coordination/recipes/pom.xml
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java
 create mode 100644 coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java
 create mode 100644 coordination/recipes/src/main/resources/log4j2.xml

diff --git a/coordination/pom.xml b/coordination/pom.xml
new file mode 100644
index 0000000..cf8a6bc
--- /dev/null
+++ b/coordination/pom.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>tech.ydb.examples</groupId>
+        <artifactId>ydb-sdk-examples</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>tech.ydb.coordination.examples</groupId>
+    <artifactId>ydb-coordination-examples</artifactId>
+
+    <name>YDB SDK Coordination Service examples</name>
+
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>recipes</module>
+    </modules>
+</project>
diff --git a/coordination/recipes/pom.xml b/coordination/recipes/pom.xml
new file mode 100644
index 0000000..9a6ed9c
--- /dev/null
+++ b/coordination/recipes/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>tech.ydb.coordination.examples</groupId>
+        <artifactId>ydb-coordination-examples</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>ydb-coordination-recipes-example</artifactId>
+    <name>YDB Coordination Service recipes example</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>tech.ydb</groupId>
+            <artifactId>ydb-sdk-coordination</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>tech.ydb.auth</groupId>
+            <artifactId>yc-auth-provider</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>jdbc-coordination-recipes-example</finalName>
+        <plugins>
+            <!-- copy dependencies to libs folder -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+            <!-- add libs folder to classpath -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>libs/</classpathPrefix>
+                            <mainClass>tech.ydb.coordination.recipes.example.Main</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java
new file mode 100644
index 0000000..c7b4b92
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java
@@ -0,0 +1,88 @@
+package tech.ydb.coordination.recipes.example;
+
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessLock;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex;
+
+import java.time.Duration;
+import java.util.Scanner;
+
+public class LockApp {
+
+    InterProcessLock lock;
+
+    LockApp(CoordinationClient client) {
+        client.createNode("examples/app").join().expectSuccess("cannot create coordination path");
+        lock = new InterProcessMutex(
+                client,
+                "examples/app",
+                "data".getBytes(),
+                "default_lock"
+        );
+    }
+
+    public void lock(Duration duration) {
+        try {
+            if (duration == null) {
+                lock.acquire();
+            } else {
+                lock.acquire(duration);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void release() {
+        try {
+            lock.release();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private boolean isAcquired() {
+        return lock.isAcquiredInThisProcess();
+    }
+
+    public void run() {
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("Enter commands: lock [seconds] | release | reconnect | ?");
+
+        while (scanner.hasNextLine()) {
+            String commandLine = scanner.nextLine().trim();
+            String[] commandParts = commandLine.split("\\s+");
+            String command = commandParts[0];
+
+            switch (command.toLowerCase()) {
+                case "lock":
+                    int seconds = -1;
+                    if (commandParts.length > 1) {
+                        try {
+                            seconds = Integer.parseInt(commandParts[1]);
+                        } catch (NumberFormatException e) {
+                            System.out.println("Invalid number format, defaulting to 0 seconds");
+                        }
+                    }
+                    if (seconds == -1) {
+                        lock(null);
+                    } else {
+                        lock(Duration.ofSeconds(seconds));
+                    }
+                    break;
+                case "release":
+                    release();
+                    break;
+                case "?":
+                    System.out.println("Lock is acquired: " + isAcquired());
+                    break;
+                default:
+                    System.out.println("Unknown command: " + command);
+            }
+        }
+
+        scanner.close();
+    }
+
+}
+
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java
new file mode 100644
index 0000000..a9c9579
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java
@@ -0,0 +1,30 @@
+package tech.ydb.coordination.recipes.example;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.auth.iam.CloudAuthHelper;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.core.grpc.GrpcTransport;
+
+public class Main {
+    private final static Logger logger = LoggerFactory.getLogger(Main.class);
+
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            System.err.println("Usage: java -jar jdbc-coordination-api-example.jar <connection_url>");
+            return;
+        }
+
+        try (GrpcTransport transport = GrpcTransport.forConnectionString(args[0])
+                .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+                .build()) {
+
+            logger.info("run lock app example");
+            CoordinationClient client = CoordinationClient.newClient(transport);
+            LockApp lockApp = new LockApp(client);
+            lockApp.run();
+            logger.info("lock app example finished");
+        }
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java
new file mode 100644
index 0000000..957f403
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java
@@ -0,0 +1,5 @@
+package tech.ydb.coordination.recipes.example.lib.election;
+
+public interface LeaderElectionListener {
+    void takeLeadership() throws Exception;
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java
new file mode 100644
index 0000000..5ca863c
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java
@@ -0,0 +1,194 @@
+package tech.ydb.coordination.recipes.example.lib.election;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.watch.Participant;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex;
+import tech.ydb.coordination.recipes.example.lib.watch.SemaphoreWatchAdapter;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+
+public class LeaderElector implements Closeable, ListenableProvider<CoordinationSession.State> {
+    private static final Logger logger = LoggerFactory.getLogger(LeaderElector.class);
+
+    private final CoordinationClient client;
+    private final LeaderElectionListener leaderElectionListener;
+    private final String coordinationNodePath;
+    private final String semaphoreName;
+    private final ExecutorService electionExecutor;
+    private final InterProcessMutex lock;
+    private final SemaphoreWatchAdapter semaphoreWatchAdapter;
+
+    private AtomicReference<State> state = new AtomicReference<>(State.STARTED);
+    private volatile boolean autoRequeue = false;
+    private volatile boolean isLeader = false;
+    private Future<Void> electionTask = null;
+
+
+    private enum State { // TODO: needs third state (CREATED)?
+        STARTED,
+        CLOSED
+    }
+
+    public LeaderElector(
+            CoordinationClient client,
+            LeaderElectionListener leaderElectionListener,
+            String coordinationNodePath,
+            String semaphoreName
+    ) {
+        this(client, leaderElectionListener, coordinationNodePath, semaphoreName, Executors.newSingleThreadExecutor());
+    }
+
+    public LeaderElector(
+            CoordinationClient client,
+            LeaderElectionListener leaderElectionListener,
+            String coordinationNodePath,
+            String semaphoreName,
+            ExecutorService executorService
+    ) {
+        this.client = client;
+        this.leaderElectionListener = leaderElectionListener;
+        this.coordinationNodePath = coordinationNodePath;
+        this.semaphoreName = semaphoreName;
+        this.electionExecutor = executorService;
+        this.lock = new InterProcessMutex(
+                client,
+                coordinationNodePath,
+                semaphoreName
+        );
+        this.semaphoreWatchAdapter = new SemaphoreWatchAdapter(lock.getSession(), semaphoreName);
+        semaphoreWatchAdapter.start();
+    }
+
+    public boolean isLeader() {
+        return isLeader;
+    }
+
+    public synchronized void interruptLeadership() {
+        Future<?> task = electionTask;
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+    /**
+     * Re-queue an attempt for leadership. If this instance is already queued, nothing
+     * happens and false is returned. If the instance was not queued, it is re-queued and true
+     * is returned
+     *
+     * @return true if re-enqueue was successful
+     */
+    public boolean requeue() {
+        Preconditions.checkState(state.get() == State.STARTED, "Already closed or not yet started");
+
+        return enqueueElection();
+    }
+
+    public void autoRequeue() {
+        autoRequeue = true;
+    }
+
+    private synchronized boolean enqueueElection() {
+        if (!isQueued() && state.get() == State.STARTED) {
+            electionTask = electionExecutor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    try {
+                        doWork();
+                    } finally {
+                        finishTask();
+                    }
+                    return null;
+                }
+            });
+            return true;
+        }
+
+        return false;
+    }
+
+    private void doWork() throws Exception {
+        isLeader = false;
+
+        try {
+            lock.acquire();
+            isLeader = true;
+            try {
+                leaderElectionListener.takeLeadership();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw e;
+            } catch (Throwable e) {
+                logger.debug("takeLeadership exception", e);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw e;
+        } finally {
+            if (isLeader) {
+                isLeader = false;
+                boolean wasInterrupted = Thread.interrupted();
+                try {
+                    lock.release();
+                } catch (Exception e) {
+                    logger.error("Lock release exception for: " + coordinationNodePath);
+                } finally {
+                    if (wasInterrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+
+    private synchronized void finishTask() {
+        electionTask = null;
+        if (autoRequeue) { // TODO: requeue if critical exception?
+            enqueueElection();
+        }
+    }
+
+    private boolean isQueued() {
+        return electionTask != null;
+    }
+
+    public List<Participant> getParticipants() {
+        return semaphoreWatchAdapter.getParticipants();
+    }
+
+    public Optional<Participant> getLeader() {
+        return semaphoreWatchAdapter.getOwners().stream().findFirst();
+    }
+
+    @Override
+    public synchronized void close() {
+        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed");
+
+        Future<Void> task = electionTask;
+        if (task != null) {
+            task.cancel(true);
+        }
+
+        electionTask = null;
+        electionExecutor.close();
+        semaphoreWatchAdapter.close();
+        getListenable().clearListeners();
+    }
+
+    @Override
+    public Listenable<CoordinationSession.State> getListenable() {
+        return lock.getListenable();
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java
new file mode 100644
index 0000000..b67d5ea
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java
@@ -0,0 +1,25 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+
+public interface InterProcessLock extends Listenable<CoordinationSession.State> {
+    void acquire() throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
+
+    /**
+     * @return true - if successfully acquired lock, false - if lock waiting time expired
+     */
+    boolean acquire(Duration waitDuration) throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
+
+    /**
+     * @return false if nothing to release
+     */
+    boolean release() throws Exception;
+
+    /**
+     * @return true if the lock is acquired by a thread in this JVM
+     */
+    boolean isAcquiredInThisProcess();
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java
new file mode 100644
index 0000000..11b18e3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java
@@ -0,0 +1,251 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.SemaphoreLease;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
+
+@ThreadSafe
+public class InterProcessMutex implements InterProcessLock, ListenableProvider<CoordinationSession.State> {
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+    private static final Logger logger = LoggerFactory.getLogger(InterProcessMutex.class);
+
+    private final Lock leaseLock = new ReentrantLock();
+    private final CoordinationSession session;
+    private final CompletableFuture<Status> sessionConnectionTask;
+    private final SessionListenerWrapper sessionListenerWrapper;
+    private final String semaphoreName;
+    private final String coordinationNodePath;
+
+    private volatile SemaphoreLease processLease = null;
+
+    public InterProcessMutex(
+            CoordinationClient client,
+            String coordinationNodePath,
+            String lockName
+    ) {
+        this.coordinationNodePath = coordinationNodePath;
+        this.session = client.createSession(coordinationNodePath);
+        this.sessionListenerWrapper = new SessionListenerWrapper(session);
+        this.semaphoreName = lockName;
+
+        this.sessionConnectionTask = session.connect().thenApply(status -> {
+            logger.debug("Session connection status: " + status);
+            return status;
+        });
+        session.addStateListener(state -> {
+            switch (state) {
+                case RECONNECTED: {
+                    logger.debug("Session RECONNECTED");
+                    reconnect();
+                    break;
+                }
+                case CLOSED: {
+                    logger.debug("Session CLOSED, releasing lock");
+                    internalRelease();
+                    break;
+                }
+                case LOST: {
+                    logger.debug("Session LOST, releasing lock");
+                    internalRelease();
+                    break;
+                }
+            }
+        });
+    }
+
+    private CoordinationSession connectedSession() {
+        try {
+            sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return session;
+    }
+
+    private void reconnect() {
+        connectedSession().describeSemaphore(
+                semaphoreName,
+                DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS
+        ).thenAccept(result -> {
+            if (!result.isSuccess()) {
+                logger.error("Unable to describe semaphore {}", semaphoreName);
+                return;
+            }
+            SemaphoreDescription semaphoreDescription = result.getValue();
+            SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst();
+            if (owner.getId() != session.getId()) {
+                logger.warn(
+                        "Current session with id: {} lost lease after reconnection on semaphore: {}",
+                        owner.getId(),
+                        semaphoreName
+                );
+                internalRelease();
+            }
+        });
+    }
+
+    @Override
+    public void acquire() throws Exception {
+        logger.debug("Trying to acquire without timeout");
+        safeAcquire(null);
+    }
+
+    @Override
+    public boolean acquire(Duration duration) throws Exception {
+        logger.debug("Trying to acquire with deadline: {}", duration);
+        Instant deadline = Instant.now().plus(duration);
+        return safeAcquire(deadline);
+    }
+
+    @Override
+    public boolean release() throws Exception {
+        return internalRelease().get();
+    }
+
+    private CompletableFuture<Boolean> internalRelease() {
+        logger.debug("Trying to release");
+        if (processLease == null) {
+            logger.debug("Already released");
+            return CompletableFuture.completedFuture(false);
+        }
+
+        leaseLock.lock();
+        try {
+            if (processLease != null) {
+                return processLease.release().thenApply(it -> {
+                    logger.debug("Released lock");
+                    processLease = null;
+                    leaseLock.unlock();
+                    return true;
+                });
+            }
+        } finally {
+            leaseLock.unlock();
+        }
+
+        logger.debug("Already released");
+        return CompletableFuture.completedFuture(false);
+    }
+
+    @Override
+    public boolean isAcquiredInThisProcess() {
+        return processLease != null;
+    }
+
+    // TODO: implement interruption
+
+    /**
+     * @param deadline
+     * @return true - if successfully acquired lock
+     * @throws Exception
+     * @throws LockAlreadyAcquiredException
+     */
+    private boolean safeAcquire(@Nullable Instant deadline) throws Exception, LockAlreadyAcquiredException {
+        if (processLease != null) {
+            logger.debug("Already acquired lock: {}", semaphoreName);
+            throw new LockAlreadyAcquiredException(semaphoreName);
+        }
+
+        leaseLock.lock();
+        try {
+            if (processLease != null) {
+                logger.debug("Already acquired lock: {}", semaphoreName);
+                throw new LockAlreadyAcquiredException(semaphoreName);
+            }
+
+            SemaphoreLease lease = internalLock(deadline);
+            if (lease != null) {
+                processLease = lease;
+                logger.debug("Successfully acquired lock: {}", semaphoreName);
+                return true;
+            }
+        } finally {
+            leaseLock.unlock();
+        }
+
+        logger.debug("Unable to acquire lock: {}", semaphoreName);
+        return false;
+    }
+
+    private SemaphoreLease internalLock(@Nullable Instant deadline) throws ExecutionException, InterruptedException {
+        int retryCount = 0;
+        while (connectedSession().getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) {
+            retryCount++;
+
+            Duration timeout;
+            if (deadline == null) {
+                timeout = DEFAULT_TIMEOUT;
+            } else {
+                timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant?
+            }
+            CompletableFuture<Result<SemaphoreLease>> acquireTask = connectedSession().acquireEphemeralSemaphore(
+                    semaphoreName, true, null, timeout // TODO: change Session API to use deadlines
+            );
+            Result<SemaphoreLease> leaseResult;
+            try {
+                leaseResult = acquireTask.get();
+            } catch (InterruptedException e) {
+                // If acquire is interrupted, then release immediately
+                Thread.currentThread().interrupt();
+                acquireTask.thenAccept(acquireResult -> {
+                    if (!acquireResult.getStatus().isSuccess()) {
+                        return;
+                    }
+                    SemaphoreLease lease = acquireResult.getValue();
+                    lease.release();
+                });
+                throw e;
+            }
+
+            Status status = leaseResult.getStatus();
+            logger.debug("Lease result status: {}", status);
+
+            if (status.isSuccess()) {
+                logger.debug("Successfully acquired the lock");
+                return leaseResult.getValue();
+            }
+
+            if (status.getCode() == StatusCode.TIMEOUT) {
+                logger.debug("Trying to acquire again, retries: {}", retryCount);
+                continue;
+            }
+
+            if (!status.getCode().isRetryable(true)) {
+                status.expectSuccess("Unable to retry acquiring semaphore");
+                return null;
+            }
+        }
+
+        // TODO: handle timeout and error differently
+        throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+    }
+
+    @Override
+    public Listenable<CoordinationSession.State> getListenable() {
+        return sessionListenerWrapper;
+    }
+
+    public CoordinationSession getSession() {
+        return session;
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java
new file mode 100644
index 0000000..d4ea7b1
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java
@@ -0,0 +1,20 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+public class LockAcquireFailedException extends RuntimeException {
+    private final String coordinationNodePath;
+    private final String semaphoreName;
+
+    public LockAcquireFailedException(String coordinationNodePath, String semaphoreName) {
+        super("Failed to acquire semaphore=" + semaphoreName + ", on coordination node=" + coordinationNodePath);
+        this.coordinationNodePath = coordinationNodePath;
+        this.semaphoreName = semaphoreName;
+    }
+
+    public String getCoordinationNodePath() {
+        return coordinationNodePath;
+    }
+
+    public String getSemaphoreName() {
+        return semaphoreName;
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java
new file mode 100644
index 0000000..1d1009a
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java
@@ -0,0 +1,20 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+public class LockAlreadyAcquiredException extends RuntimeException {
+    private final String coordinationNodePath;
+    private final String semaphoreName;
+
+    public LockAlreadyAcquiredException(String coordinationNodePath, String semaphoreName) {
+        super("Semaphore=" + semaphoreName + " on path=" + coordinationNodePath + " is already acquired");
+        this.coordinationNodePath = coordinationNodePath;
+        this.semaphoreName = semaphoreName;
+    }
+
+    public String getCoordinationNodePath() {
+        return coordinationNodePath;
+    }
+
+    public String getSemaphoreName() {
+        return semaphoreName;
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java
new file mode 100644
index 0000000..57d16bf
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java
@@ -0,0 +1,267 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.SemaphoreLease;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
+
+@ThreadSafe
+class LockInternals implements ListenableProvider<CoordinationSession.State>, Closeable {
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+    private static final Logger logger = LoggerFactory.getLogger(LockInternals.class);
+
+    private final String coordinationNodePath;
+    private final String semaphoreName;
+    private final CoordinationSession session;
+    private final SessionListenerWrapper sessionListenerWrapper;
+
+    private CompletableFuture<Status> sessionConnectionTask = null;
+    private volatile SemaphoreLease processLease = null; // TODO: volatile?
+
+    LockInternals(
+            CoordinationClient client,
+            String coordinationNodePath,
+            String lockName
+    ) {
+        this.coordinationNodePath = coordinationNodePath;
+        this.semaphoreName = lockName;
+        this.session = client.createSession(coordinationNodePath);
+        this.sessionListenerWrapper = new SessionListenerWrapper(session);
+    }
+
+    public void start() {
+        this.sessionConnectionTask = session.connect().thenApply(status -> {
+            logger.debug("Session connection status: {}", status);
+            return status;
+        });
+
+        Consumer<CoordinationSession.State> listener = state -> {
+            switch (state) {
+                case RECONNECTED: {
+                    logger.debug("Session RECONNECTED");
+                    reconnect();
+                    break;
+                }
+                case CLOSED: {
+                    logger.debug("Session CLOSED, releasing lock");
+                    internalRelease();
+                    break;
+                }
+                case LOST: {
+                    logger.debug("Session LOST, releasing lock");
+                    internalRelease();
+                    break;
+                }
+            }
+        };
+
+        session.addStateListener(listener);
+    }
+
+    private CoordinationSession connectedSession() {
+        try {
+            sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return session;
+    }
+
+    private void reconnect() {
+        CoordinationSession coordinationSession = connectedSession();
+        coordinationSession.describeSemaphore(
+                semaphoreName,
+                DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS
+        ).thenAccept(result -> {
+            if (!result.isSuccess()) {
+                logger.error("Unable to describe semaphore {}", semaphoreName);
+                return;
+            }
+            SemaphoreDescription semaphoreDescription = result.getValue();
+            SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst();
+            if (owner.getId() != coordinationSession.getId()) {
+                logger.warn(
+                        "Current session with id: {} lost lease after reconnection on semaphore: {}",
+                        owner.getId(),
+                        semaphoreName
+                );
+                internalRelease();
+            }
+        });
+    }
+
+    public boolean tryAcquire(@Nullable Duration duration, boolean exclusive, byte[] data) throws Exception {
+        logger.debug("Trying to acquire with deadline: {}", duration);
+        Instant deadline = Instant.now().plus(duration);
+        return safeAcquire(deadline, exclusive, data);
+    }
+
+    public boolean release() {
+        return internalRelease();
+    }
+
+    // TODO: interruptible?
+    private synchronized boolean internalRelease() {
+        logger.debug("Trying to release");
+        if (processLease == null) {
+            logger.debug("Already released");
+            return false;
+        }
+
+        try {
+            return processLease.release().thenApply(it -> {
+                logger.debug("Released lock");
+                processLease = null;
+                return true;
+            }).get();
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @param deadline
+     * @return true - if successfully acquired lock
+     * @throws Exception
+     * @throws LockAlreadyAcquiredException
+     * @throws LockAcquireFailedException
+     */
+    // TODO: deadlock? Move synchronized?
+    private synchronized boolean safeAcquire(
+            @Nullable Instant deadline,
+            boolean exclusive,
+            byte[] data
+    ) throws Exception {
+        if (processLease != null) {
+            logger.debug("Already acquired lock: {}", semaphoreName);
+            throw new LockAlreadyAcquiredException(coordinationNodePath, semaphoreName);
+        }
+
+        Optional<SemaphoreLease> lease = tryBlockingLock(deadline, true, data);
+        if (lease.isPresent()) {
+            processLease = lease.get();
+            logger.debug("Successfully acquired lock: {}", semaphoreName);
+            return true;
+        }
+
+        logger.debug("Unable to acquire lock: {}", semaphoreName);
+        return false;
+    }
+
+    private Optional<SemaphoreLease> tryBlockingLock(
+            @Nullable Instant deadline,
+            boolean exclusive,
+            byte[] data
+    ) throws Exception {
+        int retryCount = 0;
+        CoordinationSession coordinationSession = connectedSession();
+
+        while (coordinationSession.getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) {
+            retryCount++;
+
+            Duration timeout;
+            if (deadline == null) {
+                timeout = DEFAULT_TIMEOUT;
+            } else {
+                timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant?
+            }
+
+            CompletableFuture<Result<SemaphoreLease>> acquireTask = coordinationSession.acquireEphemeralSemaphore(
+                    semaphoreName, exclusive, data, timeout // TODO: change Session API to use deadlines
+            );
+            Result<SemaphoreLease> leaseResult;
+            try {
+                leaseResult = acquireTask.get();
+            } catch (InterruptedException e) {
+                // If acquire is interrupted, then release immediately
+                Thread.currentThread().interrupt();
+                acquireTask.thenAccept(acquireResult -> {
+                    if (!acquireResult.getStatus().isSuccess()) {
+                        return;
+                    }
+                    SemaphoreLease lease = acquireResult.getValue();
+                    lease.release();
+                });
+                throw e;
+            }
+
+            Status status = leaseResult.getStatus();
+            logger.debug("Lease result status: {}", status);
+
+            if (status.isSuccess()) {
+                logger.debug("Successfully acquired the lock");
+                return Optional.of(leaseResult.getValue());
+            }
+
+            if (status.getCode() == StatusCode.TIMEOUT) {
+                logger.debug("Trying to acquire semaphore {} again, retries: {}", semaphoreName, retryCount);
+                continue;
+            }
+
+            if (!status.getCode().isRetryable(true)) {
+                status.expectSuccess("Unable to retry acquiring semaphore");
+                throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+            }
+        }
+
+        if (deadline != null && Instant.now().compareTo(deadline) >= 0) {
+            return Optional.empty();
+        }
+
+        throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+    }
+
+    public String getCoordinationNodePath() {
+        return coordinationNodePath;
+    }
+
+    public String getSemaphoreName() {
+        return semaphoreName;
+    }
+
+    public CoordinationSession getCoordinationSession() {
+        return connectedSession();
+    }
+
+    public @Nullable SemaphoreLease getProcessLease() {
+        return processLease;
+    }
+
+    @Override
+    public Listenable<CoordinationSession.State> getListenable() {
+        return sessionListenerWrapper;
+    }
+
+    @Override
+    public void close() {
+        try {
+            release();
+        } catch (Exception ignored) {
+        }
+
+        session.close();
+        sessionListenerWrapper.clearListeners();
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java
new file mode 100644
index 0000000..7b5ad67
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java
@@ -0,0 +1,79 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+
+public class ReadWriteLock {
+    private final InternalLock readLock;
+    private final InternalLock writeLock;
+
+    public ReadWriteLock(
+            CoordinationClient client,
+            String coordinationNodePath,
+            String lockName
+    ) {
+        LockInternals lockInternals = new LockInternals(
+                client, coordinationNodePath, lockName
+        );
+        lockInternals.start();
+        // TODO: Share same lockInternals?
+        this.readLock = new InternalLock(lockInternals, false);
+        this.writeLock = new InternalLock(lockInternals, true);
+    }
+
+    public InterProcessLock writeLock() {
+        return readLock;
+    }
+
+    public InterProcessLock readLock() {
+        return writeLock;
+    }
+
+    private static class InternalLock implements InterProcessLock, ListenableProvider<CoordinationSession.State> {
+        private final LockInternals lockInternals;
+        private final boolean isExclisive;
+
+        private InternalLock(LockInternals lockInternals, boolean isExclisive) {
+            this.lockInternals = lockInternals;
+            this.isExclisive = isExclisive;
+        }
+
+        @Override
+        public void acquire() throws Exception {
+            lockInternals.tryAcquire(
+                    null,
+                    isExclisive,
+                    null
+            );
+        }
+
+        @Override
+        public boolean acquire(Duration waitDuration) throws Exception {
+            return lockInternals.tryAcquire(
+                    waitDuration,
+                    isExclisive,
+                    null
+            );
+        }
+
+        @Override
+        public boolean release() {
+            return lockInternals.release();
+        }
+
+        @Override
+        public boolean isAcquiredInThisProcess() {
+            return lockInternals.getProcessLease() != null;
+        }
+
+        @Override
+        public Listenable<CoordinationSession.State> getListenable() {
+            return lockInternals.getListenable();
+        }
+    }
+
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java
new file mode 100644
index 0000000..fa887a3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java
@@ -0,0 +1,19 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+public interface Listenable<T> {
+    void addListener(Consumer<T> listener);
+
+    /**
+     * Listener call will be processed in executor
+     * @param listener
+     * @param executor
+     */
+    void addListener(Consumer<T> listener, ExecutorService executor);
+
+    void removeListener(Consumer<T> listener);
+
+    void clearListeners();
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java
new file mode 100644
index 0000000..f4b80f2
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java
@@ -0,0 +1,29 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+public interface ListenableProvider<T> extends Listenable<T> {
+    Listenable<T> getListenable();
+
+    @Override
+    default void addListener(Consumer<T> listener) {
+        getListenable().addListener(listener);
+    }
+
+    @Override
+    default void addListener(Consumer<T> listener, ExecutorService executor) {
+        getListenable().addListener(listener, executor);
+    }
+
+    @Override
+    default void removeListener(Consumer<T> listener) {
+        getListenable().removeListener(listener);
+    }
+
+    @Override
+    default void clearListeners() {
+        getListenable().clearListeners();
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java
new file mode 100644
index 0000000..0ef56d7
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java
@@ -0,0 +1,54 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.CoordinationSession.State;
+
+public class SessionListenerWrapper implements Listenable<State> {
+    private final CoordinationSession session;
+    /**
+     * key - original (external) consumer, value - consumer wrapper or original consumer depending on executor
+     */
+    private final Map<Consumer<State>, Consumer<State>> listenersMapping = new HashMap<>();
+
+    public SessionListenerWrapper(CoordinationSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void addListener(Consumer<State> listener) {
+        if (listenersMapping.containsKey(listener)) {
+            return;
+        }
+
+        listenersMapping.put(listener, listener);
+        session.addStateListener(listener);
+    }
+
+    @Override
+    public void addListener(Consumer<State> listener, ExecutorService executor) {
+        if (listenersMapping.containsKey(listener)) {
+            return;
+        }
+
+        Consumer<State> wrapper = state -> executor.submit(() -> listener.accept(state));
+        listenersMapping.put(listener, wrapper);
+        session.addStateListener(wrapper);
+    }
+
+    @Override
+    public void removeListener(Consumer<State> listener) {
+        Consumer<State> removed = listenersMapping.remove(listener);
+        session.removeStateListener(removed);
+    }
+
+    @Override
+    public void clearListeners() {
+        listenersMapping.keySet().forEach(this::removeListener);
+        listenersMapping.clear();
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java
new file mode 100644
index 0000000..568b6a3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java
@@ -0,0 +1,61 @@
+package tech.ydb.coordination.recipes.example.lib.watch;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Participant {
+    private final long id;
+    private final byte[] data;
+    private final long count;
+    private final boolean isLeader;
+
+    public Participant(long id, byte[] data, long count, boolean isLeader) {
+        this.id = id;
+        this.data = data;
+        this.count = count;
+        this.isLeader = isLeader;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public boolean isLeader() {
+        return isLeader;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Participant that = (Participant) o;
+        return id == that.id && count == that.count && isLeader == that.isLeader && Objects.deepEquals(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, Arrays.hashCode(data), count, isLeader);
+    }
+
+    @Override
+    public String toString() {
+        return "Participant{" +
+                "id=" + id +
+                ", data=" + Arrays.toString(data) +
+                ", count=" + count +
+                ", isLeader=" + isLeader +
+                '}';
+    }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java
new file mode 100644
index 0000000..d1b982d
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java
@@ -0,0 +1,178 @@
+package tech.ydb.coordination.recipes.example.lib.watch;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.description.SemaphoreWatcher;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.coordination.settings.WatchSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+
+public class SemaphoreWatchAdapter implements Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(SemaphoreWatchAdapter.class);
+
+    private final CoordinationSession session;
+    private final String semaphoreName;
+
+    private AtomicReference<State> state;
+    private Future<Void> watchTask;
+    private volatile WatchData watchData;
+
+    private enum State {
+        CREATED,
+        STARTED,
+        CLOSED
+    }
+
+    private class WatchData {
+        final long count;
+        final byte[] data;
+        final List<Participant> waiters;
+        final List<Participant> owners;
+        final List<Participant> participants;
+
+        WatchData(long count, byte[] data, List<Participant> waiters, List<Participant> owners) {
+            this.count = count;
+            this.data = data;
+            this.waiters = waiters;
+            this.owners = owners;
+            this.participants = Stream.concat(owners.stream(), waiters.stream()).collect(Collectors.toList());
+        }
+    }
+
+    public SemaphoreWatchAdapter(CoordinationSession session, String semaphoreName) {
+        this.session = session;
+        this.semaphoreName = semaphoreName;
+        this.state = new AtomicReference<>(State.CREATED);
+        this.watchTask = null;
+        this.watchData = null;
+    }
+
+    public List<Participant> getOwners() {
+        // TODO: block until initialized or throw exception or return default value or return Optional.empty()
+        Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+        return Collections.unmodifiableList(watchData.owners); // TODO: copy Participant.data[]?
+    }
+
+    public List<Participant> getWaiters() {
+        Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+        return Collections.unmodifiableList(watchData.waiters); // TODO: copy Participant.data[]?
+    }
+
+    public List<Participant> getParticipants() {
+        Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+        return Collections.unmodifiableList(watchData.participants); // TODO: copy Participant.data[]?
+    }
+
+    public long getCount() {
+        Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+        return watchData.count;
+    }
+
+    public byte[] getData() {
+        Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+        return watchData.data.clone();
+    }
+
+    public boolean start() {
+        Preconditions.checkState(state.compareAndSet(State.CREATED, State.STARTED), "Already started or closed");
+
+        return enqueueWatch();
+    }
+
+    private synchronized boolean enqueueWatch() {
+        if (watchIsQueued() && state.get() == State.STARTED) {
+            return false;
+        }
+
+        watchTask = watchSemaphore().thenCompose(status -> {
+            if (!status.isSuccess()) {
+                // TODO: stop watching on error?
+                logger.error("Wailed to watch semaphore: {} with status: {}", semaphoreName, status);
+            }
+
+            finish();
+            return null;
+        });
+        return true;
+    }
+
+    private boolean watchIsQueued() {
+        return watchTask != null;
+    }
+
+    private synchronized void finish() {
+        watchTask = null;
+        enqueueWatch();
+    }
+
+    private CompletableFuture<Status> watchSemaphore() {
+        return session.watchSemaphore(
+                semaphoreName,
+                DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS,
+                WatchSemaphoreMode.WATCH_DATA_AND_OWNERS
+        ).thenCompose(result -> {
+            Status status = result.getStatus();
+            if (!status.isSuccess()) {
+                return CompletableFuture.completedFuture(status);
+            }
+            SemaphoreWatcher watcher = result.getValue();
+            saveWatchState(watcher.getDescription());
+            return watcher.getChangedFuture().thenApply(Result::getStatus);
+        });
+    }
+
+    private void saveWatchState(SemaphoreDescription description) {
+        List<Participant> waitersList = description.getWaitersList().stream().map(it -> new Participant(
+                it.getId(),
+                it.getData(),
+                it.getCount(),
+                false
+        )).collect(Collectors.toList());
+        List<Participant> ownersList = description.getOwnersList().stream().map(it -> new Participant(
+                it.getId(),
+                it.getData(),
+                it.getCount(),
+                true
+        )).collect(Collectors.toList());
+
+        watchData = new WatchData(
+                description.getCount(),
+                description.getData(),
+                waitersList,
+                ownersList
+        );
+    }
+
+    private synchronized void stopWatch() {
+        Future<Void> task = watchTask;
+        if (task != null) {
+            task.cancel(true);
+        }
+        watchTask = null;
+    }
+
+    @Override
+    public void close() {
+        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Is not yet started");
+
+        stopWatch();
+    }
+}
diff --git a/coordination/recipes/src/main/resources/log4j2.xml b/coordination/recipes/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..1cf1cde
--- /dev/null
+++ b/coordination/recipes/src/main/resources/log4j2.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN" shutdownHook="disable">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="[%level] %d{HH:mm:ss.SSS} [%t] %logger - %msg%n"/>
+        </Console>
+    </Appenders>
+
+    <Loggers>
+        <Logger name="io.perfmark" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="io.grpc" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="tech.ydb" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+
+<!--        <Logger name="tech.ydb.coordination.impl" level="trace" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>-->
+        <Root level="info" >
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file