10
10
import io .airbyte .integrations .base .IntegrationRunner ;
11
11
import io .airbyte .integrations .base .Source ;
12
12
import io .airbyte .integrations .source .jdbc .AbstractJdbcSource ;
13
+ import java .io .IOException ;
14
+ import java .io .PrintWriter ;
15
+ import java .util .ArrayList ;
16
+ import java .util .List ;
13
17
import java .util .Set ;
18
+ import java .util .concurrent .TimeUnit ;
19
+ import org .apache .commons .lang3 .RandomStringUtils ;
14
20
import org .slf4j .Logger ;
15
21
import org .slf4j .LoggerFactory ;
16
22
@@ -19,6 +25,9 @@ public class Db2Source extends AbstractJdbcSource implements Source {
19
25
private static final Logger LOGGER = LoggerFactory .getLogger (Db2Source .class );
20
26
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver" ;
21
27
28
+ private static final String KEY_STORE_PASS = RandomStringUtils .randomAlphanumeric (8 );
29
+ private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks" ;
30
+
22
31
public Db2Source () {
23
32
super (DRIVER_CLASS , new Db2JdbcStreamingQueryConfiguration ());
24
33
}
@@ -32,14 +41,31 @@ public static void main(final String[] args) throws Exception {
32
41
33
42
@ Override
34
43
public JsonNode toDatabaseConfig (final JsonNode config ) {
35
- return Jsons .jsonNode (ImmutableMap .builder ()
36
- .put ("jdbc_url" , String .format ("jdbc:db2://%s:%s/%s" ,
37
- config .get ("host" ).asText (),
38
- config .get ("port" ).asText (),
39
- config .get ("db" ).asText ()))
44
+ final StringBuilder jdbcUrl = new StringBuilder (String .format ("jdbc:db2://%s:%s/%s" ,
45
+ config .get ("host" ).asText (),
46
+ config .get ("port" ).asText (),
47
+ config .get ("db" ).asText ()));
48
+
49
+ var result = Jsons .jsonNode (ImmutableMap .builder ()
50
+ .put ("jdbc_url" , jdbcUrl .toString ())
40
51
.put ("username" , config .get ("username" ).asText ())
41
52
.put ("password" , config .get ("password" ).asText ())
42
53
.build ());
54
+
55
+ // assume ssl if not explicitly mentioned.
56
+ var additionalParams = obtainConnectionOptions (config .get ("encryption" ));
57
+ if (!additionalParams .isEmpty ()) {
58
+ jdbcUrl .append (":" ).append (String .join (";" , additionalParams ));
59
+ jdbcUrl .append (";" );
60
+ result = Jsons .jsonNode (ImmutableMap .builder ()
61
+ .put ("jdbc_url" , jdbcUrl .toString ())
62
+ .put ("username" , config .get ("username" ).asText ())
63
+ .put ("password" , config .get ("password" ).asText ())
64
+ .put ("connection_properties" , additionalParams )
65
+ .build ());
66
+ }
67
+
68
+ return result ;
43
69
}
44
70
45
71
@ Override
@@ -49,4 +75,53 @@ public Set<String> getExcludedInternalNameSpaces() {
49
75
"SYSPROC" , "SYSPUBLIC" , "SYSSTAT" , "SYSTOOLS" );
50
76
}
51
77
78
+ /* Helpers */
79
+
80
+ private List <String > obtainConnectionOptions (JsonNode encryption ) {
81
+ List <String > additionalParameters = new ArrayList <>();
82
+ if (!encryption .isNull ()) {
83
+ String encryptionMethod = encryption .get ("encryption_method" ).asText ();
84
+ if ("encrypted_verify_certificate" .equals (encryptionMethod )) {
85
+ var keyStorePassword = getKeyStorePassword (encryption .get ("key_store_password" ));
86
+ try {
87
+ convertAndImportCertificate (encryption .get ("ssl_certificate" ).asText (), keyStorePassword );
88
+ } catch (IOException | InterruptedException e ) {
89
+ throw new RuntimeException ("Failed to import certificate into Java Keystore" );
90
+ }
91
+ additionalParameters .add ("sslConnection=true" );
92
+ additionalParameters .add ("sslTrustStoreLocation=" + KEY_STORE_FILE_PATH );
93
+ additionalParameters .add ("sslTrustStorePassword=" + keyStorePassword );
94
+ }
95
+ }
96
+ return additionalParameters ;
97
+ }
98
+
99
+ private static String getKeyStorePassword (JsonNode encryptionKeyStorePassword ) {
100
+ var keyStorePassword = KEY_STORE_PASS ;
101
+ if (!encryptionKeyStorePassword .isNull () || !encryptionKeyStorePassword .isEmpty ()) {
102
+ keyStorePassword = encryptionKeyStorePassword .asText ();
103
+ }
104
+ return keyStorePassword ;
105
+ }
106
+
107
+ private static void convertAndImportCertificate (String certificate , String keyStorePassword )
108
+ throws IOException , InterruptedException {
109
+ Runtime run = Runtime .getRuntime ();
110
+ try (PrintWriter out = new PrintWriter ("certificate.pem" )) {
111
+ out .print (certificate );
112
+ }
113
+ runProcess ("openssl x509 -outform der -in certificate.pem -out certificate.der" , run );
114
+ runProcess (
115
+ "keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH + " -file certificate.der -storepass " + keyStorePassword + " -noprompt" ,
116
+ run );
117
+ }
118
+
119
+ private static void runProcess (String cmd , Runtime run ) throws IOException , InterruptedException {
120
+ Process pr = run .exec (cmd );
121
+ if (!pr .waitFor (30 , TimeUnit .SECONDS )) {
122
+ pr .destroy ();
123
+ throw new RuntimeException ("Timeout while executing: " + cmd );
124
+ }
125
+ }
126
+
52
127
}
0 commit comments