Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simulated Engine Implementation #4638

Merged
merged 21 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cirq-google/cirq_google/engine/local_simulation_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2021 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum


class LocalSimulationType(enum.Enum):
SYNCHRONOUS = 1
ASYNCHRONOUS = 2
ASYNCHRONOUS_WITH_DELAY = 3
45 changes: 45 additions & 0 deletions cirq-google/cirq_google/engine/simulated_local_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2021 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for running against Google's Quantum Cloud Service.

As an example, to run a circuit against the xmon simulator on the cloud,
engine = cirq_google.Engine(project_id='my-project-id')
program = engine.create_program(circuit)
result0 = program.run(params=params0, repetitions=10)
result1 = program.run(params=params1, repetitions=10)

In order to run on must have access to the Quantum Engine API. Access to this
API is (as of June 22, 2018) restricted to invitation only.
"""
from typing import List

from cirq_google.engine.abstract_local_engine import AbstractLocalEngine
from cirq_google.engine.abstract_local_processor import AbstractLocalProcessor


class SimulatedLocalEngine(AbstractLocalEngine):
"""Collection of processors backed by local samplers.

This class is a wrapper around `AbstractLocalEngine` and
adds no additional functionality and exists for naming consistency
and for possible future extension.

This class assumes that all processors are local. Processors
are given during initialization. Program and job querying
functionality is done by serially querying all child processors.

"""

def __init__(self, processors: List[AbstractLocalProcessor]):
super().__init__(processors)
183 changes: 183 additions & 0 deletions cirq-google/cirq_google/engine/simulated_local_engine_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright 2021 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from typing import Dict, Optional, Union
import pytest

import cirq
import cirq_google
import sympy
import numpy as np

from cirq_google.api import v2
from cirq_google.engine.abstract_local_job_test import NothingJob
from cirq_google.engine.abstract_local_program_test import NothingProgram
from cirq_google.engine.abstract_local_processor import AbstractLocalProcessor
from cirq_google.engine.abstract_program import AbstractProgram
from cirq_google.engine.simulated_local_engine import SimulatedLocalEngine
from cirq_google.engine.simulated_local_processor import SimulatedLocalProcessor

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised there's so much in this test. Can we limit if to behavior above and beyond what's covered in the parent class tests?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am loathe to remove these tests. Even if they are redundant, it is nice to test that all the cross-references work with the final child class.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.


class ProgramDictProcessor(AbstractLocalProcessor):
"""A processor that has a dictionary of programs for testing."""

def __init__(self, programs: Dict[str, AbstractProgram], **kwargs):
super().__init__(**kwargs)
self._programs = programs

def get_calibration(self, *args, **kwargs):
pass

def get_latest_calibration(self, *args, **kwargs):
pass

def get_current_calibration(self, *args, **kwargs):
pass

def get_device(self, *args, **kwargs):
pass

def get_device_specification(self, *args, **kwargs):
pass

def health(self, *args, **kwargs):
pass

def list_calibrations(self, *args, **kwargs):
pass

def run(self, *args, **kwargs):
pass

def run_batch(self, *args, **kwargs):
pass

def run_calibration(self, *args, **kwargs):
pass

def run_sweep(self, *args, **kwargs):
pass

def get_sampler(self, *args, **kwargs):
pass

def supported_languages(self, *args, **kwargs):
pass

def list_programs(
self,
created_before: Optional[Union[datetime.datetime, datetime.date]] = None,
created_after: Optional[Union[datetime.datetime, datetime.date]] = None,
has_labels: Optional[Dict[str, str]] = None,
):
"""Lists all programs regardless of filters.

