Skip to content

Commit

Permalink
Added RuntimeJob.queue_info() method (#1210)
Browse files Browse the repository at this point in the history
* Added support for RuntimeJob.queueinfo

* Moved methods for consistency with main

* Changed backend_status to job.status

* lint

* update test

---------

Co-authored-by: Kevin Tian <kevin.tian@ibm.com>
  • Loading branch information
merav-aharoni and kt474 authored Dec 11, 2023
1 parent 4d06ac3 commit dfc0fe8
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 18 deletions.
63 changes: 63 additions & 0 deletions qiskit_ibm_runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from qiskit_ibm_runtime import qiskit_runtime_service

from .utils.utils import validate_job_tags
from .utils.queueinfo import QueueInfo
from .constants import API_TO_JOB_ERROR_MESSAGE, API_TO_JOB_STATUS, DEFAULT_DECODERS
from .exceptions import (
IBMApiError,
Expand Down Expand Up @@ -136,6 +137,7 @@ def __init__(
self._session_id = session_id
self._tags = tags
self._usage_estimation: Dict[str, Any] = {}
self._queue_info: QueueInfo = None

decoder = result_decoder or DEFAULT_DECODERS.get(program_id, None) or ResultDecoder
if isinstance(decoder, Sequence):
Expand Down Expand Up @@ -686,3 +688,64 @@ def usage_estimation(self) -> Dict[str, Any]:
}

return self._usage_estimation

def queue_position(self, refresh: bool = False) -> Optional[int]:
"""Return the position of the job in the server queue.
Note:
The position returned is within the scope of the provider
and may differ from the global queue position.
Args:
refresh: If ``True``, re-query the server to get the latest value.
Otherwise return the cached value.
Returns:
Position in the queue or ``None`` if position is unknown or not applicable.
"""
if refresh:
api_metadata = self._api_client.job_metadata(self.job_id())
self._queue_info = QueueInfo(
position_in_queue=api_metadata.get("position_in_queue"),
status=self.status(),
estimated_start_time=api_metadata.get("estimated_start_time"),
estimated_completion_time=api_metadata.get("estimated_completion_time"),
)

if self._queue_info:
return self._queue_info.position
return None

def queue_info(self) -> Optional[QueueInfo]:
"""Return queue information for this job.
The queue information may include queue position, estimated start and
end time, and dynamic priorities for the hub, group, and project. See
:class:`QueueInfo` for more information.
Note:
The queue information is calculated after the job enters the queue.
Therefore, some or all of the information may not be immediately
available, and this method may return ``None``.
Returns:
A :class:`QueueInfo` instance that contains queue information for
this job, or ``None`` if queue information is unknown or not
applicable.
"""
# Get latest queue information.
api_metadata = self._api_client.job_metadata(self.job_id())
self._queue_info = QueueInfo(
position_in_queue=api_metadata.get("position_in_queue"),
status=self.status(),
estimated_start_time=api_metadata.get("estimated_start_time"),
estimated_completion_time=api_metadata.get("estimated_completion_time"),
)
# Return queue information only if it has any useful information.
if self._queue_info and any(
value is not None
for attr, value in self._queue_info.__dict__.items()
if not attr.startswith("_") and attr != "job_id"
):
return self._queue_info
return None
166 changes: 166 additions & 0 deletions qiskit_ibm_runtime/utils/queueinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Queue information for a job."""

import warnings
from datetime import datetime
from typing import Any, Optional, Union, Dict
import dateutil.parser

from ..utils import utc_to_local, duration_difference


class QueueInfo:
"""Queue information for a job."""

_data = {} # type: Dict

def __init__(
self,
position_in_queue: Optional[int] = None,
status: Optional[str] = None,
estimated_start_time: Optional[Union[str, datetime]] = None,
estimated_completion_time: Optional[Union[str, datetime]] = None,
hub_priority: Optional[float] = None,
group_priority: Optional[float] = None,
project_priority: Optional[float] = None,
job_id: Optional[str] = None,
**kwargs: Any
) -> None:
"""QueueInfo constructor.
Args:
position: Position in the queue.
status: Job status.
estimated_start_time: Estimated start time for the job, in UTC.
estimated_complete_time: Estimated complete time for the job, in UTC.
hub_priority: Dynamic priority for the hub.
group_priority: Dynamic priority for the group.
project_priority: Dynamic priority for the project.
job_id: Job ID.
kwargs: Additional attributes.
"""
self.position = int(position_in_queue) if position_in_queue else None
self._status = status
if isinstance(estimated_start_time, str):
estimated_start_time = dateutil.parser.isoparse(estimated_start_time)
if isinstance(estimated_completion_time, str):
estimated_completion_time = dateutil.parser.isoparse(estimated_completion_time)
self._estimated_start_time_utc = estimated_start_time
self._estimated_complete_time_utc = estimated_completion_time
self.hub_priority = hub_priority
self.group_priority = group_priority
self.project_priority = project_priority
self.job_id = job_id

self._data = kwargs

def __repr__(self) -> str:
"""Return the string representation of ``QueueInfo``.
Note:
The estimated start and end time are displayed in local time
for convenience.
Returns:
A string representation of ``QueueInfo``.
Raises:
TypeError: If the `estimated_start_time` or `estimated_end_time`
value is not valid.
"""
status = self._get_value(self._status)

with warnings.catch_warnings():
warnings.simplefilter("ignore")
est_start_time = (
self.estimated_start_time.isoformat() if self.estimated_start_time else None
)
est_complete_time = (
self.estimated_complete_time.isoformat() if self.estimated_complete_time else None
)

queue_info = [
"job_id='{}'".format(self.job_id),
"_status='{}'".format(status),
"estimated_start_time='{}'".format(est_start_time),
"estimated_complete_time='{}'".format(est_complete_time),
"position={}".format(self.position),
"hub_priority={}".format(self.hub_priority),
"group_priority={}".format(self.group_priority),
"project_priority={}".format(self.project_priority),
]

return "<{}({})>".format(self.__class__.__name__, ", ".join(queue_info))

def __getattr__(self, name: str) -> Any:
try:
return self._data[name]
except KeyError:
raise AttributeError("Attribute {} is not defined.".format(name)) from None

def format(self) -> str:
"""Build a user-friendly report for the job queue information.
Returns:
The job queue information report.
"""
status = self._status

with warnings.catch_warnings():
warnings.simplefilter("ignore")
est_start_time = (
duration_difference(self.estimated_start_time)
if self.estimated_start_time
else self._get_value(self.estimated_start_time)
)
est_complete_time = (
duration_difference(self.estimated_complete_time)
if self.estimated_complete_time
else self._get_value(self.estimated_complete_time)
)

queue_info = [
"Job {} queue information:".format(self._get_value(self.job_id)),
" queue position: {}".format(self._get_value(self.position)),
" status: {}".format(status),
" estimated start time: {}".format(est_start_time),
" estimated completion time: {}".format(est_complete_time),
" hub priority: {}".format(self._get_value(self.hub_priority)),
" group priority: {}".format(self._get_value(self.group_priority)),
" project priority: {}".format(self._get_value(self.project_priority)),
]

return "\n".join(queue_info)

def _get_value(self, value: Optional[Any], default_value: str = "unknown") -> Optional[Any]:
"""Return the input value if it exists or the default.
Returns:
The input value if it is not ``None``, else the input default value.
"""
return value or default_value

@property
def estimated_start_time(self) -> Optional[datetime]:
"""Return estimated start time in local time."""
if self._estimated_start_time_utc is None:
return None
return utc_to_local(self._estimated_start_time_utc)

@property
def estimated_complete_time(self) -> Optional[datetime]:
"""Return estimated complete time in local time."""
if self._estimated_complete_time_utc is None:
return None
return utc_to_local(self._estimated_complete_time_utc)
5 changes: 5 additions & 0 deletions releasenotes/notes/queueinfo-5e1bb815228425bb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Added a method ``RuntimeJob.queue_info()`` to get the queue information
from the backend. This feature was transferred from ``qiskit_ibm_provider``.
28 changes: 10 additions & 18 deletions test/integration/test_ibm_job_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import uuid
import time
from datetime import datetime, timedelta
from unittest import skip
from unittest import skip, SkipTest

from dateutil import tz
from qiskit.compiler import transpile
Expand All @@ -32,7 +32,7 @@
integration_test_setup,
)
from ..ibm_test_case import IBMTestCase
from ..utils import most_busy_backend
from ..utils import most_busy_backend, cancel_job_safe


class TestIBMJobAttributes(IBMTestCase):
Expand Down Expand Up @@ -200,13 +200,13 @@ def test_time_per_step(self):
step, time_data, start_datetime, end_datetime
),
)

rjob = self.service.job(job.job_id())
self.assertTrue(rjob.time_per_step())

@skip("queue_info supported in provider but not here")
def test_queue_info(self):
"""Test retrieving queue information."""
if self.dependencies.channel == "ibm_cloud":
raise SkipTest("Not supported on cloud channel.")
# Find the most busy backend.
backend = most_busy_backend(self.service)
leave_states = list(JOB_FINAL_STATES) + [JobStatus.RUNNING]
Expand All @@ -230,19 +230,11 @@ def test_queue_info(self):
)
msg = "Job {} is queued but has no ".format(job.job_id())
self.assertIsNotNone(queue_info, msg + "queue info.")
for attr, value in queue_info.__dict__.items():
self.assertIsNotNone(value, msg + attr)
self.assertTrue(
all(
0 < priority <= 1.0
for priority in [
queue_info.hub_priority,
queue_info.group_priority,
queue_info.project_priority,
]
),
"Unexpected queue info {} for job {}".format(queue_info, job.job_id()),
)

self.assertTrue(queue_info.format())
self.assertTrue(repr(queue_info))
elif job._status is not None:
self.assertIsNone(job.queue_position())
self.log.warning("Unable to retrieve queue information")

# Cancel job so it doesn't consume more resources.
cancel_job_safe(job, self.log)

0 comments on commit dfc0fe8

Please sign in to comment.