Skip to content

Commit

Permalink
add pending and pending_at states for being used in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
CamronStaley committed Sep 17, 2024
1 parent 2173d35 commit ff04f07
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 6 deletions.
4 changes: 2 additions & 2 deletions docs/source/cli/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -989,9 +989,9 @@ List delegated operations.
only list operations for this dataset
-s STATE, --state STATE
only list operations with this state. Supported
values are ('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')
values are ('QUEUED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED')
--sort-by SORT_BY how to sort the operations. Supported values are
('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
('QUEUED_AT', 'PENDING_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
--reverse whether to sort in reverse order
-l LIMIT, --limit LIMIT
a maximum number of operations to show
Expand Down
6 changes: 3 additions & 3 deletions fiftyone/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3150,15 +3150,15 @@ def setup(parser):
default=None,
help=(
"only list operations with this state. Supported values are "
"('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')"
"('QUEUED', 'PENDING', RUNNING', 'COMPLETED', 'FAILED')"
),
)
parser.add_argument(
"--sort-by",
default="QUEUED_AT",
help=(
"how to sort the operations. Supported values are "
"('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
"('QUEUED_AT', 'PENDING_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
),
)
parser.add_argument(
Expand Down Expand Up @@ -3390,7 +3390,7 @@ def setup(parser):
default=None,
help=(
"delete operations in this state. Supported values are "
"('QUEUED', 'COMPLETED', 'FAILED')"
"('QUEUED', 'PENDING', 'COMPLETED', 'FAILED')"
),
)
parser.add_argument(
Expand Down
1 change: 1 addition & 0 deletions fiftyone/factory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class SortByField(object):

UPDATED_AT = "updated_at"
QUEUED_AT = "queued_at"
PENDING_AT = "pending_at"
COMPLETED_AT = "completed_at"
STARTED_AT = "started_at"
FAILED_AT = "failed_at"
Expand Down
43 changes: 42 additions & 1 deletion fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def get_queued_operations(
"subclass must implement get_queued_operations()"
)

def get_pending_operations(
self, operator: str = None, dataset_name=None
) -> List[DelegatedOperationDocument]:
"""Get all pending operations."""
raise NotImplementedError(
"subclass must implement get_pending_operations()"
)

def get_running_operations(
self, operator: str = None, dataset_name=None
) -> List[DelegatedOperationDocument]:
"""Get all running operations."""
raise NotImplementedError(
"subclass must implement get_running_operations()"
)

def list_operations(
self,
operator: str = None,
Expand Down Expand Up @@ -267,7 +283,10 @@ def update_run_state(
"result": execution_result_json,
}
}
elif run_state == ExecutionRunState.RUNNING:
elif (
run_state == ExecutionRunState.RUNNING
or run_state == ExecutionRunState.PENDING
):
update = {
"$set": {
"run_state": run_state,
Expand Down Expand Up @@ -341,6 +360,28 @@ def get_queued_operations(
run_state=ExecutionRunState.QUEUED,
)

def get_pending_operations(
self,
operator: str = None,
dataset_name: ObjectId = None,
) -> List[DelegatedOperationDocument]:
return self.list_operations(
operator=operator,
dataset_name=dataset_name,
run_state=ExecutionRunState.PENDING,
)

def get_running_operations(
self,
operator: str = None,
dataset_name: ObjectId = None,
) -> List[DelegatedOperationDocument]:
return self.list_operations(
operator=operator,
dataset_name=dataset_name,
run_state=ExecutionRunState.RUNNING,
)

def list_operations(
self,
operator: str = None,
Expand Down
2 changes: 2 additions & 0 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
self.pinned = False
self.completed_at = None
self.failed_at = None
self.pending_at = None
self.result = None
self.id = None
self._doc = None
Expand All @@ -68,6 +69,7 @@ def from_pymongo(self, doc: dict):
doc["completed_at"] if "completed_at" in doc else None
)
self.failed_at = doc["failed_at"] if "failed_at" in doc else None
self.pending_at = doc["pending_at"] if "pending_at" in doc else None
self.pinned = doc["pinned"] if "pinned" in doc else None
self.dataset_id = doc["dataset_id"] if "dataset_id" in doc else None
self.run_link = doc["run_link"] if "run_link" in doc else None
Expand Down
39 changes: 39 additions & 0 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ def set_running(self, doc_id, progress=None, run_link=None):
progress=progress,
)

def set_pending(self, doc_id):
"""Sets the given delegated operation to pending state.
Args:
doc_id: the ID of the delegated operation
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""
return self._repo.update_run_state(
_id=doc_id, run_state=ExecutionRunState.PENDING
)

def set_completed(
self,
doc_id,
Expand Down Expand Up @@ -235,6 +246,34 @@ def get_queued_operations(self, operator=None, dataset_name=None):
operator=operator, dataset_name=dataset_name
)

def get_pending_operations(self, operator=None, dataset_name=None):
"""Returns all pending delegated operations.
Args:
operator (None): the optional name of the operator to return all
the pending delegated operations for
dataset_name (None): the optional name of the dataset to return all
the pending delegated operations for
Returns:
a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""
return self._repo.get_pending_operations(
operator=operator, dataset_name=dataset_name
)

def get_running_operations(self, operator=None, dataset_name=None):
"""Returns all running delegated operations.
Args:
operator (None): the optional name of the operator to return all
the running delegated operations for
dataset_name (None): the optional name of the dataset to return all
the running delegated operations for
Returns:
a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""
return self._repo.get_running_operations(
operator=operator, dataset_name=dataset_name
)

def get(self, doc_id):
"""Returns the delegated operation with the given ID.
Expand Down
1 change: 1 addition & 0 deletions fiftyone/operators/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ExecutionRunState(object):
"""Enumeration of the available operator run states."""

QUEUED = "queued"
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
Expand Down

0 comments on commit ff04f07

Please sign in to comment.