@@ -11,6 +11,9 @@ module RunLoop
11
11
class TimeoutError < RuntimeError
12
12
end
13
13
14
+ class WriteFailedError < RuntimeError
15
+ end
16
+
14
17
module Core
15
18
16
19
START_DELIMITER = "OUTPUT_JSON:\n "
@@ -488,23 +491,47 @@ def self.jruby?
488
491
RUBY_PLATFORM == 'java'
489
492
end
490
493
491
- def self . write_request ( run_loop , cmd )
494
+ def self . write_request ( run_loop , cmd , logger = nil )
492
495
repl_path = run_loop [ :repl_path ]
493
496
index = run_loop [ :index ]
494
497
cmd_str = "#{ index } :#{ escape_host_command ( cmd ) } "
495
- log ( cmd_str ) if ENV [ 'DEBUG' ] == '1'
496
- File . open ( repl_path , 'w' ) { |f | f . puts ( cmd_str ) }
498
+ should_log = ( ENV [ 'DEBUG' ] == '1' )
499
+ RunLoop . log_info ( logger , cmd_str ) if should_log
500
+ write_succeeded = false
501
+ 2 . times do |i |
502
+ RunLoop . log_info ( logger , "Trying write of command #{ cmd_str } at index #{ index } " ) if should_log
503
+ File . open ( repl_path , 'w' ) { |f | f . puts ( cmd_str ) }
504
+ write_succeeded = validate_index_written ( run_loop , index , logger )
505
+ break if write_succeeded
506
+ end
507
+ unless write_succeeded
508
+ RunLoop . log_info ( logger , 'Failing...Raising RunLoop::WriteFailedError' ) if should_log
509
+ raise RunLoop ::WriteFailedError . new ( "Trying write of command #{ cmd_str } at index #{ index } " )
510
+ end
497
511
run_loop [ :index ] = index + 1
498
512
499
513
index
500
514
end
501
515
516
+ def self . validate_index_written ( run_loop , index , logger )
517
+ begin
518
+ Timeout ::timeout ( 10 , TimeoutError ) do
519
+ Core . read_response ( run_loop , index , 10 , 'last_index' )
520
+ end
521
+ RunLoop . log_info ( logger , "validate index written for index #{ index } ok" )
522
+ return true
523
+ rescue TimeoutError => _
524
+ RunLoop . log_info ( logger , "validate index written for index #{ index } failed. Retrying." )
525
+ return false
526
+ end
527
+ end
528
+
502
529
def self . escape_host_command ( cmd )
503
530
backquote = "\\ "
504
531
cmd . gsub ( backquote , backquote *4 )
505
532
end
506
533
507
- def self . read_response ( run_loop , expected_index , empty_file_timeout = 10 )
534
+ def self . read_response ( run_loop , expected_index , empty_file_timeout = 10 , search_for_property = 'index' )
508
535
509
536
log_file = run_loop [ :log_file ]
510
537
initial_offset = run_loop [ :initial_offset ] || 0
@@ -566,7 +593,7 @@ def self.read_response(run_loop, expected_index, empty_file_timeout=10)
566
593
if ENV [ 'DEBUG_READ' ] =='1'
567
594
p parsed_result
568
595
end
569
- json_index_if_present = parsed_result [ 'index' ]
596
+ json_index_if_present = parsed_result [ search_for_property ]
570
597
if json_index_if_present && json_index_if_present == expected_index
571
598
result = parsed_result
572
599
break
@@ -792,16 +819,12 @@ def self.send_command(run_loop, cmd, options={timeout: 60}, num_retries=0, last_
792
819
expected_index = run_loop [ :index ]
793
820
result = nil
794
821
begin
795
- expected_index = Core . write_request ( run_loop , cmd )
796
- rescue Errno ::EINTR => intr_error
822
+ expected_index = Core . write_request ( run_loop , cmd , logger )
823
+ rescue RunLoop :: WriteFailedError , Errno ::EINTR => write_error
797
824
# Attempt recover from interrupt by attempting to read result (assuming write went OK)
798
825
# or retry if attempted read result fails
799
826
run_loop [ :index ] = expected_index # restore expected index in case it changed
800
- log_info ( logger , "Core.write_request was interrupted: #{ intr_error } . Attempting recovery..." )
801
- sleep ( 1 ) # Arbitrary wait in hope that the system condition causing the interrupt passes
802
- if intr_error && intr_error . backtrace
803
- log_info ( logger , "backtrace: #{ intr_error . backtrace . join ( "\n " ) } " )
804
- end
827
+ log_info ( logger , "Core.write_request failed: #{ write_error } . Attempting recovery..." )
805
828
log_info ( logger , "Attempting read in case the request was received... Please wait (#{ interrupt_retry_timeout } )..." )
806
829
begin
807
830
Timeout ::timeout ( interrupt_retry_timeout , TimeoutError ) do
@@ -813,7 +836,7 @@ def self.send_command(run_loop, cmd, options={timeout: 60}, num_retries=0, last_
813
836
return result
814
837
rescue TimeoutError => _
815
838
log_info ( logger , "Read did not result in a response for index #{ expected_index } ... Retrying send_command..." )
816
- return send_command ( run_loop , cmd , options , num_retries +1 , intr_error )
839
+ return send_command ( run_loop , cmd , options , num_retries +1 , write_error )
817
840
end
818
841
end
819
842
0 commit comments