Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add new members in SessionImpl for multiplexed session. Add a … #2961

Merged
merged 6 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ void beforeReadOrQueryLocked() {
// Note that transactions are invalidated under some circumstances on the backend, but we
// implement the check more strictly here to encourage coding to contract rather than the
// implementation.
checkState(isValid, "Context has been invalidated by a new operation on the session");
if (!session.getIsMultiplexed()) {
checkState(isValid, "Context has been invalidated by a new operation on the session");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works here now, but I'm not sure this is the actual thing that needs to be changed to allow multiple operations on the same session. The isValid flag is set to false when invalidate() is called. That is called from SessionImpl#setActive(SessionTransaction) method. That again sets the activeTransaction on a SessionImpl. That field is what I think needs to be refactored for multiplexed sessions, as SessionImpl#activeTransaction is not really a thing anymore, as one SessionImpl can now have multiple transactions. At least; the latter is what I get from the fact that we just create a normal SessionImpl instance when we create a multiplexed sessions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are a few more changes that should be done to generalize SessionTransaction interface. I'll need to still do the changes. Will put that in a separate PR.

checkState(!isClosed, "Context has been closed");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ SessionImpl createSession() {
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
return new SessionImpl(spanner, session.getName(), options);
return new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -224,6 +225,39 @@ SessionImpl createSession() {
}
}

/**
* Create a multiplexed session and returns it to the given {@link SessionConsumer}. A multiplexed
* session is not affiliated with any GRPC channel. The given {@link SessionConsumer} is
* guaranteed to eventually get exactly 1 multiplexed session unless an error occurs. In case of
* an error on the gRPC calls, the consumer will receive one or more {@link
* SessionConsumer#onSessionCreateFailure(Throwable, int)} calls with the error.
*
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void createMultiplexedSession(SessionConsumer consumer) {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), null);
consumer.onSessionReady(sessionImpl);
} catch (RuntimeException e) {
span.setStatus(e);
consumer.onSessionCreateFailure(e, 1);
} finally {
span.end();
}
}

/**
* Asynchronously creates a batch of sessions and returns these to the given {@link
* SessionConsumer}. This method may split the actual session creation over several gRPC calls in
Expand Down Expand Up @@ -311,7 +345,13 @@ private List<SessionImpl> internalBatchCreateSessions(
span.end();
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(new SessionImpl(spanner, session.getName(), options));
res.add(
new SessionImpl(
spanner,
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
options));
}
return res;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ interface SessionTransaction {
ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private volatile Instant lastUseTime;
@Nullable private final Instant createTime;
private final boolean isMultiplexed;
private ISpan currentSpan;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
Expand All @@ -107,6 +109,24 @@ interface SessionTransaction {
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = null;
this.isMultiplexed = false;
}

SessionImpl(
SpannerImpl spanner,
String name,
com.google.protobuf.Timestamp createTime,
boolean isMultiplexed,
Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = convert(createTime);
this.isMultiplexed = isMultiplexed;
}

@Override
Expand All @@ -130,6 +150,14 @@ Instant getLastUseTime() {
return lastUseTime;
}

Instant getCreateTime() {
return createTime;
}

boolean getIsMultiplexed() {
return isMultiplexed;
}

void markUsed(Instant instant) {
lastUseTime = instant;
}
Expand Down Expand Up @@ -455,4 +483,11 @@ boolean hasReadyTransaction() {
TraceWrapper getTracer() {
return tracer;
}

private Instant convert(com.google.protobuf.Timestamp timestamp) {
if (timestamp == null) {
return null;
}
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
GaxProperties.getLibraryVersion(this.getOptions().getClass())));

static final String CREATE_MULTIPLEXED_SESSION = "CloudSpannerOperation.CreateMultiplexedSession";
static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession";
static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions";
static final String BATCH_CREATE_SESSIONS_REQUEST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,18 @@ public Session createSession(
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options)
throws SpannerException {
// By default sessions are not multiplexed
return createSession(databaseName, databaseRole, labels, options, false);
}

@Override
public Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException {
CreateSessionRequest.Builder requestBuilder =
CreateSessionRequest.newBuilder().setDatabase(databaseName);
Session.Builder sessionBuilder = Session.newBuilder();
Expand All @@ -1614,6 +1626,7 @@ public Session createSession(
if (databaseRole != null && !databaseRole.isEmpty()) {
sessionBuilder.setCreatorRole(databaseRole);
}
sessionBuilder.setMultiplexed(isMultiplexed);
requestBuilder.setSession(sessionBuilder);
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ Session createSession(
@Nullable Map<Option, ?> options)
throws SpannerException;

Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException;

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,9 @@ public void createSession(
CreateSessionRequest request, StreamObserver<Session> responseObserver) {
requests.add(request);
Preconditions.checkNotNull(request.getDatabase());
Preconditions.checkNotNull(request.getSession());
String name = generateSessionName(request.getDatabase());
Session requestSession = request.getSession();
try {
createSessionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Expand All @@ -868,6 +870,7 @@ public void createSession(
.setCreateTime(now)
.setName(name)
.setApproximateLastUseTime(now)
.setMultiplexed(requestSession.getMultiplexed())
.build();
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.collect.ImmutableMap;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
Expand Down Expand Up @@ -151,6 +152,82 @@ public void createAndCloseSession() {
}
}

@Test
public void createAndCloseMultiplexedSession() {
DatabaseId db = DatabaseId.of(dbName);
String sessionName = dbName + "/sessions/s1";
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
com.google.spanner.v1.Session sessionProto =
com.google.spanner.v1.Session.newBuilder()
.setName(sessionName)
.setMultiplexed(true)
.putAllLabels(labels)
.build();
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenReturn(sessionProto);
final AtomicInteger returnedSessionCount = new AtomicInteger();
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
assertThat(session.getName()).isEqualTo(sessionName);
returnedSessionCount.incrementAndGet();

session.close();
Mockito.verify(rpc).deleteSession(sessionName, options.getValue());
}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertThat(options.getValue()).isNull();
assertThat(returnedSessionCount.get()).isEqualTo(1);
}

@Test
public void createAndCloseMultiplexedSession_whenRPCThrowsException_thenAssertException() {
DatabaseId db = DatabaseId.of(dbName);
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenThrow(RuntimeException.class);
final AtomicInteger returnedSessionCount = new AtomicInteger();
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
assertThat(t).isInstanceOf(RuntimeException.class);
}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertThat(options.getValue()).isNull();
}

@SuppressWarnings("unchecked")
@Test
public void batchCreateAndCloseSessions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.protobuf.ByteString;
Expand All @@ -60,6 +61,7 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -274,6 +276,49 @@ private static long utcTimeSeconds(int year, int month, int day, int hour, int m
return calendar.getTimeInMillis() / 1000;
}

@Test
public void multiplesSingleUseContext_whenMultiplexedSession_assertValidContext() {
final String dbName = "projects/p1/instances/i1/databases/d1";
final DatabaseId db = DatabaseId.of(dbName);
final String sessionName = dbName + "/sessions/s1";
final SpannerImpl spanner = new SpannerImpl(rpc, spannerOptions);
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
ReadContext ctx1 = session.singleUse(TimestampBound.strong());
session.singleUse(TimestampBound.strong());
try {
ResultSet resultSet =
ctx1.read("Dummy", KeySet.all(), Collections.singletonList("C"));
assertNotNull(resultSet);
} catch (IllegalStateException ex) {
// in case of multiplexed session we should allow concurrent requests on same session
// and should not receive a IllegalStateException
Assert.fail();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need this catch block? I think that the test would also fail without it if we get an uncaught IllegalStateException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing test and the current change in AbstractReadContext outside of this PR. I'll need to think a bit more on how SessionTransaction interface should be refactored to handle all use-cases.

}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {}
};
Session sessionProto =
Session.newBuilder()
.setCreateTime(getCurrentGoogleTimestamp())
.setName(sessionName)
.setMultiplexed(true)
.build();
Mockito.when(
rpc.createSession(
Mockito.eq(dbName),
Mockito.anyString(),
Mockito.anyMap(),
optionsCaptor.capture(),
Mockito.eq(true)))
.thenReturn(sessionProto);
spanner.getSessionClient(db).createMultiplexedSession(consumer);
}

@Test
public void newSingleUseContextClosesOldSingleUseContext() {
ReadContext ctx = session.singleUse(TimestampBound.strong());
Expand Down Expand Up @@ -515,4 +560,11 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars
() -> txn.readRow("Dummy", Key.of(), Collections.singletonList("C")));
assertEquals(ErrorCode.INTERNAL, e.getErrorCode());
}

private com.google.protobuf.Timestamp getCurrentGoogleTimestamp() {
long current = System.currentTimeMillis();
long seconds = TimeUnit.MILLISECONDS.toSeconds(current);
int nanos = (int) TimeUnit.MILLISECONDS.toNanos(current - TimeUnit.SECONDS.toMillis(seconds));
return com.google.protobuf.Timestamp.newBuilder().setSeconds(seconds).setNanos(nanos).build();
}
}
Loading
Loading