-
Notifications
You must be signed in to change notification settings - Fork 566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Try scheduling as much as available #4528
Changes from 5 commits
a0eb18d
a2af5d4
59d07b4
2cd0118
1cd269f
e2bd7b4
ca582d7
7b1c9b6
05c77bd
003dda9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import random | ||
import time | ||
from typing import Dict | ||
from typing import List | ||
|
||
from googleapiclient import discovery | ||
|
||
|
@@ -25,9 +26,13 @@ | |
from clusterfuzz._internal.config import local_config | ||
from clusterfuzz._internal.datastore import data_types | ||
from clusterfuzz._internal.datastore import ndb_utils | ||
from clusterfuzz._internal.google_cloud_utils import batch | ||
from clusterfuzz._internal.google_cloud_utils import credentials | ||
from clusterfuzz._internal.metrics import logs | ||
|
||
# TODO(metzman): Actually implement this. | ||
CPUS_PER_FUZZ_JOB = 2 | ||
|
||
|
||
def _get_quotas(project, region): | ||
gcp_credentials = credentials.get_default()[0] | ||
|
@@ -36,6 +41,16 @@ def _get_quotas(project, region): | |
region=region, project=project).execute()['quotas'] | ||
|
||
|
||
def count_unacked(project: str, topic: str):
  """Returns the 'numUnackedMessages' value reported for a Pub/Sub
  subscription, as a float (0.0 when the field is absent from the response).

  The resource queried is projects/<project>/subscriptions/<topic>, i.e.
  |topic| is used as the subscription ID.
  """
  gcp_credentials, _ = credentials.get_default()
  resource_name = f'projects/{project}/subscriptions/{topic}'
  pubsub = discovery.build('pubsub', 'v1', credentials=gcp_credentials)
  request = pubsub.projects().subscriptions().get(  # pylint: disable=no-member
      subscription=resource_name)
  response = request.execute()
  # TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
  unacked = response.get('numUnackedMessages', 0)
  return float(unacked)
|
||
|
||
def get_available_cpus_for_region(project: str, region: str) -> int: | ||
"""Returns the number of available CPUs in the current GCE region.""" | ||
quotas = _get_quotas(project, region) | ||
|
@@ -59,13 +74,15 @@ def get_available_cpus_for_region(project: str, region: str) -> int: | |
assert preemptible_quota or cpu_quota | ||
|
||
if not preemptible_quota['limit']: | ||
# Preemptible quota is not set. Obey the CPU quota since that limitss us. | ||
# Preemptible quota is not set. Obey the CPU quota since that limits us. | ||
quota = cpu_quota | ||
else: | ||
quota = preemptible_quota | ||
assert quota['limit'], quota | ||
|
||
return quota['limit'] - quota['usage'] | ||
# TODO(metzman): Do this in a more configurable way. | ||
limit = min(quota['limit'], 100_000) | ||
return limit - quota['usage'] | ||
|
||
|
||
class BaseFuzzTaskScheduler: | ||
|
@@ -79,8 +96,7 @@ def get_fuzz_tasks(self): | |
|
||
def _get_cpus_per_fuzz_job(self, job_name): | ||
del job_name | ||
# TODO(metzman): Actually implement this. | ||
return 2 | ||
return CPUS_PER_FUZZ_JOB | ||
|
||
|
||
class FuzzTaskCandidate: | ||
|
@@ -186,15 +202,38 @@ def get_batch_regions(batch_config): | |
return list(set(config['gce_region'] for config in mapping.values())) | ||
|
||
|
||
def get_available_cpus(project: str, regions: List[str]) -> int: | ||
"""Returns the available CPUs for fuzz tasks.""" | ||
# TODO(metzman): This doesn't distinguish between fuzz and non-fuzz | ||
# tasks (nor preemptible and non-preemptible CPUs). Fix this. | ||
waiting_tasks = sum( | ||
batch.count_queued_or_scheduled_tasks(project, region) | ||
for region in regions) | ||
waiting_tasks += count_unacked(project, 'preprocess') | ||
waiting_tasks += count_unacked(project, 'utasks_main') | ||
soon_commited_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB | ||
logs.info(f'Soon committed CPUs: {soon_commited_cpus}') | ||
available_cpus = sum( | ||
get_available_cpus_for_region(project, region) for region in regions) | ||
available_cpus = max(available_cpus - soon_commited_cpus, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Now that we take the queue size into account, we can go back to running this very frequently, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmmm... I had the opposite thought: because we can schedule so many more at once, there's no need to run it so often. I think there can be a slight delay between publishing and reaching the queue, so an interval above 5 minutes probably makes the most sense. |
||
|
||
# Cap the total at 15K CPUs (7.5K two-CPU tasks) per region, so we don't | ||
# overload batch. | ||
available_cpus = min(available_cpus, 15_000 * len(regions)) | ||
return available_cpus | ||
|
||
|
||
def schedule_fuzz_tasks() -> bool: | ||
"""Schedules fuzz tasks.""" | ||
start = time.time() | ||
batch_config = local_config.BatchConfig() | ||
regions = set(get_batch_regions(batch_config)) | ||
project = batch_config.get('project') | ||
available_cpus = sum( | ||
get_available_cpus_for_region(project, region) for region in regions) | ||
available_cpus = min(available_cpus, 3500 * len(regions)) | ||
regions = set(get_batch_regions(batch_config)) | ||
available_cpus = get_available_cpus(project, regions) | ||
logs.error(f'{available_cpus} available CPUs.') | ||
if not available_cpus: | ||
return False | ||
|
||
fuzz_tasks = get_fuzz_tasks(available_cpus) | ||
if not fuzz_tasks: | ||
logs.error('No fuzz tasks found to schedule.') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One option to simulate these behaviors is https://simpy.readthedocs.io/
https://brooker.co.za/blog/2022/04/11/simulation.html
It is hard to imagine what these policies imply from the description alone.