@@ -80,8 +80,6 @@ final class Pool
80
80
/** @var resource[] */
81
81
private array $ read_streams = [];
82
82
83
- private bool $ did_have_error = false ;
84
-
85
83
/** @var ?Closure(mixed): void */
86
84
private ?Closure $ task_done_closure = null ;
87
85
@@ -297,6 +295,20 @@ private static function streamForChild(array $sockets)
297
295
return $ for_write ;
298
296
}
299
297
298
+ private function killAllChildren (): void
299
+ {
300
+ foreach ($ this ->child_pid_list as $ child_pid ) {
301
+ /**
302
+ * SIGTERM does not exist on windows
303
+ *
304
+ * @psalm-suppress UnusedPsalmSuppress
305
+ * @psalm-suppress UndefinedConstant
306
+ * @psalm-suppress MixedArgument
307
+ */
308
+ posix_kill ($ child_pid , SIGTERM );
309
+ }
310
+ }
311
+
300
312
/**
301
313
* Read the results that each child process has serialized on their write streams.
302
314
* The results are returned in an array, one for each worker. The order of the results
@@ -319,6 +331,7 @@ private function readResultsFromChildren(): array
319
331
$ content = array_fill_keys (array_keys ($ streams ), '' );
320
332
321
333
$ terminationMessages = [];
334
+ $ done = [];
322
335
323
336
// Read the data off of all the stream.
324
337
while (count ($ streams ) > 0 ) {
@@ -361,34 +374,25 @@ private function readResultsFromChildren(): array
361
374
if ($ message instanceof ForkProcessDoneMessage) {
362
375
$ terminationMessages [] = $ message ->data ;
363
376
} elseif ($ message instanceof ForkTaskDoneMessage) {
377
+ $ done [(int )$ file ] = true ;
364
378
if ($ this ->task_done_closure !== null ) {
365
379
($ this ->task_done_closure )($ message ->data );
366
380
}
367
381
} elseif ($ message instanceof ForkProcessErrorMessage) {
368
- // Kill all children
369
- foreach ($ this ->child_pid_list as $ child_pid ) {
370
- /**
371
- * SIGTERM does not exist on windows
372
- *
373
- * @psalm-suppress UnusedPsalmSuppress
374
- * @psalm-suppress UndefinedConstant
375
- * @psalm-suppress MixedArgument
376
- */
377
- posix_kill ($ child_pid , SIGTERM );
378
- }
382
+ $ this ->killAllChildren ();
379
383
throw new Exception ($ message ->message );
380
384
} else {
381
- error_log ( ' Child should return ForkMessage - response type= ' . gettype ( $ message ) );
382
- $ this -> did_have_error = true ;
385
+ $ this -> killAllChildren ( );
386
+ throw new Exception ( ' Child should return ForkMessage - response type= ' . gettype ( $ message )) ;
383
387
}
384
388
}
385
389
}
386
390
387
391
// If the stream has closed, stop trying to select on it.
388
392
if (feof ($ file )) {
389
- if ($ content [(int )$ file ] !== '' ) {
390
- error_log ( ' Child did not send full message before closing the connection ' );
391
- $ this -> did_have_error = true ;
393
+ if ($ content [(int )$ file ] !== '' || ! isset ( $ done [( int ) $ file ]) ) {
394
+ $ this -> killAllChildren ( );
395
+ throw new Exception ( ' Child did not send full message before closing the connection ' ) ;
392
396
}
393
397
394
398
fclose ($ file );
@@ -450,21 +454,13 @@ public function wait(): array
450
454
* @psalm-suppress UndefinedConstant
451
455
*/
452
456
if ($ term_sig !== SIGALRM ) {
453
- $ this ->did_have_error = true ;
454
- error_log ("Child terminated with return code $ return_code and signal $ term_sig " );
457
+ $ this ->killAllChildren () ;
458
+ throw new Exception ("Child terminated with return code $ return_code and signal $ term_sig " );
455
459
}
456
460
}
457
461
}
458
462
}
459
463
460
464
return $ content ;
461
465
}
462
-
463
- /**
464
- * Returns true if this had an error, e.g. due to memory limits or due to a child process crashing.
465
- */
466
- public function didHaveError (): bool
467
- {
468
- return $ this ->did_have_error ;
469
- }
470
466
}
0 commit comments