Skip to content

Commit 68953cc

Browse files
authored Oct 29, 2022
Add batch and segment size options (#163)
Closes #136
1 parent 45a0913 commit 68953cc

File tree

4 files changed

+366
-60
lines changed

4 files changed

+366
-60
lines changed
 

‎src/Input/ParallelizationInput.php

+93 -25
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,8 @@ final class ParallelizationInput
3232
public const PROCESSES_OPTION = 'processes';
3333
public const MAIN_PROCESS_OPTION = 'main-process';
3434
public const CHILD_OPTION = 'child';
35+
public const BATCH_SIZE = 'batch-size';
36+
public const SEGMENT_SIZE = 'segment-size';
3537

3638
public const OPTIONS = [
3739
self::PROCESSES_OPTION,
@@ -52,18 +54,33 @@ final class ParallelizationInput
5254
private $findNumberOfProcesses;
5355

5456
private ?string $item;
57+
5558
private bool $childProcess;
5659

60+
/**
61+
* @var positive-int|null
62+
*/
63+
private ?int $batchSize;
64+
65+
/**
66+
* @var positive-int|null
67+
*/
68+
private ?int $segmentSize;
69+
5770
/**
5871
* @internal Use the static factory methods instead.
5972
*
6073
* @param positive-int|callable():positive-int $numberOfOrFindNumberOfProcesses
74+
* @param positive-int|null $batchSize
75+
* @param positive-int|null $segmentSize
6176
*/
6277
public function __construct(
6378
bool $mainProcess,
6479
$numberOfOrFindNumberOfProcesses,
6580
?string $item,
66-
bool $childProcess
81+
bool $childProcess,
82+
?int $batchSize,
83+
?int $segmentSize
6784
) {
6885
$this->mainProcess = $mainProcess;
6986
$this->item = $item;
@@ -74,6 +91,9 @@ public function __construct(
7491
} else {
7592
$this->findNumberOfProcesses = $numberOfOrFindNumberOfProcesses;
7693
}
94+
95+
$this->batchSize = $batchSize;
96+
$this->segmentSize = $segmentSize;
7797
}
7898

7999
public static function fromInput(InputInterface $input): self
@@ -87,6 +107,10 @@ public static function fromInput(InputInterface $input): self
87107
$mainProcess = $input->getOption(self::MAIN_PROCESS_OPTION);
88108
/** @var bool $isChild */
89109
$isChild = $input->getOption(self::CHILD_OPTION);
110+
/** @var string|int|null $batchSize */
111+
$batchSize = $input->getOption(self::BATCH_SIZE);
112+
/** @var string|int|null $segmentSize */
113+
$segmentSize = $input->getOption(self::SEGMENT_SIZE);
90114

91115
if ($hasItem) {
92116
$item = self::validateItem($item);
@@ -114,11 +138,24 @@ public static function fromInput(InputInterface $input): self
114138
);
115139
}
116140

141+
$batchSize = self::coerceAndValidatePositiveInt(
142+
$batchSize,
143+
'batch size',
144+
true,
145+
);
146+
$segmentSize = self::coerceAndValidatePositiveInt(
147+
$segmentSize,
148+
'segment size',
149+
true,
150+
);
151+
117152
return new self(
118153
$mainProcess,
119154
$validatedNumberOfProcesses,
120155
$hasItem ? $item : null,
121156
$isChild,
157+
$batchSize,
158+
$segmentSize,
122159
);
123160
}
124161

@@ -152,6 +189,18 @@ public static function configureCommand(Command $command): void
152189
null,
153190
InputOption::VALUE_NONE,
154191
'Set on child processes.',
192+
)
193+
->addOption(
194+
self::BATCH_SIZE,
195+
null,
196+
InputOption::VALUE_REQUIRED,
197+
'Set the batch size.',
198+
)
199+
->addOption(
200+
self::SEGMENT_SIZE,
201+
null,
202+
InputOption::VALUE_REQUIRED,
203+
'Set the segment size.',
155204
);
156205
}
157206

@@ -182,6 +231,22 @@ public function isChildProcess(): bool
182231
return $this->childProcess;
183232
}
184233

