Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce an input class #29

Merged
merged 11 commits into from
May 25, 2020
97 changes: 33 additions & 64 deletions src/Parallelization.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Terminal;
use Symfony\Component\DependencyInjection\ContainerInterface;
Expand Down Expand Up @@ -128,26 +126,7 @@ private static function getWorkingDirectory(ContainerInterface $container): stri
*/
protected static function configureParallelization(Command $command): void
{
$command
->addArgument(
'item',
InputArgument::OPTIONAL,
'The item to process'
)
->addOption(
'processes',
'p',
InputOption::VALUE_OPTIONAL,
'The number of parallel processes to run',
null
)
->addOption(
'child',
null,
InputOption::VALUE_NONE,
'Set on child processes'
)
;
ParallelizationInput::configureParallelization($command);
}

/**
Expand Down Expand Up @@ -284,7 +263,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 0;
}

$this->executeMasterProcess($input, $output);
$parallelizationInput = new ParallelizationInput(
$input,
function (InputInterface $input): array {
return $this->fetchItems($input);
},
$this->getSegmentSize(),
$this->getBatchSize()
);

$this->executeMasterProcess($parallelizationInput, $input, $output);

return 0;
}
Expand All @@ -297,50 +285,26 @@ protected function execute(InputInterface $input, OutputInterface $output): int
* items of the processed data set and terminates. As long as there is data
* left to process, new child processes are spawned automatically.
*/
protected function executeMasterProcess(InputInterface $input, OutputInterface $output): void
{
protected function executeMasterProcess(
ParallelizationInput $parallelizationInput,
InputInterface $input,
OutputInterface $output
): void {
$this->runBeforeFirstCommand($input, $output);

$numberOfProcessesDefined = null !== $input->getOption('processes');
$numberOfProcesses = $numberOfProcessesDefined ? (int) $input->getOption('processes') : 1;
$hasItem = (bool) $input->getArgument('item');
$items = $hasItem ? [$input->getArgument('item')] : $this->fetchItems($input);
$count = count($items);
$segmentSize = 1 === $numberOfProcesses && !$numberOfProcessesDefined ? $count : $this->getSegmentSize();
$batchSize = $this->getBatchSize();
$rounds = 1 === $numberOfProcesses ? 1 : ceil($count * 1.0 / $segmentSize);
$batches = ceil($segmentSize * 1.0 / $batchSize) * $rounds;

Assert::greaterThan(
$numberOfProcesses,
0,
sprintf(
'Requires at least one process. Got "%s"',
$input->getOption('processes')
)
);

if (!$hasItem && 1 !== $numberOfProcesses) {
// Shouldn't check this when only one item has been specified or
// when no child processes is used
Assert::greaterThanEq(
$segmentSize,
$batchSize,
sprintf(
'The segment size should always be greater or equal to '
.'the batch size. Got respectively "%d" and "%d"',
$segmentSize,
$batchSize
)
);
}
$numberOfProcesses = $parallelizationInput->getNumberOfProcesses();
$segmentSize = $parallelizationInput->getSegmentSize();
$count = $parallelizationInput->getItemsCount();
$rounds = $parallelizationInput->getRounds();
$batches = $parallelizationInput->getBatches();
$items = $parallelizationInput->getItems();

$output->writeln(sprintf(
'Processing %d %s in segments of %d, batches of %d, %d %s, %d %s in %d %s',
$count,
$this->getItemName($count),
$segmentSize,
$batchSize,
$parallelizationInput->getBatchSize(),
$rounds,
1 === $rounds ? 'round' : 'rounds',
$batches,
Expand All @@ -354,7 +318,9 @@ protected function executeMasterProcess(InputInterface $input, OutputInterface $
$progressBar->setFormat('debug');
$progressBar->start();

if ($count <= $segmentSize || (1 === $numberOfProcesses && !$numberOfProcessesDefined)) {
if ($count <= $segmentSize
|| (1 === $numberOfProcesses && !$parallelizationInput->isNumberOfProcessesDefined())
) {
// Run in the master process

$itemsChunks = array_chunk(
Expand Down Expand Up @@ -520,12 +486,14 @@ private function runTolerantSingleCommand(
}
}
}

/**
* @param string[] $blackListParams
*
* @return string[]
*/
private function serializeInputOptions(InputInterface $input, array $blackListParams) : array {
private function serializeInputOptions(InputInterface $input, array $blackListParams): array
{
$options = array_diff_key(
array_filter($input->getOptions()),
array_fill_keys($blackListParams, '')
Expand All @@ -536,9 +504,9 @@ private function serializeInputOptions(InputInterface $input, array $blackListPa
$definition = $this->getDefinition();
$option = $definition->getOption($name);

$optionString = "";
$optionString = '';
if (!$option->acceptValue()) {
$optionString .= ' --' . $name;
$optionString .= ' --'.$name;
} elseif ($option->isArray()) {
foreach ($value as $arrayValue) {
$optionString .= ' --'.$name.'='.$arrayValue;
Expand All @@ -549,6 +517,7 @@ private function serializeInputOptions(InputInterface $input, array $blackListPa

$preparedOptionList[] = $optionString;
}

return $preparedOptionList;
}
}
Loading