From a0eb18d6c095d6ec8cbcd1106580eefcfa64f55d Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 10:41:00 -0500 Subject: [PATCH 1/9] Increase scheduling --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 882e13d76d8..e0113716318 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -191,7 +191,7 @@ def schedule_fuzz_tasks() -> bool: # TODO(metzman): Put the CPU-based scheduling in tworkers. available_cpus = get_available_cpus(project, 'us-east4') # TODO(metzman): Remove this as we move from experimental code to production. - available_cpus = min(available_cpus, 3500) + available_cpus = min(available_cpus, 4000) fuzz_tasks = get_fuzz_tasks(available_cpus) if not fuzz_tasks: logs.error('No fuzz tasks found to schedule.') From a2af5d4f5600904470e4198ec9ce5c2368ac907f Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 11:14:32 -0500 Subject: [PATCH 2/9] tmp --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e0113716318..efb4a0c16a5 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -59,13 +59,15 @@ def get_available_cpus(project: str, region: str) -> int: assert preemptible_quota or cpu_quota if not preemptible_quota['limit']: - # Preemptible quota is not set. Obey the CPU quota since that limitss us. + # Preemptible quota is not set. Obey the CPU quota since that limits us. quota = cpu_quota else: quota = preemptible_quota assert quota['limit'], quota - return quota['limit'] - quota['usage'] + # TODO(metzman): Do this in a more configurable way. 
+ limit = min(quota['limit'], 100_000) + return limit - quota['usage'] class BaseFuzzTaskScheduler: @@ -188,10 +190,7 @@ def schedule_fuzz_tasks() -> bool: batch_config = local_config.BatchConfig() project = batch_config.get('project') - # TODO(metzman): Put the CPU-based scheduling in tworkers. available_cpus = get_available_cpus(project, 'us-east4') - # TODO(metzman): Remove this as we move from experimental code to production. - available_cpus = min(available_cpus, 4000) fuzz_tasks = get_fuzz_tasks(available_cpus) if not fuzz_tasks: logs.error('No fuzz tasks found to schedule.') From 59d07b45ab9c0940c7b0afd623397fc53ff3f6ad Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 11:17:48 -0500 Subject: [PATCH 3/9] add some limit --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index efb4a0c16a5..e0b0edc636c 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -191,6 +191,7 @@ def schedule_fuzz_tasks() -> bool: batch_config = local_config.BatchConfig() project = batch_config.get('project') available_cpus = get_available_cpus(project, 'us-east4') + available_cpus = min(available_cpus, 20_000) fuzz_tasks = get_fuzz_tasks(available_cpus) if not fuzz_tasks: logs.error('No fuzz tasks found to schedule.') From 1cd269fe18f91ee6dde90c5cd46c13a72a7ec046 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 12:56:39 -0500 Subject: [PATCH 4/9] Make schedule_fuzz smarter and more aggressive. Previously, it would just schedule about ~1500 tasks unless the regions were totally full. Now we will schedule up to 15K tasks. 
Also, we will take into account batch's queueing (there could be other reasons for this besides CPU quota, though there shouldn't be) and tasks that were already scheduled but not sent to batch or preprocessed so we don't overload the queue. --- .../_internal/cron/schedule_fuzz.py | 50 ++++++++++++++++--- .../_internal/google_cloud_utils/batch.py | 13 +++-- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 7e76a88bbdd..9b8320e78f8 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -17,6 +17,7 @@ import random import time from typing import Dict +from typing import List from googleapiclient import discovery @@ -25,9 +26,13 @@ from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils +from clusterfuzz._internal.google_cloud_utils import batch from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs +# TODO(metzman): Actually implement this. +CPUS_PER_FUZZ_JOB = 2 + def _get_quotas(project, region): gcp_credentials = credentials.get_default()[0] @@ -36,6 +41,16 @@ def _get_quotas(project, region): region=region, project=project).execute()['quotas'] +def count_unacked(project: str, topic: str): + creds, _ = credentials.get_default() + subscription_path = f'projects/{project}/subscriptions/{topic}' + client = discovery.build('pubsub', 'v1', credentials=creds) + sub = client.projects().subscriptions().get( # pylint: disable=no-member + subscription=subscription_path).execute() + # TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
+ return float(sub.get('numUnackedMessages', 0)) + + def get_available_cpus_for_region(project: str, region: str) -> int: """Returns the number of available CPUs in the current GCE region.""" quotas = _get_quotas(project, region) @@ -81,8 +96,7 @@ def get_fuzz_tasks(self): def _get_cpus_per_fuzz_job(self, job_name): del job_name - # TODO(metzman): Actually implement this. - return 2 + return CPUS_PER_FUZZ_JOB class FuzzTaskCandidate: @@ -188,16 +202,38 @@ def get_batch_regions(batch_config): return list(set(config['gce_region'] for config in mapping.values())) +def get_available_cpus(project: str, regions: List[str]) -> int: + """Returns the available CPUs for fuzz tasks.""" + # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz + # tasks (nor preemptible and non-preemptible CPUs). Fix this. + waiting_tasks = sum( + batch.count_queued_or_scheduled_tasks(project, region) + for region in regions) + waiting_tasks += count_unacked(project, 'preprocess') + waiting_tasks += count_unacked(project, 'utasks_main') + soon_commited_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + logs.info(f'Soon committed CPUs: {soon_commited_cpus}') + available_cpus = sum( + get_available_cpus_for_region(project, region) for region in regions) + available_cpus = max(available_cpus - soon_commited_cpus, 0) + + # Don't schedule more than 7.5K tasks at once. So we don't overload + # batch. + available_cpus = min(available_cpus, 15_000 * len(regions)) + return available_cpus + + def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" start = time.time() batch_config = local_config.BatchConfig() - regions = set(get_batch_regions(batch_config)) project = batch_config.get('project') - available_cpus = sum( - get_available_cpus_for_region(project, region) for region in regions) - # Don't schedule more than 5K tasks at once. 
- available_cpus = min(available_cpus, 10_000 * len(regions)) + regions = set(get_batch_regions(batch_config)) + available_cpus = get_available_cpus(project, regions) + logs.error(f'{available_cpus} available CPUs.') + if not available_cpus: + return False + fuzz_tasks = get_fuzz_tasks(available_cpus) if not fuzz_tasks: logs.error('No fuzz tasks found to schedule.') diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 607d5677273..ac4d3bf7e3c 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -66,10 +66,7 @@ def _create_batch_client_new(): """Creates a batch client.""" - creds, project = credentials.get_default() - if not project: - project = utils.get_application_id() - + creds, _ = credentials.get_default() return batch.BatchServiceClient(credentials=creds) @@ -301,6 +298,14 @@ def _get_subconfig(batch_config, instance_spec): return all_subconfigs[weighted_subconfig.name] +def count_queued_or_scheduled_tasks(project, region): + region = f'projects/{project}/locations/{region}' + jobs_filter = 'status.state=SCHEDULED OR status.state=QUEUED' + req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter) + return sum(job.task_groups[0].task_count + for job in _batch_client().list_jobs(request=req)) + + def _get_specs_from_config(batch_tasks) -> Dict: """Gets the configured specifications for a batch workload.""" if not batch_tasks: From e2bd7b41eb0ed6fe322dae98f61856429f58654f Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 17:23:22 -0500 Subject: [PATCH 5/9] Correct things --- src/clusterfuzz/_internal/base/concurrency.py | 7 ++ .../_internal/cron/schedule_fuzz.py | 99 ++++++++++++++----- .../_internal/google_cloud_utils/batch.py | 24 +++-- 3 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/clusterfuzz/_internal/base/concurrency.py 
b/src/clusterfuzz/_internal/base/concurrency.py index 36fcb39e472..5a6b929973d 100644 --- a/src/clusterfuzz/_internal/base/concurrency.py +++ b/src/clusterfuzz/_internal/base/concurrency.py @@ -14,7 +14,9 @@ """Tools for concurrency/parallelism.""" from concurrent import futures import contextlib +import itertools import multiprocessing +import multiprocessing.pool from clusterfuzz._internal.system import environment @@ -34,6 +36,11 @@ def map(self, f, l): def imap_unordered(self, f, l): return list(map(f, l)) + def starmap_async(self, f, l): + async_result = multiprocessing.pool.AsyncResult() + async_result._set(list(starmap(f, l))) + return async_result + @contextlib.contextmanager def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None): diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 9b8320e78f8..a0bbfe6eb57 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -14,13 +14,16 @@ """Cron job to schedule fuzz tasks that run on batch.""" import collections +import multiprocessing import random import time from typing import Dict from typing import List +from google.cloud import monitoring_v3 from googleapiclient import discovery +from clusterfuzz._internal.base import concurrency from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.config import local_config @@ -34,26 +37,43 @@ CPUS_PER_FUZZ_JOB = 2 -def _get_quotas(project, region): - gcp_credentials = credentials.get_default()[0] - compute = discovery.build('compute', 'v1', credentials=gcp_credentials) +def _get_quotas(creds, project, region): + compute = discovery.build('compute', 'v1', credentials=creds) return compute.regions().get( # pylint: disable=no-member region=region, project=project).execute()['quotas'] -def count_unacked(project: str, topic: str): - creds, _ = credentials.get_default() - 
subscription_path = f'projects/{project}/subscriptions/{topic}' - client = discovery.build('pubsub', 'v1', credentials=creds) - sub = client.projects().subscriptions().get( # pylint: disable=no-member - subscription=subscription_path).execute() +def count_unacked(creds, project_id, subscription_id): + """Counts the unacked messages in |subscription_id|.""" # TODO(metzman): Not all of these are fuzz_tasks. Deal with that. - return float(sub.get('numUnackedMessages', 0)) + metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages' + query_filter = (f'metric.type="{metric}" AND ' + f'resource.labels.subscription_id="{subscription_id}"') + time_now = time.time() + # Get the last 5 minutes. + time_interval = monitoring_v3.TimeInterval( + end_time={'seconds': int(time_now)}, + start_time={'seconds': int(time_now - 5 * 60)}, + ) + client = monitoring_v3.MetricServiceClient(credentials=creds) + results = client.list_time_series( + request={ + 'filter': query_filter, + 'interval': time_interval, + 'name': f'projects/{project_id}', + # "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + }) + # Get the latest point. + for result in results: + if len(result.points) == 0: + continue + return int(result.points[0].value.int64_value) -def get_available_cpus_for_region(project: str, region: str) -> int: +def get_available_cpus_for_region(creds, project: str, region: str) -> int: """Returns the number of available CPUs in the current GCE region.""" - quotas = _get_quotas(project, region) + + quotas = _get_quotas(creds, project, region) # Sometimes, the preemptible quota is 0, which means the number of preemptible # CPUs is actually limited by the CPU quota. @@ -81,8 +101,11 @@ def get_available_cpus_for_region(project: str, region: str) -> int: assert quota['limit'], quota # TODO(metzman): Do this in a more configurable way. 
- limit = min(quota['limit'], 100_000) - return limit - quota['usage'] + # We need this because us-central1 and us-east4 have different numbers of + # cores alloted to us in their quota. Treat them the same to simplify things. + limit = quota['limit'] + limit -= quota['usage'] + return min(limit, 100_000) class BaseFuzzTaskScheduler: @@ -198,37 +221,63 @@ def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]: def get_batch_regions(batch_config): - mapping = batch_config.get('mapping') - return list(set(config['gce_region'] for config in mapping.values())) + fuzz_subconf_names = { + subconf['name'] for subconf in batch_config.get( + 'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs') + } + + subconfs = batch_config.get('subconfigs') + return list( + set(subconfs[subconf]['region'] + for subconf in subconfs + if subconf in fuzz_subconf_names)) def get_available_cpus(project: str, regions: List[str]) -> int: """Returns the available CPUs for fuzz tasks.""" # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz # tasks (nor preemptible and non-preemptible CPUs). Fix this. - waiting_tasks = sum( - batch.count_queued_or_scheduled_tasks(project, region) - for region in regions) - waiting_tasks += count_unacked(project, 'preprocess') - waiting_tasks += count_unacked(project, 'utasks_main') + # Get total scheduled and queued. + creds = credentials.get_default()[0] + count_args = ((project, region) for region in regions) + with concurrency.make_pool() as pool: + # These calls are extremely slow. + result = pool.starmap_async(batch.count_queued_or_scheduled_tasks, + count_args) + waiting_tasks = count_unacked(creds, project, 'preprocess') + waiting_tasks += count_unacked(creds, project, 'utask_main') + region_counts = zip(*result.get()) # Group all queued and all scheduled. + + # Add up all queued and scheduled. 
+ region_counts = [sum(tup) for tup in region_counts] + logs.info(f'Region counts: {region_counts}') + if region_counts[0] > 1000: + # Check queued tasks. + logs.info('Too many jobs queued, not scheduling more fuzzing.') + return 0 + waiting_tasks += sum(region_counts) # Add up queued and scheduled. soon_commited_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB logs.info(f'Soon committed CPUs: {soon_commited_cpus}') available_cpus = sum( - get_available_cpus_for_region(project, region) for region in regions) + get_available_cpus_for_region(creds, project, region) + for region in regions) + logs.info('Actually free CPUs (before subtracting soon ' + f'occupied): {available_cpus}') available_cpus = max(available_cpus - soon_commited_cpus, 0) - # Don't schedule more than 7.5K tasks at once. So we don't overload + # Don't schedule more than 5K tasks at once. So we don't overload # batch. - available_cpus = min(available_cpus, 15_000 * len(regions)) + available_cpus = min(available_cpus, 10_000 * len(regions)) return available_cpus def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" + multiprocessing.set_start_method('spawn') start = time.time() batch_config = local_config.BatchConfig() project = batch_config.get('project') - regions = set(get_batch_regions(batch_config)) + regions = get_batch_regions(batch_config) available_cpus = get_available_cpus(project, regions) logs.error(f'{available_cpus} available CPUs.') if not available_cpus: diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index ac4d3bf7e3c..74300ee84d9 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -298,14 +298,6 @@ def _get_subconfig(batch_config, instance_spec): return all_subconfigs[weighted_subconfig.name] -def count_queued_or_scheduled_tasks(project, region): - region = f'projects/{project}/locations/{region}' - jobs_filter = 
'status.state=SCHEDULED OR status.state=QUEUED' - req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter) - return sum(job.task_groups[0].task_count - for job in _batch_client().list_jobs(request=req)) - - def _get_specs_from_config(batch_tasks) -> Dict: """Gets the configured specifications for a batch workload.""" if not batch_tasks: @@ -360,3 +352,19 @@ def _get_specs_from_config(batch_tasks) -> Dict: ) specs[(task.command, task.job_type)] = spec return specs + + +def count_queued_or_scheduled_tasks(project: str, + region: str) -> Tuple[int, int]: + """Counts the number of queued and scheduled tasks.""" + region = f'projects/{project}/locations/{region}' + jobs_filter = 'status.state="SCHEDULED" OR status.state="QUEUED"' + req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter) + queued = 0 + scheduled = 0 + for job in _batch_client().list_jobs(request=req): + if job.status.state == 'SCHEDULED': + scheduled += job.task_groups[0].task_count + elif job.status.state == 'QUEUED': + queued += job.task_groups[0].task_count + return (queued, scheduled) From ca582d7ac20d5e1a7762a657c2b0ecdfbe3c80ff Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 19:07:56 -0500 Subject: [PATCH 6/9] fix --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index a0bbfe6eb57..caf538e43af 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -61,13 +61,15 @@ def count_unacked(creds, project_id, subscription_id): 'filter': query_filter, 'interval': time_interval, 'name': f'projects/{project_id}', - # "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + 'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, }) # Get the latest point. 
for result in results: if len(result.points) == 0: continue - return int(result.points[0].value.int64_value) + result = int(result.points[0].value.int64_value) + logs.info(f'Unacked in {subscription_id}: {result}') + return result def get_available_cpus_for_region(creds, project: str, region: str) -> int: @@ -265,9 +267,9 @@ def get_available_cpus(project: str, regions: List[str]) -> int: f'occupied): {available_cpus}') available_cpus = max(available_cpus - soon_commited_cpus, 0) - # Don't schedule more than 5K tasks at once. So we don't overload - # batch. - available_cpus = min(available_cpus, 10_000 * len(regions)) + # Don't schedule more than 10K tasks at once. So we don't overload batch. + print('len_regions', len(regions)) + available_cpus = min(available_cpus, 20_000 * len(regions)) return available_cpus From 7b1c9b645e55bcdfd79fa919029d1417769903a2 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 19:36:24 -0500 Subject: [PATCH 7/9] Fix --- src/clusterfuzz/_internal/base/concurrency.py | 6 --- .../_internal/base/tasks/__init__.py | 4 +- .../_internal/cron/schedule_fuzz.py | 50 +++++++++---------- 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/src/clusterfuzz/_internal/base/concurrency.py b/src/clusterfuzz/_internal/base/concurrency.py index 5a6b929973d..967296629f8 100644 --- a/src/clusterfuzz/_internal/base/concurrency.py +++ b/src/clusterfuzz/_internal/base/concurrency.py @@ -14,7 +14,6 @@ """Tools for concurrency/parallelism.""" from concurrent import futures import contextlib -import itertools import multiprocessing import multiprocessing.pool @@ -36,11 +35,6 @@ def map(self, f, l): def imap_unordered(self, f, l): return list(map(f, l)) - def starmap_async(self, f, l): - async_result = multiprocessing.pool.AsyncResult() - async_result._set(list(starmap(f, l))) - return async_result - @contextlib.contextmanager def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None): diff --git 
a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 60738d27471..4105e3b3477 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -48,7 +48,9 @@ MAX_LEASED_TASKS_LIMIT = 1000 MAX_TASKS_LIMIT = 100000 -MAX_PUBSUB_MESSAGES_PER_REQ = 1000 +# The stated limit is 1000, but in reality meassages do not get delivered +# around this limit. We should probably switch to the real client library. +MAX_PUBSUB_MESSAGES_PER_REQ = 250 # Various variables for task leasing and completion times (in seconds). TASK_COMPLETION_BUFFER = 90 * 60 diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index caf538e43af..4b39412cb8d 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -243,9 +243,9 @@ def get_available_cpus(project: str, regions: List[str]) -> int: creds = credentials.get_default()[0] count_args = ((project, region) for region in regions) with concurrency.make_pool() as pool: - # These calls are extremely slow. - result = pool.starmap_async(batch.count_queued_or_scheduled_tasks, - count_args) + # These calls are extremely slow (about 1 minute total). + result = pool.starmap_async( # pylint: disable=no-member + batch.count_queued_or_scheduled_tasks, count_args) waiting_tasks = count_unacked(creds, project, 'preprocess') waiting_tasks += count_unacked(creds, project, 'utask_main') region_counts = zip(*result.get()) # Group all queued and all scheduled. @@ -258,14 +258,14 @@ def get_available_cpus(project: str, regions: List[str]) -> int: logs.info('Too many jobs queued, not scheduling more fuzzing.') return 0 waiting_tasks += sum(region_counts) # Add up queued and scheduled. 
- soon_commited_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB - logs.info(f'Soon committed CPUs: {soon_commited_cpus}') + soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + logs.info(f'Soon occupied CPUs: {soon_occupied_cpus}') available_cpus = sum( get_available_cpus_for_region(creds, project, region) for region in regions) logs.info('Actually free CPUs (before subtracting soon ' f'occupied): {available_cpus}') - available_cpus = max(available_cpus - soon_commited_cpus, 0) + available_cpus = max(available_cpus - soon_occupied_cpus, 0) # Don't schedule more than 10K tasks at once. So we don't overload batch. print('len_regions', len(regions)) @@ -276,28 +276,28 @@ def get_available_cpus(project: str, regions: List[str]) -> int: def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" multiprocessing.set_start_method('spawn') - start = time.time() batch_config = local_config.BatchConfig() project = batch_config.get('project') regions = get_batch_regions(batch_config) - available_cpus = get_available_cpus(project, regions) - logs.error(f'{available_cpus} available CPUs.') - if not available_cpus: - return False - - fuzz_tasks = get_fuzz_tasks(available_cpus) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - return True + while True: + start = time.time() + available_cpus = get_available_cpus(project, regions) + logs.error(f'{available_cpus} available CPUs.') + if not available_cpus: + continue + + fuzz_tasks = get_fuzz_tasks(available_cpus) + if not fuzz_tasks: + logs.error('No fuzz tasks found to schedule.') + continue + + logs.info(f'Adding {fuzz_tasks} to preprocess queue.') + tasks.bulk_add_tasks(fuzz_tasks, 
queue=tasks.PREPROCESS_QUEUE, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') def main(): From 05c77bdf1a13a35746da3d97709c40f103f28503 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 20:56:12 -0500 Subject: [PATCH 8/9] fix --- .../tests/appengine/handlers/cron/schedule_fuzz_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 004768456d3..7cd22829c55 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -17,6 +17,7 @@ from clusterfuzz._internal.cron import schedule_fuzz from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.tests.test_libs import helpers as test_helpers from clusterfuzz._internal.tests.test_libs import test_utils @@ -84,6 +85,7 @@ class TestGetAvailableCpusForRegion(unittest.TestCase): def setUp(self): test_helpers.patch(self, ['clusterfuzz._internal.cron.schedule_fuzz._get_quotas']) + self.creds = credentials.get_default() def test_usage(self): """Tests that get_available_cpus_for_region handles usage properly.""" @@ -93,7 +95,8 @@ def test_usage(self): 'usage': 2 }] self.assertEqual( - schedule_fuzz.get_available_cpus_for_region('project', 'region'), 3) + schedule_fuzz.get_available_cpus_for_region(self.creds, 'project', + 'region'), 3) def test_cpus_and_preemptible_cpus(self): """Tests that get_available_cpus_for_region handles usage properly.""" @@ -107,4 +110,5 @@ def test_cpus_and_preemptible_cpus(self): 'usage': 5 }] self.assertEqual( - schedule_fuzz.get_available_cpus_for_region('region', 
'project'), 5) + schedule_fuzz.get_available_cpus_for_region(self.creds, 'region', + 'project'), 5) From 003dda9d6e458652c0b8172bf55666c6485edb49 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Thu, 19 Dec 2024 21:11:00 -0500 Subject: [PATCH 9/9] more --- src/clusterfuzz/_internal/base/concurrency.py | 1 - src/python/bot/startup/run_bot.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/clusterfuzz/_internal/base/concurrency.py b/src/clusterfuzz/_internal/base/concurrency.py index 967296629f8..36fcb39e472 100644 --- a/src/clusterfuzz/_internal/base/concurrency.py +++ b/src/clusterfuzz/_internal/base/concurrency.py @@ -15,7 +15,6 @@ from concurrent import futures import contextlib import multiprocessing -import multiprocessing.pool from clusterfuzz._internal.system import environment diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index 8c674fba2cb..c9bb37fa4a6 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -194,9 +194,6 @@ def main(): sys.exit(-1) fuzzers_init.run() - - logs.info(f'PID is {os.getpid()}') - if environment.is_trusted_host(ensure_connected=False): from clusterfuzz._internal.bot.untrusted_runner import host host.init()