Skip to content

Commit 247a6f5

Browse files
committed
first step to support stored procedures
Support for stored procedures added with tests for MySQL. The tests are taken from http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html The out parameters can not be renamed to Elasticsearch field names right now, this will be done as a followup.
1 parent e606421 commit 247a6f5

File tree

16 files changed

+221
-58
lines changed

16 files changed

+221
-58
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<groupId>org.xbib.elasticsearch.plugin</groupId>
99
<artifactId>elasticsearch-river-jdbc</artifactId>
10-
<version>1.3.0.3</version>
10+
<version>1.3.0.4</version>
1111

1212
<packaging>jar</packaging>
1313

src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import java.io.InputStreamReader;
99
import java.io.Reader;
1010
import java.util.Arrays;
11-
import java.util.HashMap;
1211
import java.util.LinkedList;
1312
import java.util.List;
1413
import java.util.Map;
1514
import java.util.regex.Pattern;
1615

16+
import static org.elasticsearch.common.collect.Lists.newLinkedList;
17+
import static org.elasticsearch.common.collect.Maps.newHashMap;
18+
1719
/**
1820
* The SQL command
1921
*/
@@ -23,9 +25,9 @@ public class SQLCommand {
2325

2426
private static final Pattern STATEMENT_PATTERN = Pattern.compile("^\\s*(update|insert)", Pattern.CASE_INSENSITIVE);
2527

26-
private List<Object> params = new LinkedList<Object>();
28+
private List<Object> params = newLinkedList();
2729

28-
private Map<String, Object> results = new HashMap<String, Object>();
30+
private Map<String, Object> register = newHashMap();
2931

3032
private boolean callable;
3133

@@ -80,12 +82,20 @@ public boolean isQuery() {
8082
return p3 < 0 || p1 < p2 && p1 < p3;
8183
}
8284

83-
public void setResults(Map<String, Object> results) {
84-
this.results = results;
85+
/**
86+
* A register is for parameters of a callable statement.
87+
* @param register a map for registering parameters
88+
*/
89+
public void setRegister(Map<String, Object> register) {
90+
this.register = register;
8591
}
8692

87-
public Map<String, Object> getResults() {
88-
return results;
93+
/**
94+
* Get the parameters of a callable statement
95+
* @return the register map
96+
*/
97+
public Map<String, Object> getRegister() {
98+
return register;
8999
}
90100

91101
@SuppressWarnings({"unchecked"})
@@ -110,7 +120,7 @@ public static List<SQLCommand> parse(Map<String, Object> settings) {
110120
command.setCallable(XContentMapValues.nodeBooleanValue(m.get("callable")));
111121
}
112122
if (m.containsKey("register")) {
113-
command.setResults(XContentMapValues.nodeMapValue(m.get("register"), null));
123+
command.setRegister(XContentMapValues.nodeMapValue(m.get("register"), null));
114124
}
115125
} else if (entry instanceof String) {
116126
command.setSQL((String) entry);

src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java

+30-19
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import java.util.Map;
4444
import java.util.TimeZone;
4545

46+
import static org.elasticsearch.common.collect.Lists.newLinkedList;
47+
4648
/**
4749
* Simple river source.
4850
* <p/>
@@ -383,21 +385,22 @@ private void executeCallable(SQLCommand command) throws Exception {
383385
if (!command.getParameters().isEmpty()) {
384386
bind(statement, command.getParameters());
385387
}
386-
if (!command.getResults().isEmpty()) {
387-
register(statement, command.getResults());
388+
if (!command.getRegister().isEmpty()) {
389+
register(statement, command.getRegister());
388390
}
389391
boolean hasRows = statement.execute();
390392
RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>()
391393
.output(context.getRiverMouth());
392-
if (!hasRows) {
393-
// merge from registered params
394-
merge(statement, command, listener);
395-
} else {
394+
if (hasRows) {
395+
logger.debug("callable execution created result set");
396396
while (hasRows) {
397-
// merge result set
397+
// merge result set, but use register
398398
merge(statement.getResultSet(), listener);
399399
hasRows = statement.getMoreResults();
400400
}
401+
} else {
402+
// no result set, merge from registered params only
403+
merge(statement, command, listener);
401404
}
402405
} finally {
403406
close(statement);
@@ -442,17 +445,22 @@ public void merge(ResultSet results, KeyValueStreamListener listener)
442445
@SuppressWarnings({"unchecked"})
443446
public void merge(CallableStatement statement, SQLCommand command, KeyValueStreamListener listener)
444447
throws SQLException, IOException {
445-
Map<String, Object> map = command.getResults();
448+
Map<String, Object> map = command.getRegister();
446449
if (map.isEmpty()) {
450+
// no register given, return without doing anything
447451
return;
448452
}
449-
List<String> keys = new LinkedList<String>();
450-
List<Object> values = new LinkedList<Object>();
453+
List<String> keys = newLinkedList();
454+
List<Object> values = newLinkedList();
451455
for (Map.Entry<String, Object> entry : map.entrySet()) {
452-
keys.add(entry.getKey());
453-
Map<String, Object> m = (Map<String, Object>) entry.getValue();
454-
values.add(statement.getObject((Integer) m.get("pos")));
456+
String k = entry.getKey();
457+
Map<String, Object> v = (Map<String, Object>) entry.getValue();
458+
Integer pos = (Integer) v.get("pos"); // the parameter position of the value
459+
String field = (String) v.get("field"); // the field for indexing the value (if not key name)
460+
keys.add(field != null ? field : k);
461+
values.add(statement.getObject(pos));
455462
}
463+
logger.trace("merge callable statement result: keys={} values={}", keys, values);
456464
listener.keys(keys);
457465
listener.values(values);
458466
listener.end();
@@ -532,11 +540,18 @@ public SimpleRiverSource register(CallableStatement statement, Map<String, Objec
532540
return this;
533541
}
534542
for (Map.Entry<String, Object> me : values.entrySet()) {
535-
// { "fieldname" : { "pos": n, "type" : "VARCHAR" }, ... }
543+
// { "key" : { "pos": n, "type" : "VARCHAR", "field" : "fieldname" }, ... }
536544
Map<String, Object> m = (Map<String, Object>) me.getValue();
537545
Integer n = (Integer) m.get("pos");
538546
String type = (String) m.get("type");
539-
register(statement, n, type);
547+
if (n != null && type != null) {
548+
logger.info("n={} type={}", n, toJDBCType(type));
549+
try {
550+
statement.registerOutParameter(n, toJDBCType(type));
551+
} catch (Throwable t) {
552+
logger.warn("can't register out parameter " + n + " of type " + type);
553+
}
554+
}
540555
}
541556
return this;
542557
}
@@ -938,10 +953,6 @@ private void bind(PreparedStatement statement, int i, Object value) throws SQLEx
938953
}
939954
}
940955

941-
private void register(CallableStatement statement, Integer pos, String type) throws SQLException {
942-
statement.registerOutParameter(pos, toJDBCType(type));
943-
}
944-
945956
/**
946957
* Parse of value of result set
947958
*
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package org.xbib.elasticsearch.river.jdbc.strategy.simple;
22

3+
import org.elasticsearch.action.search.SearchResponse;
4+
import org.elasticsearch.index.query.QueryBuilders;
35
import org.testng.annotations.Parameters;
46
import org.testng.annotations.Test;
57
import org.xbib.elasticsearch.plugin.jdbc.RiverContext;
68
import org.xbib.elasticsearch.river.jdbc.RiverSource;
79
import org.xbib.elasticsearch.support.helper.AbstractRiverNodeTest;
810

9-
import java.sql.Connection;
10-
import java.sql.Statement;
11-
1211
public class RiverStoredProcedureTests extends AbstractRiverNodeTest {
1312

1413
@Override
@@ -22,18 +21,24 @@ public RiverContext getRiverContext() {
2221
}
2322

2423
@Test
25-
@Parameters({"river5", "sql1", "sql2"})
26-
public void testSimpleStoredProcedure(String riverResource, String sql, String storedProcSQL)
24+
@Parameters({"river8"})
25+
public void testSimpleStoredProcedure(String riverResource)
26+
throws Exception {
27+
createRiver(riverResource);
28+
waitForInactiveRiver();
29+
assertHits("1", 5);
30+
}
31+
32+
@Test
33+
@Parameters({"river9"})
34+
public void testRegisterStoredProcedure(String riverResource)
2735
throws Exception {
28-
createRandomProducts(sql, 100);
29-
// create stored procedure
30-
Connection connection = source.getConnectionForWriting();
31-
Statement statement = connection.createStatement();
32-
statement.execute(storedProcSQL);
33-
statement.close();
34-
source.closeWriting();
3536
createRiver(riverResource);
3637
waitForInactiveRiver();
38+
assertHits("1", 1);
39+
SearchResponse response = client("1").prepareSearch("my_jdbc_river_index")
40+
.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
41+
assertEquals("{supplierName=Acme, Inc.}", response.getHits().getHits()[0].getSource().toString());
3742
}
3843

3944
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.xbib.elasticsearch.river.jdbc.strategy.simple.storedprocedure;
2+
3+
import java.sql.Connection;
4+
import java.sql.DriverManager;
5+
import java.sql.PreparedStatement;
6+
import java.sql.ResultSet;
7+
import java.sql.SQLException;
8+
import java.sql.Statement;
9+
10+
public class StoredProcedureJavaDBSample {
11+
12+
public static void showSuppliers(ResultSet[] rs)
13+
throws SQLException {
14+
15+
Connection con = DriverManager.getConnection("jdbc:default:connection");
16+
Statement stmt = null;
17+
18+
String query =
19+
"select SUPPLIERS.SUP_NAME, " +
20+
"COFFEES.COF_NAME " +
21+
"from SUPPLIERS, COFFEES " +
22+
"where SUPPLIERS.SUP_ID = " +
23+
"COFFEES.SUP_ID " +
24+
"order by SUP_NAME";
25+
26+
stmt = con.createStatement();
27+
rs[0] = stmt.executeQuery(query);
28+
}
29+
30+
public static void getSupplierOfCoffee(String coffeeName, String[] supplierName)
31+
throws SQLException {
32+
33+
Connection con = DriverManager.getConnection("jdbc:default:connection");
34+
PreparedStatement pstmt = null;
35+
ResultSet rs = null;
36+
37+
String query =
38+
"select SUPPLIERS.SUP_NAME " +
39+
"from SUPPLIERS, COFFEES " +
40+
"where " +
41+
"SUPPLIERS.SUP_ID = COFFEES.SUP_ID " +
42+
"and ? = COFFEES.COF_NAME";
43+
44+
pstmt = con.prepareStatement(query);
45+
pstmt.setString(1, coffeeName);
46+
rs = pstmt.executeQuery();
47+
48+
if (rs.next()) {
49+
supplierName[0] = rs.getString(1);
50+
} else {
51+
supplierName[0] = null;
52+
}
53+
}
54+
55+
}

src/test/java/org/xbib/elasticsearch/support/helper/AbstractRiverNodeTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public void afterMethod(String stopurl, String user, String password, @Optional
153153
protected void createRiver(String resource) {
154154
try {
155155
waitForYellow("1");
156-
Map<String, Object> map = XContentHelper.convertToMap(Streams.copyToByteArray(getClass().getResourceAsStream(resource)), false).v2();
156+
byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource));
157+
Map<String, Object> map = XContentHelper.convertToMap(b, false).v2();
157158
XContentBuilder builder = jsonBuilder()
158159
.startObject()
159160
.field("type", "jdbc")

src/test/resources/log4j2.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<AppenderRef ref="Console" />
1111
</Root>
1212
<!--
13-
<Logger name="NodeClient" level="OFF">
13+
<Logger name="BulkNodeClient" level="INFO">
1414
<AppenderRef ref="Console" />
1515
</Logger>
1616
-->

src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/derby/river-5.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"password" : "",
66
"sql" : [
77
{
8-
"statement : "{call count_products(?)}",
8+
"statement" : "{call count_products(?)}",
99
"parameter" : [ 1 ],
1010
"callable" : true
1111
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
drop table IF EXISTS SUPPLIERS
2+
drop table IF EXISTS COFFEES
3+
drop procedure IF EXISTS SHOW_SUPPLIERS
4+
create table SUPPLIERS (SUP_ID integer NOT NULL, SUP_NAME varchar(40) NOT NULL, STREET varchar(40) NOT NULL, CITY varchar(20) NOT NULL, STATE char(2) NOT NULL, ZIP char(5), PRIMARY KEY (SUP_ID));
5+
create table COFFEES (COF_NAME varchar(32) NOT NULL, SUP_ID int NOT NULL, PRICE numeric(10,2) NOT NULL, SALES integer NOT NULL, TOTAL integer NOT NULL, PRIMARY KEY (COF_NAME), FOREIGN KEY (SUP_ID) REFERENCES SUPPLIERS (SUP_ID));
6+
insert into SUPPLIERS values(49, 'Superior Coffee', '1 Party Place', 'Mendocino', 'CA', '95460')
7+
insert into SUPPLIERS values(101, 'Acme, Inc.', '99 Market Street', 'Groundsville', 'CA', '95199')
8+
insert into SUPPLIERS values(150, 'The High Ground', '100 Coffee Lane', 'Meadows', 'CA', '93966')
9+
insert into COFFEES values('Colombian', 00101, 7.99, 0, 0)
10+
insert into COFFEES values('French_Roast', 00049, 8.99, 0, 0)
11+
insert into COFFEES values('Espresso', 00150, 9.99, 0, 0)
12+
insert into COFFEES values('Colombian_Decaf', 00101, 8.99, 0, 0)
13+
insert into COFFEES values('French_Roast_Decaf', 00049, 9.99, 0, 0)
14+
create procedure SHOW_SUPPLIERS() begin select SUPPLIERS.SUP_NAME, COFFEES.COF_NAME from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID order by SUP_NAME; end
15+
create procedure GET_SUPPLIER_OF_COFFEE(IN coffeeName varchar(32), OUT supplierName varchar(40)) begin select SUPPLIERS.SUP_NAME into supplierName from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID and coffeeName = COFFEES.COF_NAME; select supplierName; end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
drop table COFFEES
2+
drop table SUPPLIERS
3+
drop procedure SHOW_SUPPLIERS
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"jdbc" : {
3+
"url" : "jdbc:mysql://localhost:3306/test",
4+
"user" : "",
5+
"password" : "",
6+
"sql" : "select * from products",
7+
"schedule" : "0/5 0-59 0-23 ? * *",
8+
"bulk_flush_interval" : "1s",
9+
"index" : "my_jdbc_river_index",
10+
"type" : "my_jdbc_river_type"
11+
}
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"jdbc" : {
3+
"url" : "jdbc:mysql://localhost:3306/test",
4+
"user" : "",
5+
"password" : "",
6+
7+
"sql" : [
8+
{
9+
"statement" : "select message from logs where {fn timestampdiff(SQL_TSI_HOUR, modified ,?)} > 0",
10+
"parameter" : [ "$now" ]
11+
}
12+
],
13+
"schedule" : "0/5 0-59 0-23 ? * *",
14+
"index" : "my_jdbc_river_index",
15+
"type" : "my_jdbc_river_type",
16+
"timezone" : "Asia/Jerusalem",
17+
"locale" : "iw_IL"
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"jdbc" : {
3+
"url" : "jdbc:mysql://localhost:3306/test",
4+
"user" : "",
5+
"password" : "",
6+
"sql" : [
7+
{
8+
"callable" : true,
9+
"statement" : "{call SHOW_SUPPLIERS()}"
10+
}
11+
],
12+
"index" : "my_jdbc_river_index",
13+
"type" : "my_jdbc_river_type"
14+
}
15+
}

0 commit comments

Comments
 (0)