234+
/**
235+
* @return positive-int|null
236+
*/
237+
public function getBatchSize(): ?int
238+
{
239+
return $this->batchSize;
240+
}
241+
242+
/**
243+
* @return positive-int|null
244+
*/
245+
public function getSegmentSize(): ?int
246+
{
247+
return $this->segmentSize;
248+
}
249+
185250
/**
186251
* @param mixed $item
187252
*/
@@ -210,36 +275,39 @@ private static function validateItem($item): string
210275
*/
211276
private static function coerceNumberOfProcesses(string $numberOfProcesses): int
212277
{
213-
Assert::numeric(
278+
return self::coerceAndValidatePositiveInt(
214279
$numberOfProcesses,
215-
sprintf(
216-
'Expected the number of process defined to be a valid numeric value. Got "%s".',
217-
$numberOfProcesses,
218-
),
280+
'maximum number of parallel processes',
281+
false,
219282
);
283+
}
220284

221-
$castedNumberOfProcesses = (int) $numberOfProcesses;
285+
/**
286+
* @param string|int|null $value
287+
*
288+
* @return ($nullable is true ? positive-int|null : positive-int)
289+
*/
290+
private static function coerceAndValidatePositiveInt(
291+
$value,
292+
string $name,
293+
bool $nullable
294+
): ?int {
295+
if ($nullable && null === $value) {
296+
return null;
297+
}
222298

223-
Assert::same(
224-
// We cast it again in string to make sure since it is more convenient to pass an
225-
// int in the tests or when calling the command directly without passing by the CLI
226-
(string) $numberOfProcesses,
227-
(string) $castedNumberOfProcesses,
228-
sprintf(
229-
'Expected the number of process defined to be an integer. Got "%s".',
230-
$numberOfProcesses,
231-
),
299+
$message = sprintf(
300+
'Expected the %s to be an integer greater than or equal to 1. Got "%s".',
301+
$name,
302+
$value,
232303
);
233304

234-
Assert::greaterThan(
235-
$castedNumberOfProcesses,
236-
0,
237-
sprintf(
238-
'Expected the number of processes to be 1 or greater. Got "%s".',
239-
$castedNumberOfProcesses,
240-
),
241-
);
305+
Assert::integerish($value, $message);
306+
307+
$value = (int) $value;
308+
309+
Assert::positiveInteger($value, $message);
242310

243-
return $castedNumberOfProcesses;
311+
return $value;
244312
}
245313
}

‎src/ParallelExecutor.php

+10 -4
Original file line number | Diff line number | Diff line change
@@ -169,7 +169,12 @@ public function execute(
169169
Logger $logger
170170
): int {
171171
if ($parallelizationInput->isChildProcess()) {
172-
return $this->executeChildProcess($input, $output, $logger);
172+
return $this->executeChildProcess(
173+
$parallelizationInput,
174+
$input,
175+
$output,
176+
$logger,
177+
);
173178
}
174179

175180
return $this->executeMainProcess(
@@ -198,8 +203,8 @@ private function executeMainProcess(
198203
): int {
199204
($this->runBeforeFirstCommand)($input, $output);
200205

201-
$batchSize = $this->batchSize;
202-
$desiredSegmentSize = $this->segmentSize;
206+
$batchSize = $parallelizationInput->getBatchSize() ?? $this->batchSize;
207+
$desiredSegmentSize = $parallelizationInput->getSegmentSize() ?? $this->segmentSize;
203208

204209
$itemIterator = ChunkedItemsIterator::fromItemOrCallable(
205210
$parallelizationInput->getItem(),
@@ -269,13 +274,14 @@ private function executeMainProcess(
269274
* @return 0|positive-int
270275
*/
271276
private function executeChildProcess(
277+
ParallelizationInput $parallelizationInput,
272278
InputInterface $input,
273279
OutputInterface $output,
274280
Logger $logger
275281
): int {
276282
$itemIterator = ChunkedItemsIterator::fromStream(
277283
$this->childSourceStream,
278-
$this->batchSize,
284+
$parallelizationInput->getBatchSize() ?? $this->batchSize,
279285
);
280286

281287
$progressSymbol = $this->progressSymbol;

0 commit comments

Comments
 (0)
Please sign in to comment.