Source code for tensorcircuit.cloud.abstraction

"""
Abstraction for Provider, Device and Task
"""

from typing import Any, Dict, List, Optional, Union, Tuple
from functools import partial
import time

import networkx as nx

from ..results import readout_mitigation as rem
from ..results import counts
from ..utils import arg_alias


[docs]class TCException(BaseException): pass
[docs]class TaskException(TCException): pass
[docs]class TaskUnfinished(TaskException):
[docs] def __init__(self, taskid: str, state: str): self.taskid = taskid self.state = state super().__init__( "Task %s is not completed yet, now in %s state" % (self.taskid, self.state) )
[docs]class TaskFailed(TaskException):
[docs] def __init__(self, taskid: str, state: str, message: str): self.taskid = taskid self.state = state self.message = message super().__init__( "Task %s is in %s state with err message %s" % (self.taskid, self.state, self.message) )
[docs]class Provider: """ Provider abstraction for cloud connection, eg. "tencent", "local" """ activated_providers: Dict[str, "Provider"] = {}
[docs] def __init__(self, name: str, lower: bool = True): if lower is True: name = name.lower() self.name = name
def __str__(self) -> str: return self.name __repr__ = __str__
[docs] @classmethod def from_name(cls, provider: Union[str, "Provider"] = "tencent") -> "Provider": if provider is None: provider = "tencent" if isinstance(provider, cls): p = provider elif isinstance(provider, str): if provider in cls.activated_providers: return cls.activated_providers[provider] else: p = cls(provider) cls.activated_providers[provider] = p else: raise ValueError( "Unsupported format for `provider` argument: %s" % provider ) return p
[docs] def set_token(self, token: str, cached: bool = True) -> Any: from .apis import set_token return set_token(token, self, cached=cached)
[docs] def get_token(self) -> str: from .apis import get_token return get_token(self) # type: ignore
[docs] def list_devices(self, **kws: Any) -> Any: from .apis import list_devices return list_devices(self, **kws)
[docs] def get_device(self, device: Optional[Union[str, "Device"]]) -> "Device": from .apis import get_device return get_device(self, device)
[docs] def list_tasks(self, **filter_kws: Any) -> List["Task"]: from .apis import list_tasks return list_tasks(self, **filter_kws)
sep = "::"
[docs]class Device: """ Device abstraction for cloud connection, eg. quantum chips """ activated_devices: Dict[str, "Device"] = {}
[docs] def __init__( self, name: str, provider: Optional[Union[str, Provider]] = None, lower: bool = False, ): if lower is True: name = name.lower() if provider is not None: self.provider = Provider.from_name(provider) if len(name.split(sep)) > 1: self.name = name.split(sep)[1] else: self.name = name else: # no explicit provider if len(name.split(sep)) > 1: self.name = name.split(sep)[1] self.provider = Provider.from_name(name.split(sep)[0]) else: from .apis import get_provider self.name = name self.provider = get_provider() self.readout_mit: Any = None
def __str__(self) -> str: return self.provider.name + sep + self.name __repr__ = __str__
[docs] @classmethod def from_name( cls, device: Union[str, "Device"], provider: Optional[Union[str, Provider]] = None, ) -> "Device": # if device is None: # raise ValueError("Must specify on device instead of default ``None``") if isinstance(device, cls): d = device elif isinstance(device, str): if len(device.split(sep)) > 1: provider = device.split(sep)[0] device = device.split(sep)[1] if provider is None: pn = "" elif isinstance(provider, str): pn = provider else: pn = provider.name if pn + sep + device in cls.activated_devices: return cls.activated_devices[pn + sep + device] else: d = cls(device, provider) cls.activated_devices[pn + sep + device] = d else: raise ValueError("Unsupported format for `provider` argument: %s" % device) return d
[docs] def set_token(self, token: str, cached: bool = True) -> Any: from .apis import set_token return set_token(token, provider=self.provider, device=self, cached=cached)
[docs] def get_token(self) -> Optional[str]: from .apis import get_token s = get_token(provider=self.provider, device=self) if s is not None: return s # fallback to provider default return get_token(provider=self.provider)
[docs] def list_properties(self) -> Dict[str, Any]: """ List all device properties in as dict :return: [description] :rtype: Dict[str, Any] """ from .apis import list_properties return list_properties(self.provider, self)
[docs] def native_gates(self) -> List[str]: """ List native gates supported for the device, str conforms qiskit convention :return: _description_ :rtype: List[str] """ properties = self.list_properties() if "native_gates" in properties: return properties["native_gates"] # type: ignore return []
[docs] def topology(self) -> List[Tuple[int, int]]: """ Get the bidirectional topology link list of the device :return: [description] :rtype: List[Tuple[int, int]] """ properties = self.list_properties() if "links" not in properties: return # type: ignore links = [] for link in properties["links"]: links.append((link[0], link[1])) links.append((link[1], link[0])) links = list(set(links)) links = [list(link) for link in links] # type: ignore # compatible with coupling_map in qiskit return links
[docs] def topology_graph(self, visualize: bool = False) -> nx.Graph: """ Get the qubit topology in ``nx.Graph`` or directly visualize it :param visualize: [description], defaults to False :type visualize: bool, optional :return: [description] :rtype: nx.Graph """ pro = self.list_properties() if not ("links" in pro and "bits" in pro): return # type: ignore g = nx.Graph() node_color = [] edge_color = [] for i in pro["bits"]: g.add_node(i) node_color.append(pro["bits"][i]["T1"]) for e1, e2 in pro["links"]: g.add_edge(e1, e2) edge_color.append(pro["links"][(e1, e2)]["CZErrRate"]) if visualize is False: return g from matplotlib import colormaps # pos1 = nx.planar_layout(g) # pos2 = nx.spring_layout(g, pos=pos1, k=2) pos = nx.kamada_kawai_layout(g) return nx.draw( g, pos=pos, with_labels=True, node_size=600, node_color=node_color, cmap=colormaps["Wistia"], vmin=max(min(node_color) - 5, 0), width=2.5, edge_color=edge_color, edge_cmap=colormaps["gray"], edge_vmin=0, edge_vmax=max(edge_color) * 1.2, )
[docs] def get_task(self, taskid: str) -> "Task": from .apis import get_task return get_task(taskid, device=self)
[docs] def submit_task(self, **task_kws: Any) -> List["Task"]: from .apis import submit_task return submit_task(provider=self.provider, device=self, **task_kws)
[docs] def list_tasks(self, **filter_kws: Any) -> List["Task"]: from .apis import list_tasks return list_tasks(self.provider, self, **filter_kws)
sep2 = "~~"
[docs]class Task: """ Task abstraction for quantum jobs on the cloud """
[docs] def __init__(self, id_: str, device: Optional[Device] = None): self.id_ = id_ self.device = device self.more_details: Dict[str, Any] = {}
def __repr__(self) -> str: return self.device.__repr__() + sep2 + self.id_ __str__ = __repr__
[docs] def get_device(self) -> Device: """ Query which device the task is run on :return: _description_ :rtype: Device """ if self.device is None: return Device.from_name(self.details()["device"]) else: return Device.from_name(self.device)
[docs] @partial(arg_alias, alias_dict={"blocked": ["wait"]}) def details(self, blocked: bool = False, **kws: Any) -> Dict[str, Any]: """ Get the current task details :param blocked: whether return until task is finished, defaults to False :type blocked: bool :return: _description_ :rtype: Dict[str, Any] """ from .apis import get_task_details if blocked is False: dt = get_task_details(self, **kws) dt.update(self.more_details) return dt s = self.state() tries = 0 while s == "pending": time.sleep(0.5 + tries / 10) tries += 1 s = self.state() return self.details(**kws) # type: ignore
[docs] def get_logical_physical_mapping(self) -> Optional[Dict[int, int]]: d = self.details() try: mp = d["optimization"]["pairs"] except KeyError: if "qubits" in d and isinstance(d["qubits"], int): mp = {i: i for i in range(d["qubits"])} else: mp = None return mp # type: ignore
[docs] def add_details(self, **kws: Any) -> None: self.more_details.update(kws)
[docs] def state(self) -> str: """ Query the current task status :return: _description_ :rtype: str """ r = self.details() return r["state"] # type: ignore
status = state
[docs] def resubmit(self) -> "Task": """ resubmit the task :return: the resubmitted task :rtype: Task """ from .apis import resubmit_task return resubmit_task(self)
[docs] @partial(arg_alias, alias_dict={"format": ["format_"], "blocked": ["wait"]}) def results( self, format: Optional[str] = None, blocked: bool = True, mitigated: bool = False, calibriation_options: Optional[Dict[str, Any]] = None, readout_mit: Optional[rem.ReadoutMit] = None, mitigation_options: Optional[Dict[str, Any]] = None, ) -> counts.ct: """ get task results of the qjob :param format: unsupported now, defaults to None, which is "count_dict_bin" :type format: Optional[str], optional :param blocked: whether blocked to wait until the result is returned, defaults to False, which raise error when the task is unfinished :type blocked: bool, optional :param mitigated: whether enable readout error mitigation, defaults to False :type mitigated: bool, optional :param calibriation_options: option dict for ``ReadoutMit.cals_from_system``, defaults to None :type calibriation_options: Optional[Dict[str, Any]], optional :param readout_mit: if given, directly use the calibriation info on ``readout_mit``, defaults to None :type readout_mit: Optional[rem.ReadoutMit], optional :param mitigation_options: option dict for ``ReadoutMit.apply_correction``, defaults to None :type mitigation_options: Optional[Dict[str, Any]], optional :return: count dict results :rtype: Any """ if not blocked: s = self.state() if s != "completed": raise TaskUnfinished(self.id_, s) r = self.details()["results"] r = counts.sort_count(r) # type: ignore else: s = self.state() tries = 0 while s != "completed": if s in ["failed"]: err = self.details().get("err", "") raise TaskFailed(self.id_, s, err) time.sleep(0.5 + tries / 10) tries += 1 s = self.state() r = self.results(format=format, blocked=False, mitigated=False) if mitigated is False: return r # type: ignore nqubit = len(list(r.keys())[0]) # mitigated is True: device = self.get_device() if device.provider.name != "tencent": raise ValueError("Only tencent provider supports auto readout mitigation") if readout_mit is None and getattr(device, "readout_mit", None) is None: def run(cs: Any, shots: Any) -> Any: """ current workaround for batch """ from .apis import submit_task ts = submit_task(circuit=cs, shots=shots, device=device.name + "?o=0") return [t.results(blocked=True) for t in ts] # type: ignore shots = self.details()["shots"] readout_mit = rem.ReadoutMit(run) if calibriation_options is None: calibriation_options = {} readout_mit.cals_from_system( list(range(nqubit)), shots, **calibriation_options ) device.readout_mit = readout_mit elif readout_mit is None: readout_mit = device.readout_mit if mitigation_options is None: try: mitigation_options = { "logical_physical_mapping": self.details()["optimization"]["pairs"] } except KeyError: mitigation_options = {} miti_count = readout_mit.apply_correction( r, list(range(nqubit)), **mitigation_options ) return counts.sort_count(miti_count)