diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java index 84a8287e..ca8eb5b3 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java @@ -184,7 +184,7 @@ private static class ChatMessages { private final List newMessages; private final List allMessages; - private final List newChatMessageContent; + private final List> newChatMessageContent; public ChatMessages(List allMessages) { this.allMessages = Collections.unmodifiableList(allMessages); @@ -195,7 +195,7 @@ public ChatMessages(List allMessages) { private ChatMessages( List allMessages, List newMessages, - List newChatMessageContent) { + List> newChatMessageContent) { this.allMessages = Collections.unmodifiableList(allMessages); this.newMessages = Collections.unmodifiableList(newMessages); this.newChatMessageContent = Collections.unmodifiableList(newChatMessageContent); @@ -219,8 +219,8 @@ public ChatMessages add(ChatRequestMessage requestMessage) { } @CheckReturnValue - public ChatMessages addChatMessage(List chatMessageContent) { - ArrayList tmpChatMessageContent = new ArrayList<>( + public ChatMessages addChatMessage(List> chatMessageContent) { + ArrayList> tmpChatMessageContent = new ArrayList<>( newChatMessageContent); tmpChatMessageContent.addAll(chatMessageContent); @@ -357,19 +357,16 @@ private Mono internalChatMessageContentsAsync( // If we don't want to attempt to invoke any functions // Or if we are auto-invoking, but we somehow end up with other than 1 choice even though only 1 was requested if (autoInvokeAttempts == 0 || responseMessages.size() != 1) { - return getChatMessageContentsAsync(completions) - .flatMap(m -> { - return Mono.just(messages.addChatMessage(m)); - }); + List> chatMessageContents = getChatMessageContentsAsync(completions); + return Mono.just(messages.addChatMessage(chatMessageContents)); } // Or if there are no tool calls to be done ChatResponseMessage response = responseMessages.get(0); List toolCalls = response.getToolCalls(); if (toolCalls == null || toolCalls.isEmpty()) { - return getChatMessageContentsAsync(completions) - .flatMap(m -> { - return Mono.just(messages.addChatMessage(m)); - }); + List> chatMessageContents = getChatMessageContentsAsync( + completions); + return Mono.just(messages.addChatMessage(chatMessageContents)); } ChatRequestAssistantMessage requestMessage = new ChatRequestAssistantMessage( @@ -592,7 +589,7 @@ private OpenAIFunctionToolCall extractOpenAIFunctionToolCall( arguments); } - private Mono> getChatMessageContentsAsync( + private List> getChatMessageContentsAsync( ChatCompletions completions) { FunctionResultMetadata completionMetadata = FunctionResultMetadata.build( completions.getId(), @@ -606,22 +603,28 @@ private Mono> getChatMessageContentsAsync( .filter(Objects::nonNull) .collect(Collectors.toList()); - return Flux.fromIterable(responseMessages) - .flatMap(response -> { + List> chatMessageContent = + responseMessages + .stream() + .map(response -> { try { - return Mono.just(new OpenAIChatMessageContent( + return new OpenAIChatMessageContent<>( AuthorRole.ASSISTANT, response.getContent(), this.getModelId(), null, null, completionMetadata, - formOpenAiToolCalls(response))); - } catch (Exception e) { - return Mono.error(e); + formOpenAiToolCalls(response)); + } catch (SKCheckedException e) { + LOGGER.warn("Failed to form chat message content", e); + return null; } }) - .collectList(); + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + return chatMessageContent; } private List> toOpenAIChatMessageContent( @@ -931,7 +934,7 @@ private static boolean hasToolCallBeenExecuted(List chatRequ } private static List getChatRequestMessages( - List messages) { + List> messages) { if (messages == null || messages.isEmpty()) { return new ArrayList<>(); } diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java index f2cbf858..89f45014 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java @@ -36,7 +36,7 @@ public OpenAIChatMessageContent( @Nullable String modelId, @Nullable T innerContent, @Nullable Charset encoding, - @Nullable FunctionResultMetadata metadata, + @Nullable FunctionResultMetadata metadata, @Nullable List toolCall) { super(authorRole, content, modelId, innerContent, encoding, metadata); diff --git a/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java b/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java index 8d6bdce6..a8303061 100644 --- a/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java +++ b/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java @@ -5,11 +5,13 @@ import com.microsoft.semantickernel.services.chatcompletion.message.ChatMessageTextContent; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Spliterator; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -18,7 +20,7 @@ */ public class ChatHistory implements Iterable> { - private final List> chatMessageContents; + private final Collection> chatMessageContents; /** * The default constructor @@ -33,7 +35,7 @@ public ChatHistory() { * @param instructions The instructions to add to the chat history */ public ChatHistory(@Nullable String instructions) { - this.chatMessageContents = new ArrayList<>(); + this.chatMessageContents = new ConcurrentLinkedQueue<>(); if (instructions != null) { this.chatMessageContents.add( ChatMessageTextContent.systemMessage(instructions)); @@ -45,8 +47,8 @@ public ChatHistory(@Nullable String instructions) { * * @param chatMessageContents The chat message contents to add to the chat history */ - public ChatHistory(List chatMessageContents) { - this.chatMessageContents = new ArrayList(chatMessageContents); + public ChatHistory(List> chatMessageContents) { + this.chatMessageContents = new ConcurrentLinkedQueue<>(chatMessageContents); } /** @@ -55,7 +57,7 @@ public ChatHistory(List chatMessageContents) { * @return List of messages in the chat */ public List> getMessages() { - return Collections.unmodifiableList(chatMessageContents); + return Collections.unmodifiableList(new ArrayList<>(chatMessageContents)); } /** @@ -67,7 +69,7 @@ public Optional> getLastMessage() { if (chatMessageContents.isEmpty()) { return Optional.empty(); } - return Optional.of(chatMessageContents.get(chatMessageContents.size() - 1)); + return Optional.of(((ConcurrentLinkedQueue>)chatMessageContents).peek()); } /** @@ -114,7 +116,7 @@ public Spliterator> spliterator() { * @param metadata The metadata of the message */ public ChatHistory addMessage(AuthorRole authorRole, String content, Charset encoding, - FunctionResultMetadata metadata) { + FunctionResultMetadata metadata) { chatMessageContents.add( ChatMessageTextContent.builder() .withAuthorRole(authorRole) diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java new file mode 100644 index 00000000..d425fe00 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; + +import javax.annotation.Nullable; + +import reactor.core.publisher.Mono; + +/** + * Base abstraction for all Semantic Kernel agents. An agent instance + * may participate in one or more conversations, or {@link AgentChat}. + * A conversation may include one or more agents. + * + * In addition to identity and descriptive meta-data, an {@link Agent} + * must define its communication protocol, or {@link AgentChannel}. + * + * @param The type of {@code AgentChannel} associated with the agent. + */ +public abstract class Agent { + + /** + * The description of the agent (optional) + */ + private final String description; + + /** + * The identifier of the agent (optional). + * Default to a random guid value, but may be overridden. + */ + private final String id; + + /** + * The name of the agent (optional) + */ + private final String name; + + /** + * Construct a new {@link Agent} instance. + * @param id The identifier of the agent. + * @param name The name of the agent. + * @param description The description of the agent. + */ + protected Agent( + @Nullable String id, + @Nullable String name, + @Nullable String description) { + this.id = id; + this.name = name; + this.description = description; + } + + /** + * Get the description of the agent. + * @return The description of the agent. + */ + public String getDescription() { + return description; + } + + /** + * Get the identifier of the agent. + * @return The identifier of the agent. + */ + public String getId() { + return id; + } + + /** + * Get the name of the agent. + * @return The name of the agent. + */ + public String getName() { + return name; + } + + /** + * Set of keys to establish channel affinity. + * Two specific agents of the same type may each require their own channel. This is + * why the channel type alone is insufficient. + * For example, two OpenAI Assistant agents each targeting a different Azure OpenAI endpoint + * would require their own channel. In this case, the endpoint could be expressed as an additional key. + */ + protected abstract List getChannelKeys(); + + /** + * Produce the an {@link AgentChannel} appropriate for the agent type. + * Every agent conversation, or {@link AgentChat}, will establish one or more + * {@link AgentChannel} objects according to the specific {@link Agent} type. + * + * @return An {@link AgentChannel} appropriate for the agent type. + */ + protected abstract Mono createChannelAsync(); + + /** + * Base class for agent builders. + */ + public abstract static class Builder { + + protected String id; + protected String name; + protected String description; + + public Builder withId(String id) { + this.id = id; + return this; + } + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withDescription(String description) { + this.description = description; + return this; + } + + public abstract TAgent build(); + + protected Builder() { + } + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java new file mode 100644 index 00000000..627a68f6 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; + +import javax.annotation.Nonnull; + +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Mono; + +/** + * Defines the communication protocol for a particular {@code Agent} type. + * An {@code Agent} provides its own {@code AgentChannel} via + * {@link Agent#createChannelAsync()}. + * @param The type of agent that this channel is associated with. + */ +public interface AgentChannel { + + + /** + * Receive the conversation messages. Used when joining a conversation and also during each agent interaction. + * + * @param history The chat history at the point the channel is created. + * @return A future task that completes when the conversation messages are received. + */ + Mono receiveAsync(List> history); + + /** + * Perform a discrete incremental interaction between a single Agent and AgentChat. + * @param agent The agent actively interacting with the chat. + * @return Asynchronous enumeration of messages. + */ + Mono>> invokeAsync(@Nonnull Agent agent); + + /** + * Retrieve the message history specific to this channel. + * @return Asynchronous enumeration of messages. + */ + Mono>> getHistoryAsync(); +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java new file mode 100644 index 00000000..4b7a2181 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java @@ -0,0 +1,205 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import javax.annotation.Nonnull; + +import com.microsoft.semantickernel.agents.internal.BroadcastQueue; +import com.microsoft.semantickernel.agents.internal.ChannelReference; +import com.microsoft.semantickernel.agents.internal.KeyEncoder; +import com.microsoft.semantickernel.exceptions.SKException; +import com.microsoft.semantickernel.services.chatcompletion.AuthorRole; +import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Mono; + +/** + * Point of interaction for one or more agents. + * + * An AgentChat instance does not support concurrent invocation and + * are synchronized using {@code java.util.concurrent.locks.Lock}. Any + * thread attempting to invoke a public method while another thread is + * holding the lock will block until the lock is released. + */ +public abstract class AgentChat { + + private final BroadcastQueue broadcastQueue; + // Map channel hash to channel: one entry per channel. + private final Map agentChannels; + // Map agent to its channel-hash: one entry per agent. + private final Map channelMap; + private final ChatHistory chatHistory; + private final Lock lock; + + /** + * Process a series of interactions between the agents participating in this chat. + * + * @return Asynchronous enumeration of messages. + */ + public abstract List> invokeAsync(); + + /** + * Retrieve the chat history. + * + * @return The message history + */ + public Mono>> getChatMessagesAsync() { + return getChatMessagesAsync(null); + } + + /** + * Retrieve the message history, either the primary history or + * an agent specific version. + * + * @param agent An optional agent, if requesting an agent history. + * @return The message history + * + * Any AgentChat instance does not support concurrent invocation and + * will throw exception if concurrent activity is attempted. + */ + public Mono>> getChatMessagesAsync(Agent agent) { + lock.lock(); + try { + if (agent == null) { + return Mono.just(chatHistory.getMessages()); + } else { + String channelHash = getAgentHash(agent); + return synchronizeChannelAsync(channelHash) + .flatMap(channel -> channel.getHistoryAsync()); + } + } finally { + lock.unlock(); + } + } + + /** + * Append a message to the conversation. Adding a message while an agent is active is not allowed. + * + * @param message A non-system message with which to append to the conversation. + * @throws KernelException if a system message is present, without taking any other action + * @throws KernelException chat has current activity. + */ + public void addChatMessage(ChatMessageContent message) { + addChatMessages(Arrays.asList(message)); + } + + /** + * Append messages to the conversation. Adding messages while an agent is active is not allowed. + * + * @param messages Set of non-system messages with which to append to the conversation. + * @throws KernelException if a system message is present, without taking any other action + * @throws KernelException chat has current activity. + */ + public void addChatMessages(List> messages) { + lock.lock(); + try { + + if (messages.stream().anyMatch(it -> it.getAuthorRole() == AuthorRole.SYSTEM)) { + throw new SKException( + String.format("History does not support messages with Rople of %s", AuthorRole.SYSTEM)); + } + + // Append chat history + chatHistory.addAll(messages); + + // Broadcast message to other channels (in parallel) + // Note: Able to queue messages without synchronizing channels. + List channelRefs = + agentChannels.entrySet().stream() + .map(entry -> new ChannelReference(entry.getValue(), entry.getKey())) + .collect(Collectors.toList()); + + broadcastQueue.enqueue(channelRefs, messages); + + } finally { + lock.unlock(); + } + } + + /** + * Construct a new {@link AgentChat} instance. + */ + protected AgentChat() { + broadcastQueue = new BroadcastQueue(); + agentChannels = new HashMap<>(); + channelMap = new HashMap<>(); + chatHistory = new ChatHistory(); + lock = new ReentrantLock(); + } + + /** + * Exposes the internal history to subclasses. + */ + protected ChatHistory getHistory() { + return chatHistory; + } + + /** + * Process a discrete incremental interaction between a single Agent an a AgentChat. + * + * @param agent The agent actively interacting with the chat. + * @return Asynchronous enumeration of messages. + */ + protected Mono>> invokeAgentAsync(Agent agent) { + lock.lock(); + try { + return getOrCreateChannel(agent) + .flatMap(channel -> channel.invokeAsync(agent) + .doOnNext(messages -> { + chatHistory.addAll(messages); + List channelRefs = agentChannels.entrySet().stream() + .filter(entry -> !entry.getKey().equals(agent)) + .map(entry -> new ChannelReference(entry.getValue(), entry.getKey())) + .collect(Collectors.toList()); + broadcastQueue.enqueue(channelRefs, messages); + }) + ); + } finally { + lock.unlock(); + } + } + + private String getAgentHash(@Nonnull Agent agent) { + String hash = channelMap.computeIfAbsent(agent, key -> KeyEncoder.generateHash(key.getChannelKeys())); + return hash; + } + + private Mono synchronizeChannelAsync(String channelHash) { + AgentChannel channel = agentChannels.get(channelHash); + if (channel != null) { + return broadcastQueue.ensureSynchronizedAsync( + new ChannelReference(channel, channelHash) + ); + } + return Mono.empty(); + } + + private Mono getOrCreateChannel(Agent agent) { + String channelHash = getAgentHash(agent); + return synchronizeChannelAsync(channelHash) + .flatMap(channel -> { + if (channel == null) { + return agent.createChannelAsync() + .doOnNext(newChannel -> { + agentChannels.put(channelHash, newChannel); + }) + .flatMap(newChannel -> { + if (chatHistory.getMessages().size() > 0) { + return newChannel.receiveAsync(chatHistory.getMessages()); + } + return Mono.empty(); + }) + .then(Mono.just(agentChannels.get(channelHash))); + } + return Mono.just(channel); + }); + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java new file mode 100644 index 00000000..23fe342b --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import javax.annotation.Nullable; + +import com.microsoft.semantickernel.Kernel; + +/** + * Base class for agents utilizing {@link Microsoft.SemanticKernel.Kernel} plugins or services. + * @param The type of {@code AgentChannel} associated with the agent. + */ +public abstract class KernelAgent extends Agent { + + private final String instructions; + private final Kernel kernel; + + protected KernelAgent( + @Nullable String id, + @Nullable String name, + @Nullable String description, + @Nullable String instructions, + Kernel kernel) { + super(id, name, description); + this.instructions = instructions; + this.kernel = kernel; + } + + /** + * The instructions of the agent (optional). + */ + public String getInstructions() { + return instructions; + } + + /** + * The {@link Kernel} containing services, plugins, and filters for use throughout the agent lifetime. + * Defaults to empty Kernel, but may be overridden. + */ + public Kernel getKernel() { + return kernel; + + } + + /** + * Builder for {@link KernelAgent} instances. + */ + public abstract static class Builder extends Agent.Builder { + + protected String instructions; + protected Kernel kernel; + + public Builder withInstructions(String instructions) { + this.instructions = instructions; + return this; + } + + public Builder withKernel(Kernel kernel) { + this.kernel = kernel; + return this; + } + + protected Builder() { + super(); + } + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java new file mode 100644 index 00000000..9d49447d --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java @@ -0,0 +1,250 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.reactivestreams.Publisher; + +import com.microsoft.semantickernel.agents.Agent; +import com.microsoft.semantickernel.agents.AgentChannel; +import com.microsoft.semantickernel.exceptions.SKException; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Mono; + +/** + * Utility class used by {@link AgentChat} to manage the broadcast of + * conversation messages via the {@link com.microsoft.semantickernel.agents.AgentChannel#receiveAsync}. + * Interaction occurs via two methods: + *
    + *
  • {@link BroadcastQueue#enqueue}: Adds messages to a channel specific queue for processing.
  • + *
  • {@link BroadcastQueue#ensureSynchronizedAsync}: Blocks until the specified channel's processing queue is empty.
  • + *
+ * Maintains a set of channel specific queues, each with individual locks. + * Queue specific locks exist to synchronize access to an individual queue only. + * Due to the closed "friend" relationship between with {@link AgentChat}, + * {@link BroadcastQueue} is never invoked concurrently, which eliminates + * race conditions over the queue dictionary. + */ +public class BroadcastQueue { + + /** + * The queue reference structure. + */ + private static class QueueReference { + + private final ConcurrentLinkedQueue>> queue = new ConcurrentLinkedQueue<>(); + private final Lock queueLock = new ReentrantLock(); + private FutureTask receiveTask; + + // Any failure that occured during execution of {@link #receiveTask}. + private Exception receiveFailure; + + /** + * Convenience logic + */ + private boolean isEmpty() { + return this.queue.isEmpty(); + } + + + private Lock getQueueLock() { + return queueLock; + } + + private Queue>> getQueue() { + return queue; + } + + /** + * Capture any failure that may occur during execution of {@link #receiveTask}. + */ + private void setReceiveFailure(Exception receiveFailure) { + this.receiveFailure = receiveFailure; + } + + private Exception getReceiveFailure() { + return receiveFailure; + } + + private FutureTask getReceiveTask() { + return receiveTask; + } + + private void setReceiveTask(FutureTask receiveTask) { + this.receiveTask = receiveTask; + } + } + + private final Map queues = new ConcurrentHashMap<>(); + + + // Defines the yield duration when waiting on a channel-queue to drain. + // TODO: This should be a configuration setting. See Duration#parse + private static final Duration blockDuration = Duration.ofMillis(100L); + + private final ExecutorService executorService = + Executors.newCachedThreadPool(runnable -> { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setDaemon(true); + return thread; + } + ); + + /** + * Enqueue a set of messages for a given channel. + * + * @param channelRefs The target channels for which to broadcast. + * @param messages The messages being broadcast. + */ + public void enqueue(List channelRefs, List> messages) { + for (ChannelReference channelRef : channelRefs) { + QueueReference queueRef = queues.computeIfAbsent(channelRef.getHash(), key -> new QueueReference()); + + queueRef.getQueueLock().lock(); + try { + queueRef.getQueue().add(messages); + if (queueRef.getReceiveTask() == null || queueRef.getReceiveTask().isDone()) { + queueRef.setReceiveTask(new FutureTask<>(receiveAsync(channelRef, queueRef), null)); + executorService.submit(queueRef.getReceiveTask()); + } + } finally { + queueRef.getQueueLock().unlock(); + } + } + + } + + /** + * Blocks until a channel-queue is not in a receive state to ensure that + * channel history is complete. + * + * @param channelRef A {@link ChannelReference} structure. + * @return false when channel is no longer receiving. + * @throws KernelException When channel is out of sync. + */ + public Mono ensureSynchronizedAsync(ChannelReference channelRef) { + // Either won race with Enqueue or lost race with ReceiveAsync. + // Missing queue is synchronized by definition. + QueueReference queueRef = queues.get(channelRef.getHash()); + if (queueRef == null) { + return Mono.just(channelRef.getChannel()); + } + + FutureTask receiveTask = queueRef.getReceiveTask(); + if (receiveTask == null || receiveTask.isDone()) { + return Mono.just(channelRef.getChannel()); + } + + return Mono.fromRunnable(() -> { + try { + receiveTask.get(blockDuration.toMillis(), TimeUnit.MILLISECONDS); + } catch (CancellationException | TimeoutException e) { + // TODO: Should log the TimeoutException + // Swallow the exception and move on. + // If a TimeoutException occurs, the queue is probably still processing. + // If a CancellationException occurs, the task was cancelled so there is no point in waiting. + } catch (InterruptedException e) { + // Propogate the interrupt + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + // Exception thrown by the receiveTask + queueRef.setReceiveFailure(e); + } + + // Propagate prior failure (inform caller of synchronization issue) + Exception failure = queueRef.getReceiveFailure(); + if (failure != null) { + queueRef.setReceiveFailure(null); + throw new SKException("Unexpected failure broadcasting to channel: " + channelRef.getChannel().getClass(), failure); + } + }) + .then(Mono.just(channelRef.getChannel())); + } + + /** + * Processes the specified queue with the provided channel, until queue is empty. + * @param channelRef the channel reference + * @param queueRef the queue reference + */ + private static Runnable receiveAsync(ChannelReference channelRef, QueueReference queueRef) + { + return () -> { + // Need to capture any failure that may occur during execution of receiveTask. + // It's an array to get around the final requirement for lambdas. + Exception[] failures = new Exception[1]; + + boolean isEmpty = true; // Default to fall-through state + + // This is a somewhat faithful translation of the .NET code. + do + { + failures[0] = null; + + Mono receiveTask; + + // Queue state is only changed within acquired QueueLock. + // If its empty here, it is synchronized. + queueRef.getQueueLock().lock(); + try { + isEmpty = queueRef.isEmpty(); + + // Process non empty queue + if (isEmpty) { + break; + } + + List> messages = queueRef.getQueue().peek(); + receiveTask = channelRef.getChannel().receiveAsync(messages); + } finally { + queueRef.getQueueLock().unlock(); + } + + // Queue not empty. + receiveTask.onErrorMap(e -> { + if (e instanceof Exception) { + failures[0] = (Exception)e; + } + return e; + }) + .block(); + + queueRef.getQueueLock().lock(); + try { + // Propagate failure or update queue + if (failures[0] != null) { + queueRef.setReceiveFailure(failures[0]); + break; // Failure on non-empty queue means, still not empty. + } + + // Queue has already been peeked. Remove head on success. + queueRef.getQueue().remove(); + + isEmpty = queueRef.isEmpty(); // Re-evaluate state + } finally { + queueRef.getQueueLock().unlock(); + } + } + while (!isEmpty); + }; + } + +} + diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java new file mode 100644 index 00000000..b0b5f573 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import com.microsoft.semantickernel.agents.AgentChannel; + +/** + * Tracks channel along with its hashed key. + */ +public class ChannelReference { + + private final AgentChannel channel; + private final String hash; + + public ChannelReference(AgentChannel channel, String hash) { + this.channel = channel; + this.hash = hash; + } + + /** + * The referenced channel. + * @return The referenced channel. + */ + public AgentChannel getChannel() { + return channel; + } + + /** + * The channel hash. + * @return The channel hash. + */ + public String getHash() { + return hash; + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java new file mode 100644 index 00000000..78f0f062 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +import com.microsoft.semantickernel.exceptions.SKException; + +import java.util.Base64; + +/** + * Utility to encode a list of string keys to a base-64 encoded hash. + */ +public class KeyEncoder { + /** + * Produces a base-64 encoded hash for a set of input strings. + * + * @param keys A set of input strings + * @return A base-64 encoded hash + */ + public static String generateHash(List keys) { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + // if this happens, something is very wrong with the JVM + throw new SKException("Failed to generate hash", e); + } + String key = String.join(":", keys); + byte[] hash = digest.digest(key.getBytes(StandardCharsets.UTF_8)); + return Base64.getEncoder().encodeToString(hash); + } +}