tensorcircuit.cloud.wrapper 源代码

"""
higher level wrapper shortcut for submit_task
"""
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
import logging
import time

import numpy as np

from ..circuit import Circuit
from ..results import counts
from ..results.readout_mitigation import ReadoutMit
from ..utils import is_sequence
from ..cons import backend
from ..quantum import ps2xyz
from ..compiler import DefaultCompiler
from ..compiler.simple_compiler import simple_compile
from .apis import submit_task, get_device
from .abstraction import Device


logger = logging.getLogger(__name__)
Tensor = Any


[文档]def batch_submit_template( device: str, batch_limit: int = 64, **kws: Any ) -> Callable[..., List[counts.ct]]: def run( cs: Union[Circuit, Sequence[Circuit]], shots: int = 8192, **nkws: Any ) -> List[counts.ct]: """ batch circuit running alternative """ single = False if not is_sequence(cs): cs = [cs] # type: ignore single = True # for c in cs: # type: ignore # ts.append(submit_task(circuit=c, shots=shots, device=device)) # time.sleep(0.3) kws.update(nkws) if len(cs) <= batch_limit: # type: ignore css = [cs] else: ntimes = len(cs) // batch_limit # type: ignore if ntimes * batch_limit == len(cs): # type: ignore css = [ cs[i * batch_limit : (i + 1) * batch_limit] for i in range(ntimes) # type: ignore ] else: css = [ cs[i * batch_limit : (i + 1) * batch_limit] for i in range(ntimes) # type: ignore ] + [ cs[ntimes * batch_limit :] # type: ignore ] tss = [] logger.info(f"submit task on {device} for {len(cs)} circuits") # type: ignore time0 = time.time() for i, cssi in enumerate(css): tss += submit_task(circuit=cssi, shots=shots, device=device, **kws) if i < len(css) - 1: time.sleep(1.5) # TODO(@refraction-ray) whether the sleep time is enough for tquk? # incase duplicae request error protection l = [t.results() for t in tss] time1 = time.time() logger.info( f"finished collecting count results of {len(cs)} tasks in {round(time1-time0, 4)} seconds" # type: ignore ) if single is False: return l return l[0] # type: ignore return run
[文档]def sample_expectation_ps( c: Circuit, x: Optional[Sequence[int]] = None, y: Optional[Sequence[int]] = None, z: Optional[Sequence[int]] = None, shots: int = 1024, device: Optional[Device] = None, **kws: Any, ) -> float: """ Deprecated, please use :py:meth:`tensorcircuit.cloud.wrapper.batch_expectation_ps`. """ logger.warning( "This method is deprecated and not maintained, \ please use `tensorcircuit.cloud.wrapper.batch_expectation_ps` instead" ) c1 = Circuit.from_qir(c.to_qir()) if x is None: x = [] if y is None: y = [] if z is None: z = [] for i in x: c1.H(i) # type: ignore for i in y: c1.rx(i, theta=np.pi / 2) # type: ignore if device is None: device = get_device() t = submit_task(circuit=c1, device=device, shots=shots) raw_counts = t.results(blocked=True) # type: ignore x, y, z = list(x), list(y), list(z) return counts.expectation(raw_counts, x + y + z)
[文档]def reduce_and_evaluate( cs: List[Circuit], shots: int, run: Callable[..., Any] ) -> List[counts.ct]: reduced_cs = [] reduced_dict = {} recover_dict = {} # merge the same circuit for j, c in enumerate(cs): key = hash(c.to_openqasm()) if key not in reduced_dict: reduced_dict[key] = [j] reduced_cs.append(c) recover_dict[key] = len(reduced_cs) - 1 else: reduced_dict[key].append(j) # for j, ps in enumerate(pss): # ps = [i if i in [1, 2] else 0 for i in ps] # if tuple(ps) not in reduced_dict: # reduced_dict[tuple(ps)] = [j] # reduced_cs.append(cs[j]) # recover_dict[tuple(ps)] = len(reduced_cs) - 1 # else: # reduced_dict[tuple(ps)].append(j) reduced_raw_counts = run(reduced_cs, shots) raw_counts: List[Dict[str, int]] = [None] * len(cs) # type: ignore for i, c in enumerate(cs): key = hash(c.to_openqasm()) raw_counts[i] = reduced_raw_counts[recover_dict[key]] return raw_counts
[文档]def batch_expectation_ps( c: Circuit, pss: List[List[int]], device: Any = None, ws: Optional[List[float]] = None, shots: int = 8192, with_rem: bool = True, compile_func: Optional[Callable[[Circuit], Tuple[Circuit, Dict[str, Any]]]] = None, batch_limit: int = 64, batch_submit_func: Optional[Callable[..., List[counts.ct]]] = None, ) -> Union[Any, List[Any]]: """ Unified interface to compute the Pauli string expectation lists or sums via simulation or on real qpu. Error mitigation, circuit compilation and Pauli string grouping are all built-in. One line access to unlock the whole power or real quantum hardware on quantum cloud. :Example: .. code-block:: python c = tc.Circuit(2) c.h(0) c.x(1) tc.cloud.wrapper.batch_expectation_ps(c, [[1, 0], [0, 3]], device=None) # array([ 0.99999994, -0.99999994], dtype=float32) tc.cloud.wrapper.batch_expectation_ps(c, [[1, 0], [0, 3]], device="tencent::9gmon") # array([ 1.03093477, -1.11715944]) :param c: The target circuit to compute expectation :type c: Circuit :param pss: List of Pauli string list, eg. [[0, 1, 0], [2, 3, 3]] represents [X1, Y0Z1Z2]. :type pss: List[List[int]] :param device: The device str or object for quantum cloud module, defaults to None, None is for analytical exact simulation :type device: Any, optional :param ws: List of float to indicate the final return is the weighted sum of Pauli string expectations, e.g. [2., -0.3] represents the final results is 2* ``pss`` [0]-0.3* ``pss`` [1] defaults to None, None indicate the list of expectations for ``pss`` are all returned :type ws: Optional[List[float]], optional :param shots: measurement shots for each expectation estimation, defaults to 8192 :type shots: int, optional :param with_rem: whether enable readout error mitigation for the result, defaults to True :type with_rem: bool, optional :return: List of Pauli string expectation or a weighted sum float for Pauli strings, depending on ``ws`` :rtype: Union[Any, List[Any]] """ if device is None: results = [] for ps in pss: results.append(c.expectation_ps(**ps2xyz(ps))) # type: ignore if ws is None: return backend.real(backend.stack(results)) else: sumr = sum([w * r for w, r in zip(ws, results)]) return backend.convert_to_tensor(sumr) cs = [] infos = [] exps = [] if isinstance(device, str): device = get_device(device) if compile_func is None: try: coupling_map = device.topology() compile_func = DefaultCompiler( { "coupling_map": coupling_map, } ) except (AttributeError, ValueError): compile_func = DefaultCompiler() c1, info = compile_func(c) # type: ignore if not info.get("logical_physical_mapping", None): info["logical_physical_mapping"] = {i: i for i in range(c._nqubits)} for ps in pss: # TODO(@refraction-ray): Pauli string grouping # https://docs.pennylane.ai/en/stable/_modules/pennylane/pauli/grouping/group_observables.html c2 = Circuit.from_qir(c1.to_qir()) exp = [] for j, i in enumerate(ps): if i == 1: c2.H(info["logical_physical_mapping"][j]) # type: ignore c2, _ = simple_compile(c2) exp.append(j) elif i == 2: c2.rx(info["logical_physical_mapping"][j], theta=np.pi / 2) # type: ignore c2, _ = simple_compile(c2) exp.append(j) elif i == 3: exp.append(j) for i in range(c._nqubits): c2.measure_instruction(info["logical_physical_mapping"][i]) # c1, info = compile_func(c1) # type: ignore # TODO(@refraction-ray): two steps compiling with pre compilation: # basically done, require some fine tuning for performance cs.append(c2) infos.append(info) exps.append(exp) # reduced_cs = [] # reduced_dict = {} # recover_dict = {} # # merge the same circuit # for j, ps in enumerate(pss): # ps = [i if i in [1, 2] else 0 for i in ps] # if tuple(ps) not in reduced_dict: # reduced_dict[tuple(ps)] = [j] # reduced_cs.append(cs[j]) # recover_dict[tuple(ps)] = len(reduced_cs) - 1 # else: # reduced_dict[tuple(ps)].append(j) if batch_submit_func is None: run = batch_submit_template( device, batch_limit, enable_qos_qubit_mapping=False, enable_qos_gate_decomposition=False, ) else: run = batch_submit_func raw_counts = reduce_and_evaluate(cs, shots, run) # type: ignore # def run(cs: List[Any], shots: int) -> List[Dict[str, int]]: # logger.info(f"submit task on {device.name} for {len(cs)} circuits") # time0 = time.time() # ts = submit_task( # circuit=cs, # device=device, # shots=shots, # enable_qos_qubit_mapping=False, # enable_qos_gate_decomposition=False, # ) # if not is_sequence(ts): # ts = [ts] # type: ignore # raw_counts = [t.results(blocked=True) for t in ts] # time1 = time.time() # logger.info( # f"finished collecting count results of {len(cs)} tasks in {round(time1-time0, 4)} seconds" # ) # return raw_counts # reduced_raw_counts = run(reduced_cs, shots) # raw_counts: List[Dict[str, int]] = [None] * len(cs) # type: ignore # for i in range(len(cs)): # ps = [i if i in [1, 2] else 0 for i in pss[i]] # raw_counts[i] = reduced_raw_counts[recover_dict[tuple(ps)]] if with_rem: if getattr(device, "readout_mit", None) is None: mit = ReadoutMit(run) # TODO(@refraction-ray) only work for tencent provider nq = device.list_properties().get("qubits", None) if nq is None: nq = c._nqubits mit.cals_from_system(nq, shots=shots) device.readout_mit = mit else: mit = device.readout_mit results = [ mit.expectation(raw_counts[i], exps[i], **infos[i]) for i in range(len(raw_counts)) ] else: results = [ counts.expectation(raw_counts[i], exps[i]) for i in range(len(raw_counts)) ] if ws is not None: sumr = sum([w * r for w, r in zip(ws, results)]) return backend.convert_to_tensor(sumr) return backend.stack(results)