17
17
import io .netty .channel .VoidChannelPromise ;
18
18
import io .netty .handler .codec .ByteToMessageDecoder ;
19
19
import io .netty .util .Attribute ;
20
+ import java .lang .reflect .Method ;
20
21
import java .util .ArrayList ;
21
22
import java .util .HashMap ;
22
23
import java .util .List ;
23
24
import java .util .Map ;
25
+ import java .util .concurrent .CompletableFuture ;
24
26
import org .apache .cassandra .auth .IAuthenticator ;
25
27
import org .apache .cassandra .cql3 .QueryProcessor ;
26
28
import org .apache .cassandra .service .ClientState ;
@@ -62,7 +64,7 @@ protected void initChannel(Channel channel) throws Exception {
62
64
INITIAL_HANDLER ,
63
65
new PipelineChannelInitializer (
64
66
new Envelope .Decoder (),
65
- (channel1 , version ) ->
67
+ (Channel channel1 , ProtocolVersion version ) ->
66
68
new UnixSocketConnection (channel1 , version , connectionTracker )));
67
69
/**
68
70
* The exceptionHandler will take care of handling exceptionCaught(...) events while still
@@ -82,7 +84,6 @@ static class UnixSockMessage extends SimpleChannelInboundHandler<Message.Request
82
84
@ Override
83
85
protected void channelRead0 (ChannelHandlerContext ctx , Message .Request request )
84
86
throws Exception {
85
- final Message .Response response ;
86
87
final UnixSocketConnection connection ;
87
88
long queryStartNanoTime = System .nanoTime ();
88
89
@@ -98,15 +99,35 @@ protected void channelRead0(ChannelHandlerContext ctx, Message.Request request)
98
99
// logger.info("Executing {} {} {}", request, connection.getVersion(),
99
100
// request.getStreamId());
100
101
101
- Message .Response r = request .execute (qstate , queryStartNanoTime );
102
-
103
- // UnixSocket has no auth
104
- response = r instanceof AuthenticateMessage ? new ReadyMessage () : r ;
102
+ // Converged Cassandra/Core 4 added Async processing as part of CNDB-10759. See if we have
103
+ // the method that returns a CompletableFuture.
104
+ try {
105
+ Method requestExecute =
106
+ Message .Request .class .getDeclaredMethod ("execute" , QueryState .class , long .class );
107
+ // get CompletableFuture type
108
+ if (CompletableFuture .class .equals (requestExecute .getReturnType ())) {
109
+ // newer Async processing
110
+ CompletableFuture <Message .Response > future =
111
+ (CompletableFuture <Message .Response >)
112
+ requestExecute .invoke (request , qstate , queryStartNanoTime );
113
+ future .whenComplete (
114
+ (Message .Response response , Throwable ignore ) -> {
115
+ processMessageResponse (response , request , connection , ctx );
116
+ });
117
+ } else if (Message .Response .class .equals (requestExecute .getReturnType ())) {
118
+ // older non-async processing
119
+ Message .Response response =
120
+ (Message .Response ) requestExecute .invoke (request , qstate , queryStartNanoTime );
105
121
106
- response .setStreamId (request .getStreamId ());
107
- response .setWarnings (ClientWarn .instance .getWarnings ());
108
- response .attach (connection );
109
- connection .applyStateTransition (request .type , response .type );
122
+ processMessageResponse (response , request , connection , ctx );
123
+ }
124
+ } catch (NoSuchMethodException ex ) {
125
+ // Unexepected missing method, throw an error and figure out what method signature we have
126
+ logger .error (
127
+ "Expected Cassandra Message.Request.execute() method signature not found. Management API agent will not be able to start Cassandra." ,
128
+ ex );
129
+ throw ex ;
130
+ }
110
131
} catch (Throwable t ) {
111
132
// logger.warn("Exception encountered", t);
112
133
JVMStabilityInspector .inspectThrowable (t );
@@ -119,7 +140,21 @@ protected void channelRead0(ChannelHandlerContext ctx, Message.Request request)
119
140
} finally {
120
141
ClientWarn .instance .resetWarnings ();
121
142
}
143
+ }
122
144
145
+ private void processMessageResponse (
146
+ Message .Response response ,
147
+ Message .Request request ,
148
+ final UnixSocketConnection connection ,
149
+ ChannelHandlerContext ctx ) {
150
+ if (response instanceof AuthenticateMessage ) {
151
+ // UnixSocket has no auth
152
+ response = new ReadyMessage ();
153
+ }
154
+ response .setStreamId (request .getStreamId ());
155
+ response .setWarnings (ClientWarn .instance .getWarnings ());
156
+ response .attach (connection );
157
+ connection .applyStateTransition (request .type , response .type );
123
158
ctx .writeAndFlush (response );
124
159
request .getSource ().release ();
125
160
}
@@ -284,18 +319,56 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
284
319
285
320
promise = new VoidChannelPromise (ctx .channel (), false );
286
321
287
- Message .Response response =
288
- Dispatcher .processRequest (
289
- (ServerConnection ) connection , startup , ClientResourceLimits .Overload .NONE );
290
-
291
- if (response .type .equals (Message .Type .AUTHENTICATE ))
292
- // bypass authentication
293
- response = new ReadyMessage ();
294
-
295
- outbound = response .encode (inbound .header .version );
296
- ctx .writeAndFlush (outbound , promise );
297
- logger .debug ("Configured pipeline: {}" , ctx .pipeline ());
298
- break ;
322
+ // More Converged Cassandra/Core 4 changes for Async processing. This is generally a
323
+ // copy of upstream's InitConnectionHandler.
324
+
325
+ // Try to get the newer processInit static method
326
+ try {
327
+ Method processInit =
328
+ Dispatcher .class .getDeclaredMethod (
329
+ "processInit" , ServerConnection .class , StartupMessage .class );
330
+ ((CompletableFuture <Message .Response >)
331
+ processInit .invoke (null , (ServerConnection ) connection , startup ))
332
+ .whenComplete (
333
+ (Message .Response response , Throwable error ) -> {
334
+ if (error == null ) {
335
+ processStartupResponse (response , inbound , ctx , promise );
336
+ } else {
337
+ ErrorMessage message =
338
+ ErrorMessage .fromException (
339
+ new ProtocolException (
340
+ String .format ("Unexpected error %s" , error .getMessage ())));
341
+ Envelope encoded = message .encode (inbound .header .version );
342
+ ctx .writeAndFlush (encoded );
343
+ }
344
+ });
345
+ break ;
346
+ } catch (NoSuchMethodException nsme ) {
347
+ // try the older processRequest method
348
+ try {
349
+ Method processRequest =
350
+ Dispatcher .class .getDeclaredMethod (
351
+ "processRequest" ,
352
+ ServerConnection .class ,
353
+ StartupMessage .class ,
354
+ ClientResourceLimits .Overload .class );
355
+ Message .Response response =
356
+ (Message .Response )
357
+ processRequest .invoke (
358
+ null ,
359
+ (ServerConnection ) connection ,
360
+ startup ,
361
+ ClientResourceLimits .Overload .NONE );
362
+ processStartupResponse (response , inbound , ctx , promise );
363
+ break ;
364
+ } catch (NoSuchMethodException nsme2 ) {
365
+ // Expected method not found. Log an error and figure out what signature we need
366
+ logger .error (
367
+ "Expected Cassandra Dispatcher.processRequest() method signature not found. Management API agent will not be able to start Cassandra." ,
368
+ nsme2 );
369
+ throw nsme2 ;
370
+ }
371
+ }
299
372
300
373
default :
301
374
ErrorMessage error =
@@ -311,5 +384,19 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
311
384
inbound .release ();
312
385
}
313
386
}
387
+
388
+ private void processStartupResponse (
389
+ Message .Response response ,
390
+ Envelope inbound ,
391
+ ChannelHandlerContext ctx ,
392
+ ChannelPromise promise ) {
393
+ if (response .type .equals (Message .Type .AUTHENTICATE )) {
394
+ // bypass authentication
395
+ response = new ReadyMessage ();
396
+ }
397
+ Envelope encoded = response .encode (inbound .header .version );
398
+ ctx .writeAndFlush (encoded , promise );
399
+ logger .debug ("Configured pipeline: {}" , ctx .pipeline ());
400
+ }
314
401
}
315
402
}
0 commit comments