This isn't really correct, but we don't want to test test functionality."""
return self._programs.values()

def get_program(self, program_id: str) -> AbstractProgram:
return self._programs[program_id]


def test_get_processor():
processor1 = ProgramDictProcessor(programs=[], processor_id='test')
engine = SimulatedLocalEngine([processor1])
assert engine.get_processor('test') == processor1
assert engine.get_processor('test').engine() == engine
with pytest.raises(KeyError):
engine.get_processor('abracadabra')
with pytest.raises(ValueError, match='Invalid processor'):
engine.get_sampler(processor_id=['a', 'b', 'c'])


def test_list_processor():
processor1 = ProgramDictProcessor(programs=[], processor_id='proc')
processor2 = ProgramDictProcessor(programs=[], processor_id='crop')
engine = SimulatedLocalEngine([processor1, processor2])
assert engine.get_processor('proc') == processor1
assert engine.get_processor('crop') == processor2
assert engine.get_processor('proc').engine() == engine
assert engine.get_processor('crop').engine() == engine
assert set(engine.list_processors()) == {processor1, processor2}


def test_get_programs():
program1 = NothingProgram([cirq.Circuit()], None)
job1 = NothingJob(
job_id='test', processor_id='test1', parent_program=program1, repetitions=100, sweeps=[]
)
program1.add_job('jerb', job1)
job1.add_labels({'color': 'blue'})

program2 = NothingProgram([cirq.Circuit()], None)
job2 = NothingJob(
job_id='test', processor_id='test2', parent_program=program2, repetitions=100, sweeps=[]
)
program2.add_job('jerb2', job2)
job2.add_labels({'color': 'red'})

processor1 = ProgramDictProcessor(programs={'prog1': program1}, processor_id='proc')
processor2 = ProgramDictProcessor(programs={'prog2': program2}, processor_id='crop')
engine = SimulatedLocalEngine([processor1, processor2])

assert engine.get_program('prog1') == program1

with pytest.raises(KeyError, match='does not exis'):
_ = engine.get_program('yoyo')

assert set(engine.list_programs()) == {program1, program2}
assert set(engine.list_jobs()) == {job1, job2}
assert engine.list_jobs(has_labels={'color': 'blue'}) == [job1]
assert engine.list_jobs(has_labels={'color': 'red'}) == [job2]


def test_full_simulation():
engine = SimulatedLocalEngine([SimulatedLocalProcessor(processor_id='tester')])
q = cirq.GridQubit(5, 4)
circuit = cirq.Circuit(cirq.X(q) ** sympy.Symbol('t'), cirq.measure(q, key='m'))
sweep = cirq.Points(key='t', points=[1, 0])
job = engine.get_processor('tester').run_sweep(circuit, params=sweep, repetitions=100)
assert job.engine() == engine
assert job.program().engine() == engine
results = job.results()
assert np.all(results[0].measurements['m'] == 1)
assert np.all(results[1].measurements['m'] == 0)


def test_sampler():
engine = SimulatedLocalEngine([SimulatedLocalProcessor(processor_id='tester')])
q = cirq.GridQubit(5, 4)
circuit = cirq.Circuit(cirq.X(q) ** sympy.Symbol('t'), cirq.measure(q, key='m'))
sweep = cirq.Points(key='t', points=[1, 0])
results = engine.get_sampler('tester').run_sweep(circuit, params=sweep, repetitions=100)
assert np.all(results[0].measurements['m'] == 1)
assert np.all(results[1].measurements['m'] == 0)


def test_get_calibration_from_job():
cal_proto = v2.metrics_pb2.MetricsSnapshot(timestamp_ms=10000)
cal = cirq_google.Calibration(cal_proto)
proc = SimulatedLocalProcessor(processor_id='test_proc', calibrations={10000: cal})
engine = SimulatedLocalEngine([proc])
job = engine.get_processor('test_proc').run_sweep(cirq.Circuit(), params={}, repetitions=100)
assert job.get_processor() == proc
assert job.get_calibration() == cal


def test_no_calibration_from_job():
proc = SimulatedLocalProcessor(processor_id='test_proc')
engine = SimulatedLocalEngine([proc])
job = engine.get_processor('test_proc').run_sweep(cirq.Circuit(), params={}, repetitions=100)
assert job.get_processor() == proc
assert job.get_calibration() is None
123 changes: 123 additions & 0 deletions cirq-google/cirq_google/engine/simulated_local_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright 2021 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An implementation of AbstractJob that uses in-memory constructs
and a provided sampler to execute circuits."""
from typing import cast, List, Optional, Tuple

