Skip to content

Commit f2d0b74

Browse files
authored
Minor ParallelExecutor refactoring (#98)
1 parent 36e0e7a commit f2d0b74

File tree

1 file changed

+119
-64
lines changed

1 file changed

+119
-64
lines changed

src/ParallelExecutor.php

+119-64
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use function mb_strlen;
2222
use function sprintf;
2323
use const STDIN;
24+
use Symfony\Component\Console\Input\Input;
2425
use Symfony\Component\Console\Input\InputDefinition;
2526
use Symfony\Component\Console\Input\InputInterface;
2627
use Symfony\Component\Console\Output\OutputInterface;
@@ -158,35 +159,31 @@ public function execute(
158159
Logger $logger
159160
): int {
160161
if ($parallelizationInput->isChildProcess()) {
161-
$this->executeChildProcess($input, $output, $logger);
162-
163-
return 0;
162+
return $this->executeChildProcess($input, $output, $logger);
164163
}
165164

166-
$this->executeMasterProcess(
165+
return $this->executeMainProcess(
167166
$parallelizationInput,
168167
$input,
169168
$output,
170169
$logger,
171170
);
172-
173-
return 0;
174171
}
175172

176173
/**
177-
* Executes the master process.
174+
* Executes the main process.
178175
*
179-
* The master process spawns as many child processes as set in the
176+
* The main process spawns as many child processes as set in the
180177
* "--processes" option. Each of the child processes receives a segment of
181178
* items of the processed data set and terminates. As long as there is data
182179
* left to process, new child processes are spawned automatically.
183180
*/
184-
private function executeMasterProcess(
181+
private function executeMainProcess(
185182
ParallelizationInput $parallelizationInput,
186183
InputInterface $input,
187184
OutputInterface $output,
188185
Logger $logger
189-
): void {
186+
): int {
190187
($this->runBeforeFirstCommand)($input, $output);
191188

192189
$isNumberOfProcessesDefined = $parallelizationInput->isNumberOfProcessesDefined();
@@ -227,64 +224,36 @@ private function executeMasterProcess(
227224

228225
$logger->startProgress($numberOfItems);
229226

230-
if ($numberOfItems <= $segmentSize
231-
|| (1 === $numberOfProcesses && !$parallelizationInput->isNumberOfProcessesDefined())
232-
) {
233-
// Run in the master process
234-
235-
foreach ($itemIterator->getItemChunks() as $items) {
236-
($this->runBeforeBatch)($input, $output, $items);
237-
238-
foreach ($items as $item) {
239-
$this->runTolerantSingleCommand($item, $input, $output, $logger);
240-
241-
$logger->advance();
242-
}
243-
244-
($this->runAfterBatch)($input, $output, $items);
245-
}
246-
} else {
247-
// Distribute if we have multiple segments
248-
$commandTemplate = array_merge(
249-
array_filter([
250-
$this->phpExecutable,
251-
$this->scriptPath,
252-
$this->commandName,
253-
implode(
254-
' ',
255-
array_slice(
256-
array_map('strval', $input->getArguments()),
257-
1,
258-
),
259-
),
260-
'--child',
261-
]),
262-
// Forward all the options except for "processes" to the children
263-
// this way the children can inherit the options such as env
264-
// or no-debug.
265-
InputOptionsSerializer::serialize(
266-
$this->commandDefinition,
227+
if (self::shouldSpawnChildProcesses(
228+
$numberOfItems,
229+
$segmentSize,
230+
$numberOfProcesses,
231+
$parallelizationInput->isNumberOfProcessesDefined(),
232+
)) {
233+
$this
234+
->createProcessLauncher(
235+
$segmentSize,
236+
$numberOfProcesses,
267237
$input,
268-
['child', 'processes'],
269-
),
270-
);
271-
272-
$processLauncher = new ProcessLauncher(
273-
$commandTemplate,
274-
$this->workingDirectory,
275-
$this->extraEnvironmentVariables,
276-
$numberOfProcesses,
277-
$segmentSize,
238+
$logger,
239+
)
240+
->run($itemIterator->getItems());
241+
} else {
242+
$this->processItems(
243+
$itemIterator,
244+
$input,
245+
$output,
278246
$logger,
279-
fn (string $type, string $buffer) => $this->processChildOutput($buffer, $logger),
247+
static fn () => $logger->advance(),
280248
);
281-
282-
$processLauncher->run($itemIterator->getItems());
283249
}
284250

285251
$logger->finish($itemName);
286252

287253
($this->runAfterLastCommand)($input, $output);
254+
255+
// TODO: use the exit code constants once we drop support for Symfony 4.4
256+
return 0;
288257
}
289258

290259
/**
@@ -298,21 +267,43 @@ private function executeChildProcess(
298267
InputInterface $input,
299268
OutputInterface $output,
300269
Logger $logger
301-
): void {
302-
$advancementChar = $this->progressSymbol;
303-
270+
): int {
304271
$itemIterator = ChunkedItemsIterator::fromStream(
305272
STDIN,
306273
$this->batchSize,
307274
);
308275

276+
$progressSymbol = $this->progressSymbol;
277+
278+
$this->processItems(
279+
$itemIterator,
280+
$input,
281+
$output,
282+
$logger,
283+
static fn () => $output->write($progressSymbol),
284+
);
285+
286+
// TODO: use the exit code constants once we drop support for Symfony 4.4
287+
return 0;
288+
}
289+
290+
/**
291+
* @param callable():void $advance
292+
*/
293+
private function processItems(
294+
ChunkedItemsIterator $itemIterator,
295+
InputInterface $input,
296+
OutputInterface $output,
297+
Logger $logger,
298+
callable $advance
299+
): void {
309300
foreach ($itemIterator->getItemChunks() as $items) {
310301
($this->runBeforeBatch)($input, $output, $items);
311302

312303
foreach ($items as $item) {
313304
$this->runTolerantSingleCommand($item, $input, $output, $logger);
314305

315-
$output->write($advancementChar);
306+
$advance();
316307
}
317308

318309
($this->runAfterBatch)($input, $output, $items);
@@ -332,6 +323,70 @@ private function runTolerantSingleCommand(
332323
}
333324
}
334325

326+
/**
 * Decides whether the items are distributed to child processes or handled
 * in the current process.
 *
 * Child processes are only used when there are more items than fit in a
 * single segment and, in addition, either more than one process is
 * requested or the number of processes was given explicitly.
 *
 * @param 0|positive-int $numberOfItems
 * @param positive-int   $segmentSize
 * @param positive-int   $numberOfProcesses
 * @param bool           $numberOfProcessesDefined Result of ParallelizationInput::isNumberOfProcessesDefined()
 */
private static function shouldSpawnChildProcesses(
    int $numberOfItems,
    int $segmentSize,
    int $numberOfProcesses,
    bool $numberOfProcessesDefined
): bool {
    return $numberOfItems > $segmentSize
        && ($numberOfProcesses > 1 || $numberOfProcessesDefined);
}
340+
341+
/**
 * Builds the launcher responsible for running the child processes.
 *
 * The child command line is the base command returned by
 * createChildCommand() followed by the serialized options of the current
 * input.
 */
private function createProcessLauncher(
    int $segmentSize,
    int $numberOfProcesses,
    InputInterface $input,
    Logger $logger
): ProcessLauncher {
    $baseCommand = $this->createChildCommand($input);

    // The children inherit the options of the current input (such as env
    // or no-debug); "child" and "processes" are the ones not forwarded.
    $forwardedOptions = InputOptionsSerializer::serialize(
        $this->commandDefinition,
        $input,
        ['child', 'processes'],
    );

    // Only the output buffer is handed over; the output type is ignored.
    $onChildOutput = fn (string $type, string $buffer) => $this->processChildOutput($buffer, $logger);

    return new ProcessLauncher(
        array_merge($baseCommand, $forwardedOptions),
        $this->workingDirectory,
        $this->extraEnvironmentVariables,
        $numberOfProcesses,
        $segmentSize,
        $logger,
        $onChildOutput,
    );
}
369+
370+
/**
 * Builds the leading part of the command line used to start a child
 * process.
 *
 * The parts are, in order: the PHP executable, the script path, the
 * command name, the input arguments after the first one joined by a
 * single space, and the "--child" flag. Falsy parts (e.g. an empty string)
 * are dropped.
 *
 * @return list<string>
 */
private function createChildCommand(InputInterface $input): array
{
    // The first argument is skipped: presumably it is the command name,
    // which is already passed explicitly below (TODO confirm).
    $forwardedArguments = array_slice(
        array_map('strval', $input->getArguments()),
        1,
    );

    // array_filter() preserves the original keys, so dropping a part (for
    // instance the joined arguments when there are none) would leave a gap
    // in the keys. array_values() re-indexes the result so that it really
    // is the list promised by the return type.
    return array_values(
        array_filter([
            $this->phpExecutable,
            $this->scriptPath,
            $this->commandName,
            implode(' ', $forwardedArguments),
            '--child',
        ]),
    );
}
389+
335390
/**
336391
* Called whenever data is received in the main process from a child process.
337392
*

0 commit comments

Comments
 (0)