@@ -18,15 +18,14 @@ use std::fmt::Display;
18
18
* limitations under the License.
19
19
*/
20
20
use std:: io:: { self , ErrorKind } ;
21
- use std:: net:: ToSocketAddrs ;
22
21
use std:: sync:: { Arc , Mutex } ;
23
22
use std:: time:: Duration ;
24
23
25
24
use bytes:: { BufMut , Bytes , BytesMut } ;
26
25
use futures:: sink:: SinkExt ;
27
26
use futures:: stream:: StreamExt ;
28
27
29
- use tokio:: net:: TcpStream ;
28
+ use tokio:: net:: { lookup_host , TcpStream } ;
30
29
use tokio:: runtime;
31
30
use tokio:: sync:: mpsc:: error:: TryRecvError ;
32
31
use tokio:: sync:: mpsc:: { channel, Receiver , Sender } ;
@@ -334,35 +333,38 @@ impl Client {
334
333
offset
335
334
) ,
336
335
Some ( server) => {
337
- let server = server. clone ( ) ;
338
- let addr = server. to_socket_addrs ( ) . unwrap ( ) . next ( ) . unwrap ( ) ;
339
- trace ! ( "really connecting: i={} addr={:?}" , offset, addr) ;
340
- match TcpStream :: connect ( addr) . await {
336
+ let lookup_server = server. clone ( ) ;
337
+ let mut addrs = match lookup_host ( lookup_server) . await {
341
338
Err ( e) => {
342
339
error ! (
343
- "Couldn't connect to {} [{}], will retry after {:? }" ,
344
- server, e, RECONNECT_BACKOFF
340
+ "Couldn't lookup address for server({}): { }" ,
341
+ server, e
345
342
) ;
346
- let ctx_conn = ctx_conn. clone ( ) ;
347
- // Retry in BACKOFF seconds -- TODO: keep track and do exponential
348
- runtime:: Handle :: current ( ) . spawn ( async move {
349
- sleep ( RECONNECT_BACKOFF ) . await ;
350
- ctx_conn. send ( offset) . await
351
- } ) ;
343
+ continue ;
352
344
}
353
- Ok ( wholestream) => {
354
- info ! ( "Connected to {}" , server) ;
355
- let pc = PacketCodec { } ;
356
- /*
357
- if !connector_active_servers.lock().unwrap()[offset] {
345
+ Ok ( addrs) => addrs,
346
+ } ;
347
+
348
+ // Perhaps here we should loop and return whether or not we connected to one of them
349
+ let abort_connector = loop {
350
+ let addr = match addrs. next ( ) {
351
+ None => break false ,
352
+ Some ( addr) => addr,
353
+ } ;
354
+ trace ! ( "really connecting: i={} addr={:?}" , offset, addr) ;
355
+ match TcpStream :: connect ( addr) . await {
356
+ Err ( e) => {
358
357
warn ! (
359
- "Received offset of disconnected server, ignoring"
358
+ "Couldn't connect to {} with address {} [{}]" ,
359
+ server, addr, e
360
360
) ;
361
+ // try the next address if we got multiple
361
362
continue ;
362
363
}
363
- This probably should not be needed.
364
- */
365
- let ( mut sink, mut stream) = match tls {
364
+ Ok ( wholestream) => {
365
+ info ! ( "Connected to {}" , server) ;
366
+ let pc = PacketCodec { } ;
367
+ let ( mut sink, mut stream) = match tls {
366
368
#[ cfg( not( feature = "tls" ) ) ]
367
369
Some ( _) => unreachable ! ( "We shouldn't have a tls config without feature = tls" ) ,
368
370
#[ cfg( feature = "tls" ) ]
@@ -401,91 +403,109 @@ impl Client {
401
403
pc. framed ( WrappedStream :: from ( wholestream) ) . split ( )
402
404
}
403
405
} ;
404
- if let Some ( ref client_id) = client_id {
405
- let req = new_req ( SET_CLIENT_ID , client_id. clone ( ) ) ;
406
- if let Err ( e) = sink. send ( req) . await {
407
- debug ! (
406
+ if let Some ( ref client_id) = client_id {
407
+ let req = new_req ( SET_CLIENT_ID , client_id. clone ( ) ) ;
408
+ if let Err ( e) = sink. send ( req) . await {
409
+ debug ! (
408
410
"Connection {:?} can't send packets. ({:?})" ,
409
411
sink, e
410
412
) ;
411
- continue ;
413
+ continue ;
414
+ }
412
415
}
413
- }
414
- let ( tx, mut rx) = channel ( CLIENT_CHANNEL_BOUND_SIZE ) ; // XXX pick a good value or const
415
- let tx = tx. clone ( ) ;
416
- let tx2 = tx. clone ( ) ;
417
- let connserver = server. clone ( ) ;
418
- let handler = ConnHandler :: new (
419
- & client_id,
420
- connserver. into ( ) ,
421
- tx2,
422
- handler_client_data. clone ( ) ,
423
- true ,
424
- ) ;
425
- trace ! ( "Inserting at {}" , offset) ;
426
- connector_conns
427
- . lock ( )
428
- . unwrap ( )
429
- . insert ( offset, handler. clone ( ) ) ;
430
- trace ! ( "Inserted at {}" , offset) ;
431
- let reader_conns = connector_conns. clone ( ) ;
432
- let reader_ctx = ctx2. clone ( ) ;
433
- let reader = async move {
416
+ let ( tx, mut rx) = channel ( CLIENT_CHANNEL_BOUND_SIZE ) ; // XXX pick a good value or const
434
417
let tx = tx. clone ( ) ;
435
- while let Some ( frame) = stream. next ( ) . await {
436
- trace ! ( "Frame read: {:?}" , frame) ;
437
- let response = match frame {
438
- Err ( e) => Err ( e) ,
439
- Ok ( frame) => {
440
- let handler = handler. clone ( ) ;
441
- debug ! ( "Locking handler" ) ;
442
- let mut handler = handler;
443
- debug ! ( "Locked handler" ) ;
444
- handler. call ( frame)
418
+ let tx2 = tx. clone ( ) ;
419
+ let connserver = server. clone ( ) ;
420
+ let handler = ConnHandler :: new (
421
+ & client_id,
422
+ connserver. into ( ) ,
423
+ tx2,
424
+ handler_client_data. clone ( ) ,
425
+ true ,
426
+ ) ;
427
+ trace ! ( "Inserting at {}" , offset) ;
428
+ connector_conns
429
+ . lock ( )
430
+ . unwrap ( )
431
+ . insert ( offset, handler. clone ( ) ) ;
432
+ trace ! ( "Inserted at {}" , offset) ;
433
+ let reader_conns = connector_conns. clone ( ) ;
434
+ let reader_ctx = ctx2. clone ( ) ;
435
+ let reader = async move {
436
+ let tx = tx. clone ( ) ;
437
+ while let Some ( frame) = stream. next ( ) . await {
438
+ trace ! ( "Frame read: {:?}" , frame) ;
439
+ let response = match frame {
440
+ Err ( e) => Err ( e) ,
441
+ Ok ( frame) => {
442
+ let handler = handler. clone ( ) ;
443
+ debug ! ( "Locking handler" ) ;
444
+ let mut handler = handler;
445
+ debug ! ( "Locked handler" ) ;
446
+ handler. call ( frame)
447
+ }
448
+ } ;
449
+ if let Err ( e) = response {
450
+ error ! ( "conn dropped?: {}" , e) ;
451
+ break ;
452
+ }
453
+ if let Err ( _) = tx. send ( response. unwrap ( ) ) . await
454
+ {
455
+ error ! ( "receiver dropped" )
445
456
}
446
- } ;
447
- if let Err ( e) = response {
448
- error ! ( "conn dropped?: {}" , e) ;
449
- break ;
450
457
}
451
- if let Err ( _) = tx. send ( response. unwrap ( ) ) . await {
452
- error ! ( "receiver dropped" )
458
+ reader_conns
459
+ . lock ( )
460
+ . unwrap ( )
461
+ . get_mut ( offset)
462
+ . and_then ( |conn| Some ( conn. set_active ( false ) ) ) ;
463
+ if let Err ( e) = reader_ctx. send ( offset) . await {
464
+ error ! (
465
+ "Can't send to connector, aborting! {}" ,
466
+ e
467
+ ) ;
453
468
}
454
- }
455
- reader_conns
456
- . lock ( )
457
- . unwrap ( )
458
- . get_mut ( offset)
459
- . and_then ( |conn| Some ( conn. set_active ( false ) ) ) ;
460
- if let Err ( e) = reader_ctx. send ( offset) . await {
461
- error ! ( "Can't send to connector, aborting! {}" , e) ;
462
- }
463
- } ;
464
- let writer_conns = connector_conns. clone ( ) ;
465
- let writer = async move {
466
- while let Some ( packet) = rx. recv ( ) . await {
467
- trace ! ( "Sending {:?}" , & packet) ;
468
- if let Err ( _) = sink. send ( packet) . await {
469
- error ! ( "Connection ({}) dropped" , offset) ;
470
- writer_conns
471
- . lock ( )
472
- . unwrap ( )
473
- . get_mut ( offset)
474
- . and_then ( |conn| {
475
- Some ( conn. set_active ( false ) )
476
- } ) ;
469
+ } ;
470
+ let writer_conns = connector_conns. clone ( ) ;
471
+ let writer = async move {
472
+ while let Some ( packet) = rx. recv ( ) . await {
473
+ trace ! ( "Sending {:?}" , & packet) ;
474
+ if let Err ( _) = sink. send ( packet) . await {
475
+ error ! ( "Connection ({}) dropped" , offset) ;
476
+ writer_conns
477
+ . lock ( )
478
+ . unwrap ( )
479
+ . get_mut ( offset)
480
+ . and_then ( |conn| {
481
+ Some ( conn. set_active ( false ) )
482
+ } ) ;
483
+ }
477
484
}
485
+ } ;
486
+ runtime:: Handle :: current ( ) . spawn ( reader) ;
487
+ runtime:: Handle :: current ( ) . spawn ( writer) ;
488
+ if let Err ( e) = ctdtx. send ( offset) . await {
489
+ // Connected channel is closed, shut it all down
490
+ info ! ( "Shutting down connector because connected channel returned error ({})" , e) ;
491
+ break true ;
478
492
}
479
- } ;
480
- runtime:: Handle :: current ( ) . spawn ( reader) ;
481
- runtime:: Handle :: current ( ) . spawn ( writer) ;
482
- if let Err ( e) = ctdtx. send ( offset) . await {
483
- // Connected channel is closed, shut it all down
484
- info ! ( "Shutting down connector because connected channel returned error ({})" , e) ;
485
- break ;
486
493
}
487
494
}
495
+ } ;
496
+ if abort_connector {
497
+ break ;
488
498
}
499
+ let ctx_conn = ctx_conn. clone ( ) ;
500
+ error ! (
501
+ "Could not connect to any addresses for {}, retrying in {:?}" ,
502
+ server, RECONNECT_BACKOFF
503
+ ) ;
504
+ // Retry in BACKOFF seconds -- TODO: keep track and do exponential
505
+ runtime:: Handle :: current ( ) . spawn ( async move {
506
+ sleep ( RECONNECT_BACKOFF ) . await ;
507
+ ctx_conn. send ( offset) . await
508
+ } ) ;
489
509
}
490
510
}
491
511
}
0 commit comments