Skip to content

Commit d0ba290

Browse files
arpan14olavloite
andauthored
chore: add new members in SessionImpl for multiplexed session. Add a … (#2961)
* chore: add new members in SessionImpl for multiplexed session. Add a new method to create multiplexed session. * chore: add unit tests. * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java Co-authored-by: Knut Olav Løite <koloite@gmail.com> * fix: comments. * chore: prefer junit assertions. * chore: change to default method in SpannerRpc interface. --------- Co-authored-by: Knut Olav Løite <koloite@gmail.com>
1 parent b2dc788 commit d0ba290

File tree

9 files changed

+220
-3
lines changed

9 files changed

+220
-3
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ SessionImpl createSession() {
215215
spanner.getOptions().getDatabaseRole(),
216216
spanner.getOptions().getSessionLabels(),
217217
options);
218-
return new SessionImpl(spanner, session.getName(), options);
218+
return new SessionImpl(
219+
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
219220
} catch (RuntimeException e) {
220221
span.setStatus(e);
221222
throw e;
@@ -224,6 +225,39 @@ SessionImpl createSession() {
224225
}
225226
}
226227

228+
/**
229+
* Create a multiplexed session and returns it to the given {@link SessionConsumer}. A multiplexed
230+
* session is not affiliated with any GRPC channel. The given {@link SessionConsumer} is
231+
* guaranteed to eventually get exactly 1 multiplexed session unless an error occurs. In case of
232+
* an error on the gRPC calls, the consumer will receive one {@link
233+
* SessionConsumer#onSessionCreateFailure(Throwable, int)} calls with the error.
234+
*
235+
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
236+
*/
237+
void createMultiplexedSession(SessionConsumer consumer) {
238+
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
239+
try (IScope s = spanner.getTracer().withSpan(span)) {
240+
com.google.spanner.v1.Session session =
241+
spanner
242+
.getRpc()
243+
.createSession(
244+
db.getName(),
245+
spanner.getOptions().getDatabaseRole(),
246+
spanner.getOptions().getSessionLabels(),
247+
null,
248+
true);
249+
SessionImpl sessionImpl =
250+
new SessionImpl(
251+
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), null);
252+
consumer.onSessionReady(sessionImpl);
253+
} catch (Throwable t) {
254+
span.setStatus(t);
255+
consumer.onSessionCreateFailure(t, 1);
256+
} finally {
257+
span.end();
258+
}
259+
}
260+
227261
/**
228262
* Asynchronously creates a batch of sessions and returns these to the given {@link
229263
* SessionConsumer}. This method may split the actual session creation over several gRPC calls in
@@ -311,7 +345,13 @@ private List<SessionImpl> internalBatchCreateSessions(
311345
span.end();
312346
List<SessionImpl> res = new ArrayList<>(sessionCount);
313347
for (com.google.spanner.v1.Session session : sessions) {
314-
res.add(new SessionImpl(spanner, session.getName(), options));
348+
res.add(
349+
new SessionImpl(
350+
spanner,
351+
session.getName(),
352+
session.getCreateTime(),
353+
session.getMultiplexed(),
354+
options));
315355
}
316356
return res;
317357
} catch (RuntimeException e) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+35
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ interface SessionTransaction {
9898
ByteString readyTransactionId;
9999
private final Map<SpannerRpc.Option, ?> options;
100100
private volatile Instant lastUseTime;
101+
@Nullable private final Instant createTime;
102+
private final boolean isMultiplexed;
101103
private ISpan currentSpan;
102104

103105
SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
@@ -107,6 +109,24 @@ interface SessionTransaction {
107109
this.name = checkNotNull(name);
108110
this.databaseId = SessionId.of(name).getDatabaseId();
109111
this.lastUseTime = Instant.now();
112+
this.createTime = null;
113+
this.isMultiplexed = false;
114+
}
115+
116+
SessionImpl(
117+
SpannerImpl spanner,
118+
String name,
119+
com.google.protobuf.Timestamp createTime,
120+
boolean isMultiplexed,
121+
Map<SpannerRpc.Option, ?> options) {
122+
this.spanner = spanner;
123+
this.tracer = spanner.getTracer();
124+
this.options = options;
125+
this.name = checkNotNull(name);
126+
this.databaseId = SessionId.of(name).getDatabaseId();
127+
this.lastUseTime = Instant.now();
128+
this.createTime = convert(createTime);
129+
this.isMultiplexed = isMultiplexed;
110130
}
111131

112132
@Override
@@ -130,6 +150,14 @@ Instant getLastUseTime() {
130150
return lastUseTime;
131151
}
132152

153+
Instant getCreateTime() {
154+
return createTime;
155+
}
156+
157+
boolean getIsMultiplexed() {
158+
return isMultiplexed;
159+
}
160+
133161
void markUsed(Instant instant) {
134162
lastUseTime = instant;
135163
}
@@ -455,4 +483,11 @@ boolean hasReadyTransaction() {
455483
TraceWrapper getTracer() {
456484
return tracer;
457485
}
486+
487+
private Instant convert(com.google.protobuf.Timestamp timestamp) {
488+
if (timestamp == null) {
489+
return null;
490+
}
491+
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
492+
}
458493
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
6868
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
6969
GaxProperties.getLibraryVersion(this.getOptions().getClass())));
7070

71+
static final String CREATE_MULTIPLEXED_SESSION = "CloudSpannerOperation.CreateMultiplexedSession";
7172
static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession";
7273
static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions";
7374
static final String BATCH_CREATE_SESSIONS_REQUEST =

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+13
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,18 @@ public Session createSession(
16051605
@Nullable Map<String, String> labels,
16061606
@Nullable Map<Option, ?> options)
16071607
throws SpannerException {
1608+
// By default sessions are not multiplexed
1609+
return createSession(databaseName, databaseRole, labels, options, false);
1610+
}
1611+
1612+
@Override
1613+
public Session createSession(
1614+
String databaseName,
1615+
@Nullable String databaseRole,
1616+
@Nullable Map<String, String> labels,
1617+
@Nullable Map<Option, ?> options,
1618+
boolean isMultiplexed)
1619+
throws SpannerException {
16081620
CreateSessionRequest.Builder requestBuilder =
16091621
CreateSessionRequest.newBuilder().setDatabase(databaseName);
16101622
Session.Builder sessionBuilder = Session.newBuilder();
@@ -1614,6 +1626,7 @@ public Session createSession(
16141626
if (databaseRole != null && !databaseRole.isEmpty()) {
16151627
sessionBuilder.setCreatorRole(databaseRole);
16161628
}
1629+
sessionBuilder.setMultiplexed(isMultiplexed);
16171630
requestBuilder.setSession(sessionBuilder);
16181631
CreateSessionRequest request = requestBuilder.build();
16191632
GrpcCallContext context =

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+10
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,16 @@ Session createSession(
340340
@Nullable Map<Option, ?> options)
341341
throws SpannerException;
342342

343+
default Session createSession(
344+
String databaseName,
345+
@Nullable String databaseRole,
346+
@Nullable Map<String, String> labels,
347+
@Nullable Map<Option, ?> options,
348+
boolean isMultiplexed)
349+
throws SpannerException {
350+
throw new UnsupportedOperationException("Unimplemented");
351+
}
352+
343353
void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;
344354

345355
ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

+3
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,9 @@ public void createSession(
858858
CreateSessionRequest request, StreamObserver<Session> responseObserver) {
859859
requests.add(request);
860860
Preconditions.checkNotNull(request.getDatabase());
861+
Preconditions.checkNotNull(request.getSession());
861862
String name = generateSessionName(request.getDatabase());
863+
Session requestSession = request.getSession();
862864
try {
863865
createSessionExecutionTime.simulateExecutionTime(
864866
exceptions, stickyGlobalExceptions, freezeLock);
@@ -868,6 +870,7 @@ public void createSession(
868870
.setCreateTime(now)
869871
.setName(name)
870872
.setApproximateLastUseTime(now)
873+
.setMultiplexed(requestSession.getMultiplexed())
871874
.build();
872875
Session prev = sessions.putIfAbsent(name, session);
873876
if (prev == null) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java

+79
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package com.google.cloud.spanner;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNull;
22+
import static org.junit.Assert.assertTrue;
2023
import static org.mockito.ArgumentMatchers.any;
2124
import static org.mockito.Mockito.doNothing;
2225
import static org.mockito.Mockito.mock;
@@ -29,6 +32,7 @@
2932
import com.google.cloud.spanner.SessionClient.SessionConsumer;
3033
import com.google.cloud.spanner.spi.v1.SpannerRpc;
3134
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
35+
import com.google.common.collect.ImmutableMap;
3236
import io.opencensus.trace.Tracing;
3337
import io.opentelemetry.api.OpenTelemetry;
3438
import java.util.ArrayList;
@@ -151,6 +155,81 @@ public void createAndCloseSession() {
151155
}
152156
}
153157

158+
@Test
159+
public void createAndCloseMultiplexedSession() {
160+
DatabaseId db = DatabaseId.of(dbName);
161+
String sessionName = dbName + "/sessions/s1";
162+
Map<String, String> labels = ImmutableMap.of("env", "dev");
163+
String databaseRole = "role";
164+
when(spannerOptions.getSessionLabels()).thenReturn(labels);
165+
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
166+
com.google.spanner.v1.Session sessionProto =
167+
com.google.spanner.v1.Session.newBuilder()
168+
.setName(sessionName)
169+
.setMultiplexed(true)
170+
.putAllLabels(labels)
171+
.build();
172+
when(rpc.createSession(
173+
Mockito.eq(dbName),
174+
Mockito.eq(databaseRole),
175+
Mockito.eq(labels),
176+
options.capture(),
177+
Mockito.eq(true)))
178+
.thenReturn(sessionProto);
179+
final AtomicInteger returnedSessionCount = new AtomicInteger();
180+
final SessionConsumer consumer =
181+
new SessionConsumer() {
182+
@Override
183+
public void onSessionReady(SessionImpl session) {
184+
assertEquals(sessionName, session.getName());
185+
returnedSessionCount.incrementAndGet();
186+
187+
session.close();
188+
Mockito.verify(rpc).deleteSession(sessionName, options.getValue());
189+
}
190+
191+
@Override
192+
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {}
193+
};
194+
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
195+
client.createMultiplexedSession(consumer);
196+
}
197+
// for multiplexed session there is no channel hint pass in the RPC options
198+
assertNull(options.getValue());
199+
assertEquals(1, returnedSessionCount.get());
200+
}
201+
202+
@Test
203+
public void createAndCloseMultiplexedSession_whenRPCThrowsException_thenAssertException() {
204+
DatabaseId db = DatabaseId.of(dbName);
205+
Map<String, String> labels = ImmutableMap.of("env", "dev");
206+
String databaseRole = "role";
207+
when(spannerOptions.getSessionLabels()).thenReturn(labels);
208+
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
209+
when(rpc.createSession(
210+
Mockito.eq(dbName),
211+
Mockito.eq(databaseRole),
212+
Mockito.eq(labels),
213+
options.capture(),
214+
Mockito.eq(true)))
215+
.thenThrow(RuntimeException.class);
216+
final SessionConsumer consumer =
217+
new SessionConsumer() {
218+
@Override
219+
public void onSessionReady(SessionImpl session) {}
220+
221+
@Override
222+
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
223+
assertTrue(t instanceof RuntimeException);
224+
}
225+
};
226+
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
227+
client.createMultiplexedSession(consumer);
228+
}
229+
// for multiplexed session there is no channel hint pass in the RPC options
230+
assertNull(options.getValue());
231+
}
232+
154233
@SuppressWarnings("unchecked")
155234
@Test
156235
public void batchCreateAndCloseSessions() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.junit.Assert.assertThrows;
2323
import static org.junit.Assert.fail;
2424
import static org.mockito.ArgumentMatchers.eq;
25-
import static org.mockito.Mockito.eq;
2625
import static org.mockito.Mockito.mock;
2726
import static org.mockito.Mockito.when;
2827

google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.google.spanner.v1.ExecuteSqlRequest;
5858
import com.google.spanner.v1.GetSessionRequest;
5959
import com.google.spanner.v1.ResultSetMetadata;
60+
import com.google.spanner.v1.Session;
6061
import com.google.spanner.v1.SpannerGrpc;
6162
import com.google.spanner.v1.StructType;
6263
import com.google.spanner.v1.StructType.Field;
@@ -668,6 +669,42 @@ public void testAdminStubSettings_whenStubNotInitialized_assertNullClientSetting
668669
rpc.shutdown();
669670
}
670671

672+
@Test
673+
public void testCreateSession_assertSessionProto() {
674+
SpannerOptions options = createSpannerOptions();
675+
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);
676+
677+
Session session = rpc.createSession("DATABASE_NAME", null, null, null);
678+
assertNotNull(session);
679+
assertNotNull(session.getCreateTime());
680+
assertEquals(false, session.getMultiplexed());
681+
rpc.shutdown();
682+
}
683+
684+
@Test
685+
public void testCreateSession_whenMultiplexedSessionIsTrue_assertSessionProto() {
686+
SpannerOptions options = createSpannerOptions();
687+
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);
688+
689+
Session session = rpc.createSession("DATABASE_NAME", null, null, null, true);
690+
assertNotNull(session);
691+
assertNotNull(session.getCreateTime());
692+
assertEquals(true, session.getMultiplexed());
693+
rpc.shutdown();
694+
}
695+
696+
@Test
697+
public void testCreateSession_whenMultiplexedSessionIsFalse_assertSessionProto() {
698+
SpannerOptions options = createSpannerOptions();
699+
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);
700+
701+
Session session = rpc.createSession("DATABASE_NAME", null, null, null, false);
702+
assertNotNull(session);
703+
assertNotNull(session.getCreateTime());
704+
assertEquals(false, session.getMultiplexed());
705+
rpc.shutdown();
706+
}
707+
671708
private SpannerOptions createSpannerOptions() {
672709
String endpoint = address.getHostString() + ":" + server.getPort();
673710
return SpannerOptions.newBuilder()

0 commit comments

Comments
 (0)