Skip to content

Commit

Permalink
Add per-connection driver properties (#99)
Browse files Browse the repository at this point in the history
Co-authored-by: Joseph Grogan <jogrogan@linkedin.com>
  • Loading branch information
ryannedolan and jogrogan authored Feb 7, 2025
1 parent 2e84b5a commit bada97a
Show file tree
Hide file tree
Showing 34 changed files with 389 additions and 188 deletions.
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ junit = "junit:junit:4.12"
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
kubernetes-client = "io.kubernetes:client-java:18.0.0"
kubernetes-extended-client = "io.kubernetes:client-java-extended:18.0.0"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
slf4j-api = "org.slf4j:slf4j-api:1.7.30"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.36"
slf4j-api = "org.slf4j:slf4j-api:1.7.36"
sqlline = "sqlline:sqlline:1.12.0"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.376"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.Properties;


/** Registers a set of tables, possibly within schemas and sub-schemas. */
Expand All @@ -11,5 +12,5 @@ public interface Catalog {

String description();

void register(Wrapper parentSchema) throws SQLException;
void register(Wrapper parentSchema, Properties connectionProperties) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

public interface ConfigProvider {

Properties loadConfig(String namespace) throws Exception;
Properties loadConfig(Properties connectionProperties) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.linkedin.hoptimator;

import java.util.Collection;
import java.util.Properties;


public interface ConnectorProvider {

<T> Collection<Connector<T>> connectors(Class<T> clazz);
<T> Collection<Connector<T>> connectors(Class<T> clazz, Properties connectionProperties);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.linkedin.hoptimator;

import java.util.Collection;
import java.util.Properties;


public interface DeployerProvider {

<T> Collection<Deployer<T>> deployers(Class<T> clazz);
<T> Collection<Deployer<T>> deployers(Class<T> clazz, Properties connectionProperties);
}
19 changes: 10 additions & 9 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import java.util.List;
import java.util.Scanner;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelRoot;
import org.jline.reader.Completer;

import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
import com.linkedin.hoptimator.jdbc.HoptimatorDriver;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.planner.PipelineRel;
Expand Down Expand Up @@ -73,7 +73,7 @@ public String matches(String line) {

@Override
public void execute(String line, DispatchCallback dispatchCallback) {
if (!(sqlline.getConnection() instanceof CalciteConnection)) {
if (!(sqlline.getConnection() instanceof HoptimatorConnection)) {
sqlline.error("This connection doesn't support `!pipeline`.");
dispatchCallback.setToFailure();
return;
Expand All @@ -85,10 +85,10 @@ public void execute(String line, DispatchCallback dispatchCallback) {
return;
}
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
try {
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root);
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.connectionProperties());
sqlline.output(plan.sql().apply(SqlDialect.ANSI));
} catch (SQLException e) {
sqlline.error(e);
Expand Down Expand Up @@ -142,7 +142,7 @@ public String matches(String line) {

@Override
public void execute(String line, DispatchCallback dispatchCallback) {
if (!(sqlline.getConnection() instanceof CalciteConnection)) {
if (!(sqlline.getConnection() instanceof HoptimatorConnection)) {
sqlline.error("This connection doesn't support `!specify`.");
dispatchCallback.setToFailure();
return;
Expand All @@ -154,10 +154,11 @@ public void execute(String line, DispatchCallback dispatchCallback) {
return;
}
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
try {
List<String> specs = DeploymentService.plan(root).pipeline("sink").specify();
List<String> specs = DeploymentService.plan(root, conn.connectionProperties())
.pipeline("sink").specify();
specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
} catch (SQLException e) {
sqlline.error(e);
Expand Down
46 changes: 46 additions & 0 deletions hoptimator-demodb/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,55 @@
plugins {
id 'java'
id 'maven-publish'
}

dependencies {
implementation project(':hoptimator-api')
implementation project(':hoptimator-util')
implementation libs.calcite.core
}

publishing {
repositories {
maven {
name 'GitHubPackages'
url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
credentials {
username = System.getenv('GITHUB_ACTOR')
password = System.getenv('GITHUB_TOKEN')
}
}
maven {
name 'LinkedInJFrog'
url 'https://linkedin.jfrog.io/artifactory/hoptimator'
credentials {
username = System.getenv('JFROG_USERNAME')
password = System.getenv('JFROG_API_KEY')
}
}
}
publications {
maven(MavenPublication) {
groupId = 'com.linkedin.hoptimator'
artifactId = 'hoptimator-demodb'
version = System.getenv('VERSION')
from components.java
pom {
name = 'hoptimator-api'
description = 'In-memory database driver for testing'
url = 'https://github.com/linkedin/Hoptimator'
licenses {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
url = 'https://github.com/linkedin/Hoptimator'
}
}
}
}
}
12 changes: 11 additions & 1 deletion hoptimator-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ java {
targetCompatibility = JavaVersion.VERSION_11
}

// Work-around for gradle test fixtures bug, which spontaneously started publishing
// junit-bom to maven, for some reason.
components.java.withVariantsFromConfiguration(configurations.testFixturesApiElements) {
skip()
}

components.java.withVariantsFromConfiguration(configurations.testFixturesRuntimeElements) {
skip()
}

publishing {
repositories {
maven {
Expand Down Expand Up @@ -97,4 +107,4 @@ publishing {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import com.linkedin.hoptimator.Catalog;
import com.linkedin.hoptimator.CatalogProvider;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.linkedin.hoptimator.jdbc;

import java.sql.Connection;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;

import com.linkedin.hoptimator.util.DelegatingConnection;


public class HoptimatorConnection extends DelegatingConnection {

private final CalciteConnection connection;
private final Properties connectionProperties;

public HoptimatorConnection(CalciteConnection connection, Properties connectionProperties) {
super(connection);
this.connection = connection;
this.connectionProperties = connectionProperties;
}

@Override
public Statement createStatement() throws SQLException {
return connection.createStatement();
}

public Properties connectionProperties() {
return connectionProperties;
}

public CalcitePrepare.Context createPrepareContext() {
return connection.createPrepareContext();
}

public CalciteConnection calciteConnection() {
return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
Expand Down Expand Up @@ -68,7 +69,12 @@


public final class HoptimatorDdlExecutor extends ServerDdlExecutor {
public static final HoptimatorDdlExecutor INSTANCE = new HoptimatorDdlExecutor();

private final Properties connectionProperties;

public HoptimatorDdlExecutor(Properties connectionProperties) {
this.connectionProperties = connectionProperties;
}

@SuppressWarnings("unused") // used via reflection
public static final SqlParserImplFactory PARSER_FACTORY = new SqlParserImplFactory() {
Expand Down Expand Up @@ -112,9 +118,9 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
try {
ValidationService.validateOrThrow(viewTable, TranslatableTable.class);
if (create.getReplace()) {
DeploymentService.update(viewTable, ViewTable.class);
DeploymentService.update(viewTable, ViewTable.class, connectionProperties);
} else {
DeploymentService.create(viewTable, ViewTable.class);
DeploymentService.create(viewTable, ViewTable.class, connectionProperties);
}
schemaPlus.add(viewName, viewTable);
} catch (Exception e) {
Expand Down Expand Up @@ -185,8 +191,9 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
}

// Plan a pipeline to materialize the view.
RelRoot root = HoptimatorDriver.convert(context, sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root);
RelRoot root = new HoptimatorDriver.Prepare(connectionProperties)
.convert(context, sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root, connectionProperties);
plan.setSink(database, sinkPath, rowType, Collections.emptyMap());
Pipeline pipeline = plan.pipeline(viewName);

Expand All @@ -195,9 +202,9 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
ValidationService.validateOrThrow(hook, MaterializedView.class);
pipeline.update();
if (create.getReplace()) {
DeploymentService.update(hook, MaterializedView.class);
DeploymentService.update(hook, MaterializedView.class, connectionProperties);
} else {
DeploymentService.create(hook, MaterializedView.class);
DeploymentService.create(hook, MaterializedView.class, connectionProperties);
}
schemaPlus.add(viewName, materializedViewTable);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.function.Supplier;

import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.jdbc.CalciteConnection;
Expand All @@ -23,19 +24,20 @@
public class HoptimatorDriver extends Driver {

public HoptimatorDriver() {
super(Prepare::new);
super();
}

static {
new HoptimatorDriver().register();
private HoptimatorDriver(Supplier<CalcitePrepare> prepareFactory) {
super(prepareFactory);
}

public static CalcitePrepare.ConvertResult convert(CalcitePrepare.Context context, String sql) {
return new Prepare().convert(context, sql);
static {
new HoptimatorDriver().register();
}

public static CalcitePrepare.ConvertResult convert(CalciteConnection connection, String sql) {
return convert(connection.createPrepareContext(), sql);
public static CalcitePrepare.ConvertResult convert(HoptimatorConnection conn, String sql) {
CalcitePrepare.Context context = conn.createPrepareContext();
return new Prepare(conn.connectionProperties()).convert(context, sql);
}

@Override
Expand All @@ -54,6 +56,11 @@ public Connection connect(String url, Properties props) throws SQLException {
return null;
}
try {
if (prepareFactory == null) {
// funky way of extending Driver with a custom Prepare:
return withPrepareFactory(() -> new Prepare(props))
.connect(url, props);
}
Connection connection = super.connect(url, props);
if (connection == null) {
throw new IOException("Could not connect to " + url);
Expand All @@ -74,31 +81,42 @@ public Connection connect(String url, Properties props) throws SQLException {
if (catalogs.length == 0 || catalogs[0].length() == 0) {
// load all catalogs (typical usage)
for (Catalog catalog : CatalogService.catalogs()) {
catalog.register(wrappedRootSchema);
catalog.register(wrappedRootSchema, props);
}
} else {
// load specific catalogs when loaded as `jdbc:hoptimator://foo,bar`
for (String catalog : catalogs) {
CatalogService.catalog(catalog).register(wrappedRootSchema);
CatalogService.catalog(catalog).register(wrappedRootSchema, props);
}
}

return connection;
return new HoptimatorConnection(calciteConnection, props);
} catch (Exception e) {
throw new SQLException("Problem loading " + url, e);
}
}

@Override
public Driver withPrepareFactory(Supplier<CalcitePrepare> prepareFactory) {
return new HoptimatorDriver(prepareFactory);
}

public static class Prepare extends CalcitePrepareImpl {

private final Properties connectionProperties;

Prepare(Properties connectionProperties) {
this.connectionProperties = connectionProperties;
}

@Override
protected SqlParser.Config parserConfig() {
return SqlParser.config().withParserFactory(HoptimatorDdlExecutor.PARSER_FACTORY);
}

@Override
public void executeDdl(Context context, SqlNode node) {
new HoptimatorDdlExecutor().executeDdl(context, node);
new HoptimatorDdlExecutor(connectionProperties).executeDdl(context, node);
}
}
}
Loading

0 comments on commit bada97a

Please sign in to comment.