diff --git a/qiskit/primitives/__init__.py b/qiskit/primitives/__init__.py index e1b38e6f2ac2..d7a77eb77015 100644 --- a/qiskit/primitives/__init__.py +++ b/qiskit/primitives/__init__.py @@ -33,6 +33,14 @@ BackendEstimator EstimatorPub +EstimatorV2 +=========== + +.. autosummary:: + :toctree: ../stubs/ + + StatevectorEstimator + Sampler ======= @@ -65,12 +73,13 @@ from .base.sampler_result import SamplerResult from .containers import ( BindingsArray, + EstimatorPub, ObservablesArray, PrimitiveResult, PubResult, SamplerPub, - EstimatorPub, ) from .estimator import Estimator from .sampler import Sampler +from .statevector_estimator import StatevectorEstimator from .statevector_sampler import StatevectorSampler diff --git a/qiskit/primitives/containers/estimator_pub.py b/qiskit/primitives/containers/estimator_pub.py index 992c78bba524..82b50a9b755e 100644 --- a/qiskit/primitives/containers/estimator_pub.py +++ b/qiskit/primitives/containers/estimator_pub.py @@ -17,8 +17,8 @@ from __future__ import annotations -from typing import Tuple, Union from numbers import Real +from typing import Tuple, Union import numpy as np diff --git a/qiskit/primitives/statevector_estimator.py b/qiskit/primitives/statevector_estimator.py new file mode 100644 index 000000000000..530fbad8fa90 --- /dev/null +++ b/qiskit/primitives/statevector_estimator.py @@ -0,0 +1,100 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2023, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +""" +Estimator class +""" + +from __future__ import annotations + +from collections.abc import Iterable + +import numpy as np + +from qiskit.quantum_info import SparsePauliOp, Statevector + +from .base import BaseEstimatorV2 +from .containers import EstimatorPub, EstimatorPubLike, PrimitiveResult, PubResult +from .primitive_job import PrimitiveJob +from .utils import bound_circuit_to_instruction + + +class StatevectorEstimator(BaseEstimatorV2): + """ + Simple implementation of :class:`BaseEstimatorV2` with full state vector simulation. + + This class is implemented via :class:`~.Statevector` which turns provided circuits into + pure state vectors. These states are subsequently acted on by :class:~.SparsePauliOp`, + which implies that, at present, this implementation is only compatible with Pauli-based + observables. + """ + + def __init__( + self, *, default_precision: float = 0.0, seed: np.random.Generator | int | None = None + ): + """ + Args: + default_precision: The default precision for the estimator if not specified during run. + seed: The seed or Generator object for random number generation. + If None, a random seeded default RNG will be used. + """ + self._default_precision = default_precision + self._seed = seed + + @property + def default_precision(self) -> int: + """Return the default shots""" + return self._default_precision + + @property + def seed(self) -> np.random.Generator | int | None: + """Return the seed or Generator object for random number generation.""" + return self._seed + + def run( + self, pubs: Iterable[EstimatorPubLike], *, precision: float | None = None + ) -> PrimitiveJob[PrimitiveResult[PubResult]]: + if precision is None: + precision = self._default_precision + coerced_pubs = [EstimatorPub.coerce(pub, precision) for pub in pubs] + + job = PrimitiveJob(self._run, coerced_pubs) + job._submit() + return job + + def _run(self, pubs: list[EstimatorPub]) -> PrimitiveResult[PubResult]: + return PrimitiveResult([self._run_pub(pub) for pub in pubs]) + + def _run_pub(self, pub: EstimatorPub) -> PubResult: + rng = np.random.default_rng(self._seed) + circuit = pub.circuit + observables = pub.observables + parameter_values = pub.parameter_values + precision = pub.precision + bound_circuits = parameter_values.bind_all(circuit) + bc_circuits, bc_obs = np.broadcast_arrays(bound_circuits, observables) + evs = np.zeros_like(bc_circuits, dtype=np.float64) + stds = np.zeros_like(bc_circuits, dtype=np.float64) + for index in np.ndindex(*bc_circuits.shape): + bound_circuit = bc_circuits[index] + observable = bc_obs[index] + final_state = Statevector(bound_circuit_to_instruction(bound_circuit)) + paulis, coeffs = zip(*observable.items()) + obs = SparsePauliOp(paulis, coeffs) # TODO: support non Pauli operators + expectation_value = np.real_if_close(final_state.expectation_value(obs)) + if precision != 0: + if not np.isreal(expectation_value): + raise ValueError("Given operator is not Hermitian and noise cannot be added.") + expectation_value = rng.normal(expectation_value, precision) + evs[index] = expectation_value + data_bin_cls = self._make_data_bin(pub) + data_bin = data_bin_cls(evs=evs, stds=stds) + return PubResult(data_bin, metadata={"precision": precision}) diff --git a/test/python/primitives/test_statevector_estimator.py b/test/python/primitives/test_statevector_estimator.py new file mode 100644 index 000000000000..28c46ce8e0f1 --- /dev/null +++ b/test/python/primitives/test_statevector_estimator.py @@ -0,0 +1,282 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for Estimator.""" + +import unittest + +import numpy as np + +from qiskit.circuit import Parameter, QuantumCircuit +from qiskit.circuit.library import RealAmplitudes +from qiskit.primitives import BindingsArray, EstimatorPub, ObservablesArray, StatevectorEstimator +from qiskit.quantum_info import SparsePauliOp +from qiskit.test import QiskitTestCase + + +class TestStatevectorEstimator(QiskitTestCase): + """Test Estimator""" + + def setUp(self): + super().setUp() + self.ansatz = RealAmplitudes(num_qubits=2, reps=2) + self.observable = SparsePauliOp.from_list( + [ + ("II", -1.052373245772859), + ("IZ", 0.39793742484318045), + ("ZI", -0.39793742484318045), + ("ZZ", -0.01128010425623538), + ("XX", 0.18093119978423156), + ] + ) + self.expvals = -1.0284380963435145, -1.284366511861733 + + self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) + self.params = tuple(psi.parameters for psi in self.psi) + self.hamiltonian = ( + SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), + SparsePauliOp.from_list([("IZ", 1)]), + SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), + ) + self.theta = ( + [0, 1, 1, 2, 3, 5], + [0, 1, 1, 2, 3, 5, 8, 13], + [1, 2, 3, 4, 5, 6], + ) + + def test_estimator_run(self): + """Test Estimator.run()""" + psi1, psi2 = self.psi + hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian + theta1, theta2, theta3 = self.theta + estimator = StatevectorEstimator() + + # Specify the circuit and observable by indices. + # calculate [ ] + job = estimator.run([(psi1, hamiltonian1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956]) + + # Objects can be passed instead of indices. + # Note that passing objects has an overhead + # since the corresponding indices need to be searched. + # User can append a circuit and observable. + # calculate [ ] + result2 = estimator.run([(psi2, hamiltonian1, theta2)]).result() + np.testing.assert_allclose(result2[0].data.evs, [2.97797666]) + + # calculate [ , ] + result3 = estimator.run([(psi1, [hamiltonian2, hamiltonian3], theta1)]).result() + np.testing.assert_allclose(result3[0].data.evs, [-0.551653, 0.07535239]) + + # calculate [ [, + # ], + # [] ] + result4 = estimator.run( + [(psi1, [hamiltonian1, hamiltonian3], [theta1, theta3]), (psi2, hamiltonian2, theta2)] + ).result() + np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318]) + np.testing.assert_allclose(result4[1].data.evs, [0.17849238]) + + def test_estimator_with_pub(self): + """Test estimator with explicit EstimatorPubs.""" + psi1, psi2 = self.psi + hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian + theta1, theta2, theta3 = self.theta + + obs1 = ObservablesArray.coerce([hamiltonian1, hamiltonian3]) + bind1 = BindingsArray.coerce([theta1, theta3]) + pub1 = EstimatorPub(psi1, obs1, bind1) + obs2 = ObservablesArray.coerce(hamiltonian2) + bind2 = BindingsArray.coerce(theta2) + pub2 = EstimatorPub(psi2, obs2, bind2) + + estimator = StatevectorEstimator() + result4 = estimator.run([pub1, pub2]).result() + np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318]) + np.testing.assert_allclose(result4[1].data.evs, [0.17849238]) + + def test_estimator_run_no_params(self): + """test for estimator without parameters""" + circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) + est = StatevectorEstimator() + result = est.run([(circuit, self.observable)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1.284366511861733]) + + def test_run_single_circuit_observable(self): + """Test for single circuit and single observable case.""" + est = StatevectorEstimator() + + with self.subTest("No parameter"): + qc = QuantumCircuit(1) + qc.x(0) + op = SparsePauliOp("Z") + param_vals = [None, [], [[]], np.array([]), np.array([[]]), [np.array([])]] + target = [-1] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target) + self.assertEqual(result[0].metadata["precision"], 0) + + with self.subTest("One parameter"): + param = Parameter("x") + qc = QuantumCircuit(1) + qc.ry(param, 0) + op = SparsePauliOp("Z") + param_vals = [ + [np.pi], + [[np.pi]], + np.array([np.pi]), + np.array([[np.pi]]), + [np.array([np.pi])], + ] + target = [-1] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target) + self.assertEqual(result[0].metadata["precision"], 0) + + with self.subTest("More than one parameter"): + qc = self.psi[0] + op = self.hamiltonian[0] + param_vals = [ + self.theta[0], + [self.theta[0]], + np.array(self.theta[0]), + np.array([self.theta[0]]), + [np.array(self.theta[0])], + ] + target = [1.5555572817900956] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target) + self.assertEqual(result[0].metadata["precision"], 0) + + def test_run_1qubit(self): + """Test for 1-qubit cases""" + qc = QuantumCircuit(1) + qc2 = QuantumCircuit(1) + qc2.x(0) + + op = SparsePauliOp.from_list([("I", 1)]) + op2 = SparsePauliOp.from_list([("Z", 1)]) + + est = StatevectorEstimator() + result = est.run([(qc, op)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc, op2)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc2, op)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc2, op2)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1]) + + def test_run_2qubits(self): + """Test for 2-qubit cases (to check endian)""" + qc = QuantumCircuit(2) + qc2 = QuantumCircuit(2) + qc2.x(0) + + op = SparsePauliOp.from_list([("II", 1)]) + op2 = SparsePauliOp.from_list([("ZI", 1)]) + op3 = SparsePauliOp.from_list([("IZ", 1)]) + + est = StatevectorEstimator() + result = est.run([(qc, op)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc2, op)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc, op2)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc2, op2)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc, op3)]).result() + np.testing.assert_allclose(result[0].data.evs, [1]) + + result = est.run([(qc2, op3)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1]) + + def test_run_errors(self): + """Test for errors""" + qc = QuantumCircuit(1) + qc2 = QuantumCircuit(2) + + op = SparsePauliOp.from_list([("I", 1)]) + op2 = SparsePauliOp.from_list([("II", 1)]) + + est = StatevectorEstimator() + with self.assertRaises(ValueError): + est.run([(qc, op2)]).result() + with self.assertRaises(ValueError): + est.run([(qc, op, [[1e4]])]).result() + with self.assertRaises(ValueError): + est.run([(qc2, op2, [[1, 2]])]).result() + with self.assertRaises(ValueError): + est.run([(qc, [op, op2], [[1]])]).result() + with self.assertRaises(ValueError): + est.run([(qc, op)], precision=-1).result() + with self.assertRaises(ValueError): + est.run([(qc, 1j * op)], precision=0.1).result() + + def test_run_numpy_params(self): + """Test for numpy array as parameter values""" + qc = RealAmplitudes(num_qubits=2, reps=2) + op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) + k = 5 + params_array = np.random.rand(k, qc.num_parameters) + params_list = params_array.tolist() + params_list_array = list(params_array) + estimator = StatevectorEstimator() + target = estimator.run([(qc, op, params_list)]).result() + + with self.subTest("ndarrary"): + result = estimator.run([(qc, op, params_array)]).result() + self.assertEqual(len(result[0].data.evs), k) + np.testing.assert_allclose(result[0].data.evs, target[0].data.evs) + + with self.subTest("list of ndarray"): + result = estimator.run([(qc, op, params_list_array)]).result() + self.assertEqual(len(result[0].data.evs), k) + np.testing.assert_allclose(result[0].data.evs, target[0].data.evs) + + def test_precision_seed(self): + """Test for precision and seed""" + estimator = StatevectorEstimator(default_precision=1.0, seed=1) + psi1 = self.psi[0] + hamiltonian1 = self.hamiltonian[0] + theta1 = self.theta[0] + job = estimator.run([(psi1, hamiltonian1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.901141473854881]) + # The result of the second run is the same + job = estimator.run([(psi1, hamiltonian1, [theta1]), (psi1, hamiltonian1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.901141473854881]) + np.testing.assert_allclose(result[1].data.evs, [1.901141473854881]) + # precision=0 impliese the exact expectation value + job = estimator.run([(psi1, hamiltonian1, [theta1])], precision=0) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956]) + + +if __name__ == "__main__": + unittest.main()