Skip to content

Commit 3e9636a

Browse files
committed
Move tracing to sql operation
1 parent 4b97547 commit 3e9636a

File tree

2 files changed

+36
-34
lines changed

2 files changed

+36
-34
lines changed

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java

+6-17
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
120120
schema, stream, tmpTable, stage);
121121

122122
AirbyteSentry.executeWithTracing("PrepareStreamStage",
123-
() -> prepareStream(database, snowflakeSqlOperations, schema, tmpTable, stage),
123+
() -> {
124+
snowflakeSqlOperations.createSchemaIfNotExists(database, schema);
125+
snowflakeSqlOperations.createTableIfNotExists(database, schema, tmpTable);
126+
snowflakeSqlOperations.createStageIfNotExists(database, stage);
127+
},
124128
Map.of("schema", schema, "stream", stream, "tmpTable", tmpTable, "stage", stage));
125129

126130
LOGGER.info("Preparing stage in destination completed for schema {} stream {}", schema, stream);
@@ -130,19 +134,6 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
130134
};
131135
}
132136

133-
private static void prepareStream(final JdbcDatabase database,
134-
final SnowflakeStagingSqlOperations snowflakeSqlOperations,
135-
final String schema,
136-
final String tmpTable,
137-
final String stage) throws Exception {
138-
AirbyteSentry.executeWithTracing("CreateSchemaIfNotExists",
139-
() -> snowflakeSqlOperations.createSchemaIfNotExists(database, schema));
140-
AirbyteSentry.executeWithTracing("CreateTmpTableIfNotExists",
141-
() -> snowflakeSqlOperations.createTableIfNotExists(database, schema, tmpTable));
142-
AirbyteSentry.executeWithTracing("CreateStageIfNotExists",
143-
() -> snowflakeSqlOperations.createStageIfNotExists(database, stage));
144-
}
145-
146137
private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
147138
return new AirbyteStreamNameNamespacePair(config.getStreamName(), config.getNamespace());
148139
}
@@ -191,9 +182,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
191182
streamName, schemaName, srcTableName, dstTableName, path);
192183

193184
try {
194-
AirbyteSentry.executeWithTracing("CopyIntoTmpTableFromStage",
195-
() -> sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName),
196-
Map.of("schema", schemaName, "stream", streamName, "tmpTable", srcTableName, "finalTable", dstTableName));
185+
sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
197186
} catch (final Exception e) {
198187
sqlOperations.cleanUpStage(database, path);
199188
LOGGER.info("Cleaning stage path {}", path);

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java

+30-17
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66

77
import io.airbyte.db.jdbc.JdbcDatabase;
88
import io.airbyte.integrations.base.JavaBaseConstants;
9+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
910
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
1011
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1112
import io.airbyte.protocol.models.AirbyteRecordMessage;
1213
import java.io.File;
1314
import java.nio.file.Files;
1415
import java.sql.SQLException;
1516
import java.util.List;
17+
import java.util.Map;
1618
import java.util.UUID;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
@@ -22,48 +24,57 @@ public class SnowflakeStagingSqlOperations extends JdbcSqlOperations implements
2224
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
2325

2426
@Override
25-
protected void insertRecordsInternal(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String stage) throws Exception {
27+
protected void insertRecordsInternal(final JdbcDatabase database,
28+
final List<AirbyteRecordMessage> records,
29+
final String schemaName,
30+
final String stage) {
2631
LOGGER.info("actual size of batch for staging: {}", records.size());
2732

2833
if (records.isEmpty()) {
2934
return;
3035
}
3136
try {
3237
loadDataIntoStage(database, stage, records);
33-
} catch (Exception e) {
38+
} catch (final Exception e) {
3439
LOGGER.error("Failed to upload records into stage {}", stage, e);
3540
throw new RuntimeException(e);
3641
}
3742
}
3843

39-
private void loadDataIntoStage(JdbcDatabase database, String stage, List<AirbyteRecordMessage> partition) throws Exception {
44+
private void loadDataIntoStage(final JdbcDatabase database, final String stage, final List<AirbyteRecordMessage> partition) throws Exception {
4045
final File tempFile = Files.createTempFile(UUID.randomUUID().toString(), ".csv").toFile();
4146
writeBatchToFile(tempFile, partition);
4247
database.execute(String.format("PUT file://%s @%s PARALLEL = %d", tempFile.getAbsolutePath(), stage, Runtime.getRuntime().availableProcessors()));
4348
Files.delete(tempFile.toPath());
4449
}
4550

4651
public void createStageIfNotExists(final JdbcDatabase database, final String stageName) throws SQLException {
47-
database.execute(String.format("CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE')" +
48-
" copy_options = (on_error='skip_file');", stageName));
52+
final String query = "CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');";
53+
AirbyteSentry.executeWithTracing("CreateStageIfNotExists",
54+
() -> database.execute(String.format(query, stageName)),
55+
Map.of("stage", stageName));
4956
}
5057

51-
public void copyIntoTmpTableFromStage(JdbcDatabase database, String stageName, String dstTableName, String schemaName) throws SQLException {
52-
database.execute(String.format("COPY INTO %s.%s FROM @%s file_format = " +
53-
"(type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')",
54-
schemaName,
55-
dstTableName,
56-
stageName));
57-
58+
public void copyIntoTmpTableFromStage(final JdbcDatabase database, final String stageName, final String dstTableName, final String schemaName)
59+
throws SQLException {
60+
final String query = "COPY INTO %s.%s FROM @%s file_format = " +
61+
"(type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')";
62+
AirbyteSentry.executeWithTracing("CopyIntoTableFromStage",
63+
() -> database.execute(String.format(query, schemaName, dstTableName, stageName)),
64+
Map.of("schema", schemaName, "stage", stageName, "table", dstTableName));
5865
}
5966

6067
public void dropStageIfExists(final JdbcDatabase database, final String stageName) throws SQLException {
61-
database.execute(String.format("DROP STAGE IF EXISTS %s;", stageName));
68+
AirbyteSentry.executeWithTracing("DropStageIfExists",
69+
() -> database.execute(String.format("DROP STAGE IF EXISTS %s;", stageName)),
70+
Map.of("stage", stageName));
6271
}
6372

6473
@Override
6574
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
66-
database.execute(createTableQuery(database, schemaName, tableName));
75+
AirbyteSentry.executeWithTracing("CreateTableIfNotExists",
76+
() -> database.execute(createTableQuery(database, schemaName, tableName)),
77+
Map.of("schema", schemaName, "table", tableName));
6778
}
6879

6980
@Override
@@ -77,12 +88,14 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
7788
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
7889
}
7990

80-
public void cleanUpStage(JdbcDatabase database, String path) throws SQLException {
81-
database.execute(String.format("REMOVE @%s;", path));
91+
public void cleanUpStage(final JdbcDatabase database, final String path) throws SQLException {
92+
AirbyteSentry.executeWithTracing("CleanStage",
93+
() -> database.execute(String.format("REMOVE @%s;", path)),
94+
Map.of("path", path));
8295
}
8396

8497
@Override
85-
public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
98+
public boolean isSchemaExists(final JdbcDatabase database, final String outputSchema) throws Exception {
8699
return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
87100
}
88101

0 commit comments

Comments
 (0)