Skip to content

Commit c2b7dd1

Browse files
add support for chunk based access from DataCloudConnection (forcedotcom#25)
1 parent 49841ee commit c2b7dd1

File tree

3 files changed

+194
-0
lines changed

3 files changed

+194
-0
lines changed

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

+13
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.salesforce.datacloud.jdbc.auth.AuthenticationSettings;
2323
import com.salesforce.datacloud.jdbc.auth.DataCloudTokenProcessor;
2424
import com.salesforce.datacloud.jdbc.auth.TokenProcessor;
25+
import com.salesforce.datacloud.jdbc.core.partial.ChunkBased;
2526
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
2627
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2728
import com.salesforce.datacloud.jdbc.http.ClientBuilder;
@@ -195,6 +196,18 @@ public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long
195196
return StreamingResultSet.of(queryId, executor, iterator);
196197
}
197198

199+
@Unstable
200+
public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, long limit) {
201+
log.info("Get chunk-based result set. queryId={}, chunkId={}, limit={}", queryId, chunkId, limit);
202+
val iterator = ChunkBased.of(executor, queryId, chunkId, limit);
203+
return StreamingResultSet.of(queryId, executor, iterator);
204+
}
205+
206+
@Unstable
207+
public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId) {
208+
return getChunkBasedResultSet(queryId, chunkId, 1);
209+
}
210+
198211
/**
199212
* Use this to determine when a given query is complete by filtering the responses and a subsequent findFirst()
200213
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core.partial;
17+
18+
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
19+
import com.salesforce.datacloud.jdbc.util.Unstable;
20+
import java.util.Iterator;
21+
import java.util.NoSuchElementException;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
import lombok.AccessLevel;
24+
import lombok.NonNull;
25+
import lombok.RequiredArgsConstructor;
26+
import salesforce.cdp.hyperdb.v1.QueryResult;
27+
28+
@Unstable
29+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
30+
public class ChunkBased implements Iterator<QueryResult> {
31+
public static ChunkBased of(
32+
@NonNull HyperGrpcClientExecutor client, @NonNull String queryId, long chunkId, long limit) {
33+
return new ChunkBased(client, queryId, new AtomicLong(chunkId), chunkId + limit);
34+
}
35+
36+
@NonNull private final HyperGrpcClientExecutor client;
37+
38+
@NonNull private final String queryId;
39+
40+
private final AtomicLong chunkId;
41+
42+
private final long limitId;
43+
44+
private Iterator<QueryResult> iterator;
45+
46+
@Override
47+
public boolean hasNext() {
48+
if (iterator == null) {
49+
iterator = client.getQueryResult(queryId, chunkId.getAndIncrement(), false);
50+
}
51+
52+
if (iterator.hasNext()) {
53+
return true;
54+
}
55+
56+
if (chunkId.get() < limitId) {
57+
iterator = client.getQueryResult(queryId, chunkId.getAndIncrement(), true);
58+
}
59+
60+
return iterator.hasNext();
61+
}
62+
63+
@Override
64+
public QueryResult next() {
65+
if (!hasNext()) {
66+
throw new NoSuchElementException();
67+
}
68+
69+
return iterator.next();
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core.partial;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
19+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
20+
21+
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
22+
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;
23+
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
24+
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
25+
import io.grpc.StatusRuntimeException;
26+
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.stream.Collectors;
30+
import lombok.SneakyThrows;
31+
import lombok.extern.slf4j.Slf4j;
32+
import lombok.val;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.Test;
35+
36+
@Slf4j
37+
class ChunkBasedTest extends HyperTestBase {
38+
private List<Integer> sut(String queryId, long chunkId, long limit) {
39+
try (val connection = getHyperQueryConnection()) {
40+
val rs = limit == 1
41+
? connection.getChunkBasedResultSet(queryId, chunkId)
42+
: connection.getChunkBasedResultSet(queryId, chunkId, limit);
43+
return RowBasedTest.toStream(rs).collect(Collectors.toList());
44+
}
45+
}
46+
47+
private static final int smallSize = 5;
48+
private static final int largeSize = 1024 * 1024 * 10;
49+
private String small;
50+
private String large;
51+
52+
@BeforeAll
53+
void setupQueries() {
54+
small = getQueryId(smallSize);
55+
large = getQueryId(largeSize);
56+
}
57+
58+
@SneakyThrows
59+
@Test
60+
void canGetSimpleChunk() {
61+
val actual = sut(small, 0, 1);
62+
assertThat(actual).containsExactly(1, 2, 3, 4, 5);
63+
}
64+
65+
@SneakyThrows
66+
@Test
67+
void failsOnChunkOverrun() {
68+
assertThatThrownBy(() -> sut(small, 0, 2))
69+
.isInstanceOf(DataCloudJDBCException.class)
70+
.hasMessage("Failed to load next batch")
71+
.hasCauseInstanceOf(StatusRuntimeException.class)
72+
.hasRootCauseMessage("OUT_OF_RANGE: The requested chunk id '1' is out of range");
73+
}
74+
75+
@SneakyThrows
76+
@Test
77+
void consecutiveChunksIncludeAllData() {
78+
val status = new AtomicReference<DataCloudQueryStatus>();
79+
val last = new AtomicLong(0);
80+
try (val connection = getHyperQueryConnection()) {
81+
while (connection
82+
.getQueryStatus(large)
83+
.peek(status::set)
84+
.noneMatch(t -> t.isExecutionFinished() || t.isResultProduced())) {
85+
log.info("waiting for query to finish. queryId={}", large);
86+
}
87+
88+
val rs = connection.getChunkBasedResultSet(large, 0, status.get().getChunkCount());
89+
90+
while (rs.next()) {
91+
assertThat(rs.getLong(1)).isEqualTo(last.incrementAndGet());
92+
}
93+
}
94+
95+
assertThat(last.get()).isEqualTo(largeSize);
96+
}
97+
98+
@SneakyThrows
99+
private String getQueryId(int max) {
100+
val query = String.format(
101+
"select a, cast(a as numeric(38,18)) b, cast(a as numeric(38,18)) c, cast(a as numeric(38,18)) d from generate_series(1, %d) as s(a) order by a asc",
102+
max);
103+
104+
try (val client = getHyperQueryConnection();
105+
val statement = client.createStatement().unwrap(DataCloudStatement.class)) {
106+
statement.executeAsyncQuery(query);
107+
return statement.getQueryId();
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)