import cirq
from cirq_google.engine.client import quantum
from cirq_google.engine.calibration_result import CalibrationResult
from cirq_google.engine.abstract_local_job import AbstractLocalJob
from cirq_google.engine.local_simulation_type import LocalSimulationType


class SimulatedLocalJob(AbstractLocalJob):
"""A quantum job backed by a (local) sampler.

This class is designed to execute a local simulator using the
`AbstractEngine` and `AbstractJob` interface. This class will
keep track of the status based on the sampler's results.

If the simulation type is SYNCHRONOUS, the sampler will be called
once the appropriate results method is called. Other methods will
be added later.

This does not support calibration requests.
`
Attributes:
sampler: Sampler to call for results.
simulation_type: Whether sampler execution should be
synchronous or asynchronous.
"""

def __init__(
self,
*args,
sampler: cirq.Sampler = None,
simulation_type: LocalSimulationType = LocalSimulationType.SYNCHRONOUS,
**kwargs,
):
super().__init__(*args, **kwargs)
self._sampler = sampler or cirq.Simulator()
self._simulation_type = simulation_type
self._state = quantum.enums.ExecutionStatus.State.READY
self._type = simulation_type
self._failure_code = ''
self._failure_message = ''

def execution_status(self) -> quantum.enums.ExecutionStatus.State:
"""Return the execution status of the job."""
return self._state

def failure(self) -> Optional[Tuple[str, str]]:
"""Return failure code and message of the job if present."""
return (self._failure_code, self._failure_message)

def cancel(self) -> None:
"""Cancel the job."""
self._state = quantum.enums.ExecutionStatus.State.CANCELLED

def delete(self) -> None:
"""Deletes the job and result, if any."""
self.program().delete_job(self.id())
self._state = quantum.enums.ExecutionStatus.State.STATE_UNSPECIFIED

def batched_results(self) -> List[List[cirq.Result]]:
"""Returns the job results, blocking until the job is complete.

This method is intended for batched jobs. Instead of flattening
results into a single list, this will return a List[Result]
for each circuit in the batch.
"""
if self._type == LocalSimulationType.SYNCHRONOUS:
reps, sweeps = self.get_repetitions_and_sweeps()
parent = self.program()
programs = [parent.get_circuit(n) for n in range(parent.batch_size())]
try:
self._state = quantum.enums.ExecutionStatus.State.SUCCESS
return self._sampler.run_batch(
programs=programs,
params_list=cast(List[cirq.Sweepable], sweeps),
repetitions=reps,
)
except Exception as e:
self._failure_code = '500'
self._failure_message = str(e)
self._state = quantum.enums.ExecutionStatus.State.FAILURE
raise e
raise ValueError('Unsupported simulation type {self._type}')

def results(self) -> List[cirq.Result]:
"""Returns the job results, blocking until the job is complete."""
if self._type == LocalSimulationType.SYNCHRONOUS:
reps, sweeps = self.get_repetitions_and_sweeps()
program = self.program().get_circuit()
try:
self._state = quantum.enums.ExecutionStatus.State.SUCCESS
return self._sampler.run_sweep(
program=program, params=sweeps[0] if sweeps else None, repetitions=reps
)
except Exception as e:
self._failure_code = '500'
self._failure_message = str(e)
self._state = quantum.enums.ExecutionStatus.State.FAILURE
raise e
raise ValueError('Unsupported simulation type {self._type}')

def calibration_results(self) -> List[CalibrationResult]:
"""Returns the results of a run_calibration() call.

This function will fail if any other type of results were returned.
"""
raise NotImplementedError
Loading