Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add new members in SessionImpl for multiplexed session. Add a … #2961

Merged
merged 6 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ SessionImpl createSession() {
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
return new SessionImpl(spanner, session.getName(), options);
return new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -224,6 +225,39 @@ SessionImpl createSession() {
}
}

/**
* Create a multiplexed session and returns it to the given {@link SessionConsumer}. A multiplexed
* session is not affiliated with any GRPC channel. The given {@link SessionConsumer} is
* guaranteed to eventually get exactly 1 multiplexed session unless an error occurs. In case of
* an error on the gRPC calls, the consumer will receive one {@link
* SessionConsumer#onSessionCreateFailure(Throwable, int)} calls with the error.
*
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void createMultiplexedSession(SessionConsumer consumer) {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), null);
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
} finally {
span.end();
}
}

/**
* Asynchronously creates a batch of sessions and returns these to the given {@link
* SessionConsumer}. This method may split the actual session creation over several gRPC calls in
Expand Down Expand Up @@ -311,7 +345,13 @@ private List<SessionImpl> internalBatchCreateSessions(
span.end();
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(new SessionImpl(spanner, session.getName(), options));
res.add(
new SessionImpl(
spanner,
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
options));
}
return res;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ interface SessionTransaction {
ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private volatile Instant lastUseTime;
@Nullable private final Instant createTime;
private final boolean isMultiplexed;
private ISpan currentSpan;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
Expand All @@ -107,6 +109,24 @@ interface SessionTransaction {
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = null;
this.isMultiplexed = false;
}

SessionImpl(
SpannerImpl spanner,
String name,
com.google.protobuf.Timestamp createTime,
boolean isMultiplexed,
Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = convert(createTime);
this.isMultiplexed = isMultiplexed;
}

@Override
Expand All @@ -130,6 +150,14 @@ Instant getLastUseTime() {
return lastUseTime;
}

Instant getCreateTime() {
return createTime;
}

boolean getIsMultiplexed() {
return isMultiplexed;
}

void markUsed(Instant instant) {
lastUseTime = instant;
}
Expand Down Expand Up @@ -455,4 +483,11 @@ boolean hasReadyTransaction() {
TraceWrapper getTracer() {
return tracer;
}

private Instant convert(com.google.protobuf.Timestamp timestamp) {
if (timestamp == null) {
return null;
}
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
GaxProperties.getLibraryVersion(this.getOptions().getClass())));

static final String CREATE_MULTIPLEXED_SESSION = "CloudSpannerOperation.CreateMultiplexedSession";
static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession";
static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions";
static final String BATCH_CREATE_SESSIONS_REQUEST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,18 @@ public Session createSession(
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options)
throws SpannerException {
// By default sessions are not multiplexed
return createSession(databaseName, databaseRole, labels, options, false);
}

@Override
public Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException {
CreateSessionRequest.Builder requestBuilder =
CreateSessionRequest.newBuilder().setDatabase(databaseName);
Session.Builder sessionBuilder = Session.newBuilder();
Expand All @@ -1614,6 +1626,7 @@ public Session createSession(
if (databaseRole != null && !databaseRole.isEmpty()) {
sessionBuilder.setCreatorRole(databaseRole);
}
sessionBuilder.setMultiplexed(isMultiplexed);
requestBuilder.setSession(sessionBuilder);
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ Session createSession(
@Nullable Map<Option, ?> options)
throws SpannerException;

default Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException {
throw new UnsupportedOperationException("Unimplemented");
}

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,9 @@ public void createSession(
CreateSessionRequest request, StreamObserver<Session> responseObserver) {
requests.add(request);
Preconditions.checkNotNull(request.getDatabase());
Preconditions.checkNotNull(request.getSession());
String name = generateSessionName(request.getDatabase());
Session requestSession = request.getSession();
try {
createSessionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Expand All @@ -868,6 +870,7 @@ public void createSession(
.setCreateTime(now)
.setName(name)
.setApproximateLastUseTime(now)
.setMultiplexed(requestSession.getMultiplexed())
.build();
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +32,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.collect.ImmutableMap;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
Expand Down Expand Up @@ -151,6 +155,81 @@ public void createAndCloseSession() {
}
}

@Test
public void createAndCloseMultiplexedSession() {
DatabaseId db = DatabaseId.of(dbName);
String sessionName = dbName + "/sessions/s1";
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
com.google.spanner.v1.Session sessionProto =
com.google.spanner.v1.Session.newBuilder()
.setName(sessionName)
.setMultiplexed(true)
.putAllLabels(labels)
.build();
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenReturn(sessionProto);
final AtomicInteger returnedSessionCount = new AtomicInteger();
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
assertEquals(sessionName, session.getName());
returnedSessionCount.incrementAndGet();

session.close();
Mockito.verify(rpc).deleteSession(sessionName, options.getValue());
}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertNull(options.getValue());
assertEquals(1, returnedSessionCount.get());
}

@Test
public void createAndCloseMultiplexedSession_whenRPCThrowsException_thenAssertException() {
DatabaseId db = DatabaseId.of(dbName);
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenThrow(RuntimeException.class);
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
assertTrue(t instanceof RuntimeException);
}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertNull(options.getValue());
}

@SuppressWarnings("unchecked")
@Test
public void batchCreateAndCloseSessions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.GetSessionRequest;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.StructType.Field;
Expand Down Expand Up @@ -668,6 +669,42 @@ public void testAdminStubSettings_whenStubNotInitialized_assertNullClientSetting
rpc.shutdown();
}

@Test
public void testCreateSession_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(false, session.getMultiplexed());
rpc.shutdown();
}

@Test
public void testCreateSession_whenMultiplexedSessionIsTrue_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null, true);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(true, session.getMultiplexed());
rpc.shutdown();
}

@Test
public void testCreateSession_whenMultiplexedSessionIsFalse_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null, false);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(false, session.getMultiplexed());
rpc.shutdown();
}

private SpannerOptions createSpannerOptions() {
String endpoint = address.getHostString() + ":" + server.getPort();
return SpannerOptions.newBuilder()
Expand Down
Loading