
Commit d908fdd

Adjust fromChannel method to minimize injected headers and add examples (forcedotcom#27)
The intent of this change is to analyze how to use the JDBC driver in the least invasive way (compared to manual gRPC calling). To reach that minimum, the `DataCloudConnection.fromChannel` method was adjusted to add only the must-have interceptors, which are complete no-ops unless the corresponding connection parameters are set. To demonstrate this flow, a new `examples` test package was created that exercises the approach and doubles as documentation. To keep that test case free of helpers that would hide complexity, the test hyper instance variable and its getPort method were made public.

Drive-by changes:
- The hyper test instance was made non-static to avoid problems in parallel test execution.
- A commented sample implementation that uses the row-based API to serve row-based paginated results was added; the parallel ongoing work should simplify it.
1 parent 1321093 commit d908fdd

File tree

9 files changed: +228 −19 lines

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

+31-6
@@ -90,9 +90,14 @@ public class DataCloudConnection implements Connection, AutoCloseable {
     @Getter(AccessLevel.PACKAGE)
     @NonNull private final HyperGrpcClientExecutor executor;
 
+    /**
+     * This creates a Data Cloud connection with minimal adjustments to the channel.
+     * The only added interceptors are those handling connection parameters that influence headers.
+     * This will not provide auth / tracing; users of this API are expected to wire their own.
+     */
     public static DataCloudConnection fromChannel(@NonNull ManagedChannelBuilder<?> builder, Properties properties)
             throws SQLException {
-        val interceptors = getClientInterceptors(null, properties);
+        val interceptors = getPropertyDerivedClientInterceptors(properties);
         val executor = HyperGrpcClientExecutor.of(builder.intercept(interceptors), properties);
 
         return DataCloudConnection.builder()
@@ -122,19 +127,38 @@ public static DataCloudConnection fromTokenSupplier(
                 .build();
     }
 
-    static List<ClientInterceptor> getClientInterceptors(
-            AuthorizationHeaderInterceptor authInterceptor, Properties properties) {
+    /**
+     * Initializes the interceptors that handle channel-level concerns which can be configured through properties.
+     * @param properties the connection properties
+     * @return a list of client interceptors
+     */
+    static List<ClientInterceptor> getPropertyDerivedClientInterceptors(Properties properties) {
         return Stream.of(
-                        authInterceptor,
-                        TracingHeadersInterceptor.of(),
                         HyperExternalClientContextHeaderInterceptor.of(properties),
                         HyperWorkloadHeaderInterceptor.of(properties),
                         DataspaceHeaderInterceptor.of(properties))
                 .filter(Objects::nonNull)
-                .peek(t -> log.info("Registering interceptor. interceptor={}", t))
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Initializes the full set of client interceptors, from property handling to tracing and auth.
+     * @param authInterceptor an optional auth interceptor; may be null
+     * @param properties the connection properties
+     * @return a list of client interceptors
+     */
+    static List<ClientInterceptor> getClientInterceptors(
+            AuthorizationHeaderInterceptor authInterceptor, Properties properties) {
+        val list = getPropertyDerivedClientInterceptors(properties);
+        list.add(0, TracingHeadersInterceptor.of());
+        if (authInterceptor != null) {
+            list.add(0, authInterceptor);
+        }
+        log.info("Registering interceptors. interceptors={}", list);
+        return list;
+    }
+
     public static DataCloudConnection of(String url, Properties properties) throws SQLException {
         val connectionString = DataCloudConnectionString.of(url);
         addClientUsernameIfRequired(properties);
@@ -179,6 +203,7 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
     /**
      * Retrieves a collection of rows for the specified query once it is ready.
      * Use {@link #getQueryStatus(String)} to check if the query has produced results or finished execution before calling this method.
+     * You can get the query ID from the {@code DataCloudResultSet} returned by executeQuery.
     * <p>
     * When using {@link RowBased.Mode#FULL_RANGE}, this method does not handle pagination near the end of available rows.
     * The caller is responsible for calculating the correct offset and limit to avoid out-of-range errors.
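
Since `fromChannel` now registers only the property-derived interceptors (no-ops for an empty `Properties`), auth and tracing are left to the caller. A minimal sketch of that wiring, using gRPC's stock MetadataUtils helper to attach a static authorization header; `host`, `port`, and `token` are placeholders, and production code would likely use a refreshing, caller-maintained interceptor instead:

import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import java.util.Properties;

// Sketch: bring-your-own auth on a caller-owned channel, then hand it to the driver.
Metadata headers = new Metadata();
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer " + token);
ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress(host, port)
        .intercept(MetadataUtils.newAttachHeadersInterceptor(headers));
try (DataCloudConnection conn = DataCloudConnection.fromChannel(channel, new Properties())) {
    // Plain JDBC from here on; with empty properties the driver itself injects no extra headers.
}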

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/interceptor/SingleHeaderMutatingClientInterceptor.java

+1-1
@@ -18,7 +18,7 @@
 import io.grpc.Metadata;
 import lombok.NonNull;
 
-interface SingleHeaderMutatingClientInterceptor extends HeaderMutatingClientInterceptor {
+public interface SingleHeaderMutatingClientInterceptor extends HeaderMutatingClientInterceptor {
     @NonNull Metadata.Key<String> getKey();
 
     @NonNull String getValue();
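
With the interface now public, downstream code can supply its own single-header interceptors. A sketch under the assumption that `HeaderMutatingClientInterceptor` derives its behavior entirely from `getKey()` and `getValue()`; the header name and value below are made up for illustration:

import io.grpc.Metadata;
import lombok.NonNull;

// Hypothetical interceptor that stamps a constant header on every call.
final class StaticTeamHeaderInterceptor implements SingleHeaderMutatingClientInterceptor {
    private static final Metadata.Key<String> KEY =
            Metadata.Key.of("x-example-team", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public @NonNull Metadata.Key<String> getKey() {
        return KEY;
    }

    @Override
    public @NonNull String getValue() {
        return "data-platform";
    }
}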

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/AsyncStreamingResultSetTest.java

+1-1
@@ -36,7 +36,7 @@ public class AsyncStreamingResultSetTest extends HyperTestBase {
     @SneakyThrows
     public void testThrowsOnNonsenseQueryAsync() {
         val ex = Assertions.assertThrows(DataCloudJDBCException.class, () -> {
-            try (val connection = HyperTestBase.getHyperQueryConnection();
+            try (val connection = getHyperQueryConnection();
                     val statement = connection.createStatement().unwrap(DataCloudStatement.class)) {
                 val rs = statement.executeAsyncQuery("select * from nonsense");
                 waitUntilReady(statement);

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/partial/ChunkBasedTest.java

+1-1
@@ -103,7 +103,7 @@ private String getQueryId(int max) {
 
         try (val client = getHyperQueryConnection();
                 val statement = client.createStatement().unwrap(DataCloudStatement.class)) {
-            statement.executeAsyncQuery(query);
+            statement.executeQuery(query);
             return statement.getQueryId();
         }
     }
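
The switch to synchronous `executeQuery` still yields a query ID; per the Javadoc note added to `DataCloudConnection`, it can equally be read from the returned `DataCloudResultSet`. A small sketch assuming the test-base connection helper; the query text is illustrative:

import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.val;

class QueryIdExample extends HyperTestBase {
    String fetchQueryId() throws SQLException {
        try (val client = getHyperQueryConnection();
                val statement = client.createStatement().unwrap(DataCloudStatement.class)) {
            ResultSet rs = statement.executeQuery("select * from generate_series(1, 100)");
            // The server-side query ID is available even for synchronous execution and can
            // later drive getQueryStatus / getRowBasedResultSet, possibly on another connection.
            return ((DataCloudResultSet) rs).getQueryId();
        }
    }
}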
jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/examples/SubmitQueryAndConsumeResultsTest.java

+177-0

@@ -0,0 +1,177 @@
/*
 * Copyright (c) 2024, Salesforce, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.salesforce.datacloud.jdbc.examples;

import static java.lang.Math.min;

import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
import io.grpc.ManagedChannelBuilder;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

/**
 * This example uses a locally spawned Hyper instance to demonstrate best practices around connecting to Hyper.
 * It deliberately uses only the core JDBC API, with no helpers outside this class, so that the examples are
 * self-contained.
 */
@Slf4j
public class SubmitQueryAndConsumeResultsTest extends HyperTestBase {
    /**
     * This example shows how to create a Data Cloud connection while still having full control over concerns like
     * authorization and tracing.
     */
    @Test
    public void testBareBonesExecuteQuery() throws SQLException {
        // The connection properties
        Properties properties = new Properties();

        // You can bring your own gRPC channels, set up the way you like (mTLS / plaintext / ...) with your own
        // interceptors as well as executors.
        ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
                .usePlaintext();

        // Use the JDBC driver interface
        try (DataCloudConnection conn = DataCloudConnection.fromChannel(channel, properties)) {
            try (Statement stmt = conn.createStatement()) {
                ResultSet rs = stmt.executeQuery("SELECT s FROM generate_series(1,10) s");
                while (rs.next()) {
                    System.out.println("Retrieved value: " + rs.getLong(1));
                }
            }
        }
    }

    /**
     * Derives the number of rows that can be fetched at the given offset from the query status; having a status at
     * all means the query was last observed in a non-failing state.
     *
     * The offset is expected to be at most the row count (which holds for typical next-page pagination).
     */
    private static long rowBasedStatusObjectRowsCheck(DataCloudQueryStatus queryStatus, long offset, long pageLimit) {
        // Check if we can at least return some data
        if (queryStatus.getRowCount() > offset) {
            return min(queryStatus.getRowCount() - offset, pageLimit);
        }
        // A negative count signals that no data is available
        return -1;
    }

    /**
     * Checks if the query status signals that all results are produced
     */
    private static boolean allResultsProduced(DataCloudQueryStatus queryStatus) {
        return queryStatus.isResultProduced() || queryStatus.isExecutionFinished();
    }

    /**
     * This example shows how to use the row-based pagination mode to get results segmented by approximate row count.
     * The example reads the results in ranges of two rows, with an implementation in which the application does not
     * know up front how many rows the query will produce.
     */
    @Test
    public void testRowBasedPagination() throws SQLException {
        final int pageRowLimit = 2;
        long offset = 0;
        long page = 0;

        // The connection properties
        Properties properties = new Properties();

        // You can bring your own gRPC channels, set up the way you like (mTLS / plaintext / ...) with your own
        // interceptors as well as executors.
        ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
                .usePlaintext();

        try (DataCloudConnection conn = DataCloudConnection.fromChannel(channel, properties)) {
            // Submit the query and consume the initial page
            String queryId;
            try (Statement stmt = conn.createStatement()) {
                log.warn("Executing query using a single `ExecuteQuery` RPC call");
                ResultSet rs = stmt.executeQuery("SELECT s FROM generate_series(1,11) s");
                queryId = ((DataCloudResultSet) rs).getQueryId();
                // For this result set the consumer currently has to enforce the page limit itself; checking the
                // limit before calling next() avoids consuming a row beyond the page.
                int i = 0;
                while ((i++ < pageRowLimit) && rs.next()) {
                    ++offset;
                    System.out.println("Retrieved value: " + rs.getLong(1) + " on page " + page);
                }
                ++page;
            }

            // Consume further pages until the full result is consumed (could also be done on a new connection if
            // needed)
            // NIT: We should provide an API on the original result set to access the `DataCloudQueryStatus`; that
            // way, if the query is already finished we don't need to do another network round-trip.
            Optional<DataCloudQueryStatus> cachedStatus = Optional.empty();
            while (true) {
                // Try to make sure we have a status object
                if (!cachedStatus.isPresent()) {
                    // Check whether more data is available
                    long lambdaOffset = offset;
                    // In case of a query error this could throw a runtime exception
                    // NIT: What is the timeout enforced here?
                    log.warn("Fetching query status using a single `GetQueryInfo` RPC call");
                    // NIT: Semantically I would want takeWhile here, which is only available starting with Java 9
                    cachedStatus = conn.getQueryStatus(queryId)
                            .filter(queryStatus ->
                                    (rowBasedStatusObjectRowsCheck(queryStatus, lambdaOffset, pageRowLimit) > 0)
                                            || allResultsProduced(queryStatus))
                            .findFirst();

                    // Query is still running
                    // NIT: Check how we should handle this in the presence of timeouts
                    if (!cachedStatus.isPresent()) {
                        continue;
                    }
                }

                long availableRows = rowBasedStatusObjectRowsCheck(cachedStatus.get(), offset, pageRowLimit);
                // Check whether the query completed and thus no more results can be produced
                if (availableRows <= 0) {
                    if (allResultsProduced(cachedStatus.get())) {
                        break;
                    } else {
                        // We need to fetch a new status in the next iteration.
                        // Due to the long-polling nature of `conn.getQueryStatus` this doesn't result in a
                        // busy-spinning loop even if the query is still executing.
                        cachedStatus = Optional.empty();
                        continue;
                    }
                }

                // At this point we know that rows are available
                log.warn("Fetching query results using a single `GetQueryResult` RPC call");
                try (ResultSet rs =
                        conn.getRowBasedResultSet(queryId, offset, pageRowLimit, RowBased.Mode.SINGLE_RPC)) {
                    while (rs.next()) {
                        ++offset;
                        System.out.println("Retrieved value: " + rs.getLong(1) + " on page " + page);
                    }
                    ++page;
                }
            }
        }
        log.warn("Completed");
    }
}

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperServerProcess.java

+1-1
@@ -97,7 +97,7 @@ public HyperServerProcess(HyperServerConfig.HyperServerConfigBuilder config) {
         }
     }
 
-    int getPort() {
+    public int getPort() {
         return port;
     }

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperTestBase.java

+14-7
@@ -49,18 +49,18 @@
 @Slf4j
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class HyperTestBase {
-    private static HyperServerProcess instance;
+    public HyperServerProcess instance;
 
     @SneakyThrows
-    public static void assertEachRowIsTheSame(ResultSet rs, AtomicInteger prev) {
+    public final void assertEachRowIsTheSame(ResultSet rs, AtomicInteger prev) {
         val expected = prev.incrementAndGet();
         val a = rs.getBigDecimal(1).intValue();
         assertThat(expected).isEqualTo(a);
     }
 
     @SafeVarargs
     @SneakyThrows
-    public static void assertWithConnection(
+    public final void assertWithConnection(
             ThrowingConsumer<DataCloudConnection> assertion, Map.Entry<String, String>... settings) {
         try (val connection =
                 getHyperQueryConnection(settings == null ? ImmutableMap.of() : ImmutableMap.ofEntries(settings))) {
@@ -70,7 +70,7 @@ public static void assertWithConnection(
 
     @SafeVarargs
     @SneakyThrows
-    public static void assertWithStatement(
+    public final void assertWithStatement(
             ThrowingConsumer<DataCloudStatement> assertion, Map.Entry<String, String>... settings) {
         try (val connection = getHyperQueryConnection(
                 settings == null ? ImmutableMap.of() : ImmutableMap.ofEntries(settings));
@@ -79,12 +79,19 @@ public static void assertWithStatement(
         }
     }
 
-    public static DataCloudConnection getHyperQueryConnection() {
+    public DataCloudConnection getHyperQueryConnection() {
         return getHyperQueryConnection(ImmutableMap.of());
     }
 
-    public static DataCloudConnection getHyperQueryConnection(Map<String, String> connectionSettings) {
-        return instance.getConnection(connectionSettings);
+    @SneakyThrows
+    public DataCloudConnection getHyperQueryConnection(Map<String, String> connectionSettings) {
+        val properties = new Properties();
+        properties.putAll(connectionSettings);
+        log.info("Creating connection to port {}", instance.getPort());
+        ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
+                .usePlaintext();
+
+        return DataCloudConnection.fromChannel(channel, properties);
     }
 
     @SneakyThrows

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/interceptor/EmittedHeaderTest.java

+1-1
@@ -117,7 +117,7 @@ private static Map<String, String> getHeadersFor(Properties properties) {
                 .start();
         val channel = InProcessChannelBuilder.forName(name).usePlaintext();
 
-        try (val connection = DataCloudConnection.fromChannel(channel, properties);
+        try (val connection = DataCloudConnection.fromTokenSupplier(null, channel, properties);
                 val statement = connection.createStatement().unwrap(DataCloudStatement.class)) {
             statement.executeAsyncQuery("select 1");
         }

pom.xml

+1-1
@@ -19,7 +19,7 @@
     <flatten-maven-plugin.version>1.6.0</flatten-maven-plugin.version>
     <git-build-hook-maven-plugin.version>3.5.0</git-build-hook-maven-plugin.version>
     <!-- https://tableau.github.io/hyper-db/docs/releases#download -->
-    <hyperapi.version>0.0.21200.re11c8cb9</hyperapi.version>
+    <hyperapi.version>0.0.21408.rf5a406c0</hyperapi.version>
     <hyperd.directory>${project.build.directory}/hyper</hyperd.directory>
     <junit-bom.version>5.11.3</junit-bom.version>
     <maven-depndency-plugin.version>3.8.1</maven-depndency-plugin.version>
