Skip to content

Commit c934b05

Browse files
committed
Stored procedure support
Add field name renaming in the register map. Change of RiverSource API, the SQL command is passed through the result set processing because it carries the callable statement register map information for the field names.
1 parent 247a6f5 commit c934b05

File tree

9 files changed

+188
-62
lines changed

9 files changed

+188
-62
lines changed

README.md

+57-5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.
4545

4646
| Elasticsearch version | Plugin | Release date |
4747
| ------------------------ | -----------| -------------|
48+
| 1.3.1 | 1.3.0.4 | Aug 5, 2014 |
4849
| 1.3.1 | 1.3.0.3 | Aug 4, 2014 |
4950
| 1.3.1 | 1.3.0.2 | Aug 2, 2014 |
5051
| 1.3.1 | 1.3.0.1 | Jul 31, 2014 |
@@ -60,7 +61,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.
6061

6162
## Installation
6263

63-
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip
64+
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.4/elasticsearch-river-jdbc-1.3.0.4-plugin.zip
6465

6566
Do not forget to restart the node after installing.
6667

@@ -75,6 +76,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool.
7576

7677
| File | SHA1 |
7778
| ---------------------------------------------| -----------------------------------------|
79+
| elasticsearch-river-jdbc-1.3.0.4-plugin.zip | dcb412285f6274ef07c05068311dacb745fe8046 |
7880
| elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b |
7981
| elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 |
8082
| elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc |
@@ -231,8 +233,7 @@ Example:
231233
"sql" : [
232234
{
233235
"statement" : "select ... from ... where a = ?, b = ?, c = ?",
234-
"parameter" : [ "value for a", "value for b", "value for c" ],
235-
"callable" : false
236+
"parameter" : [ "value for a", "value for b", "value for c" ]
236237
},
237238
{
238239
"statement" : ...
@@ -241,6 +242,8 @@ Example:
241242
242243
`sql.statement` - the SQL statement
243244
245+
`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement for stored procedures (default: false).
246+
244247
`sql.parameter` - bind parameters for the SQL statement (in order). Some special values can be used with the following meanings:
245248
246249
* `$now` - the current timestamp
@@ -255,8 +258,6 @@ Example:
255258
* `$river.state.timestamp` - last timestamp of river activity (from river state)
256259
* `$river.state.counter` - counter from river state, counts the numbers of runs
257260
258-
`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement (default: false). Note: callable statement support is experimental and not well tested.
259-
260261
`locale` - the default locale (used for parsing numerical values, floating point character. Recommended values is "en_US")
261262
262263
`timezone` - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values
@@ -694,6 +695,57 @@ will result into the following JSON documents
694695
id=<random> {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
695696
id=<random> {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}
696697
698+
# Stored procedures or callable statements
699+
700+
Stored procedures can also be used for fetchng data, like this example fo MySQL illustrates.
701+
See also [Using Stored Procedures](http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html)
702+
from where the example is taken.
703+
704+
create procedure GET_SUPPLIER_OF_COFFEE(
705+
IN coffeeName varchar(32),
706+
OUT supplierName varchar(40))
707+
begin
708+
select SUPPLIERS.SUP_NAME into supplierName
709+
from SUPPLIERS, COFFEES
710+
where SUPPLIERS.SUP_ID = COFFEES.SUP_ID
711+
and coffeeName = COFFEES.COF_NAME;
712+
select supplierName;
713+
end
714+
715+
Now it is possible to call the procedure from the JDBC plugin and index the result in Elasticsearch.
716+
717+
{
718+
"jdbc" : {
719+
"url" : "jdbc:mysql://localhost:3306/test",
720+
"user" : "",
721+
"password" : "",
722+
"sql" : [
723+
{
724+
"callable" : true,
725+
"statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}",
726+
"parameter" : [
727+
"Colombian"
728+
],
729+
"register" : {
730+
"mySupplierName" : { "pos" : 2, "type" : "varchar" }
731+
}
732+
}
733+
],
734+
"index" : "my_jdbc_river_index",
735+
"type" : "my_jdbc_river_type"
736+
}
737+
}
738+
739+
Note, the `parameter` lists the input parameters in the order they should be applied, like in an
740+
ordinary statement. The `register` declares a list of output parameters in the particular order
741+
the `pos` number indicates. It is required to declare the JDBC type in the `type` attribute.
742+
`mySupplierName`, the key of the output parameter, is used as the Elasticsearch field name specification,
743+
like the column name specification in an ordinary SQL statement, because column names are not available
744+
in callable statement result sets.
745+
746+
If there is more than one result sets returned by a callable statement,
747+
the JDBC plugin enters a loop and iterates through all result sets.
748+
697749
# Monitoring the JDBC river state
698750
699751
While a river/feed is running, you can monitor the activity by using the `_state` command.

src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.xbib.elasticsearch.river.jdbc;
22

33
import org.xbib.elasticsearch.plugin.jdbc.RiverContext;
4+
import org.xbib.elasticsearch.plugin.jdbc.SQLCommand;
45
import org.xbib.keyvalue.KeyValueStreamListener;
56

67
import java.io.IOException;
@@ -156,6 +157,20 @@ public interface RiverSource {
156157

157158
void beforeRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
158159

160+
boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
161+
162+
void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
163+
164+
/**
165+
* This routine is executed before the result set is evaluated
166+
* @param command the SQL command that created this result set
167+
* @param results the result set
168+
* @param listener listener for the key/value stream generated from the result set
169+
* @throws SQLException
170+
* @throws IOException
171+
*/
172+
void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
173+
159174
/**
160175
* Action for the next row of the result set to be processed
161176
*
@@ -165,9 +180,18 @@ public interface RiverSource {
165180
* @throws SQLException when SQL execution gives an error
166181
* @throws IOException when input/output error occurs
167182
*/
168-
boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
183+
boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
169184

170-
void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
185+
/**
186+
* After the result set is processed, this method is called.
187+
*
188+
* @param command the SQL command that created this result set
189+
* @param results the result set
190+
* @param listener listener for the key/value stream generated from the result set
191+
* @throws SQLException
192+
* @throws IOException
193+
*/
194+
void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
171195

172196
/**
173197
* Parse a value in a row column

src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Tim
106106
KeyValueStreamListener<Object, Object> listener =
107107
new ColumnKeyValueStreamListener<Object, Object>(opInfo.opType)
108108
.output(context.getRiverMouth());
109-
merge(result, listener);
109+
merge(command, result, listener);
110110
} catch (Exception e) {
111111
throw new IOException(e);
112112
} finally {

0 commit comments

Comments
 (0)