"""
A collection of useful function snippets that irrelevant with the main modules
or await for further refactor
"""
import logging
from typing import Any, Callable, Iterator, Optional, Sequence, Tuple
import numpy as np
import tensorflow as tf
logger = logging.getLogger(__name__)
try:
import cirq
except ImportError as e:
logger.warning(e)
logger.warning("Therefore some functionality in %s may not work" % __name__)
from ..circuit import Circuit
from .. import gates as G
from ..gates import array_to_tensor, num_to_tensor
Array = Any
Tensor = Any
Graph = Any
[文档]class FakeModule:
def __getattr__(self, name: str) -> str:
return name
fake_module = FakeModule()
##############
## qml related functions to be refactored
##############
# tf specific
[文档]def amplitude_encoding(
fig: Tensor,
qubits: int,
index: Optional[Sequence[int]] = None,
index_func: Optional[Callable[[int, int], Sequence[int]]] = None,
) -> Tensor:
if fig.shape[-1] == 1:
fig = tf.reshape(fig, shape=fig.shape[:-1])
if len(fig.shape) == 2:
fig = fig[tf.newaxis, ...]
fig = tf.reshape(fig, shape=(fig.shape[0], fig.shape[1] * fig.shape[2]))
norm = tf.linalg.norm(fig, axis=1)
norm = norm[..., tf.newaxis]
fig = fig / norm
if fig.shape[1] < 2**qubits:
fig = tf.concat(
[
fig,
tf.zeros([fig.shape[0], 2**qubits - fig.shape[1]], dtype=tf.float64),
],
axis=1,
)
if index is None and index_func is not None:
index = []
for i in range(32):
# magic number here, only valid for MNIST data
for j in range(32):
l = index_func(i, j)
r = 0
for p, q in enumerate(l):
r += q * 2 ** (9 - p)
index.append(r)
if index is not None:
fig = tf.constant(fig.numpy()[:, index], dtype=fig.dtype)
return fig
[文档]def recursive_index(x: int, y: int) -> Sequence[int]:
rl = []
for k in range(5):
rl.append((x // (2 ** (4 - k))) % 2)
rl.append((y // (2 ** (4 - k))) % 2)
return rl
[文档]def mnist_amplitude_data(
a: int,
b: int,
binarize: bool = False,
index: Optional[Sequence[int]] = None,
index_func: Optional[Callable[[int, int], Sequence[int]]] = None,
loader: Any = None,
threshold: float = 0.4,
) -> Tensor:
def filter_pair(x: Tensor, y: Tensor, a: int, b: int) -> Tuple[Tensor, Tensor]:
keep = (y == a) | (y == b)
x, y = x[keep], y[keep]
y = y == a
return x, y
if loader is None:
loader = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = loader.load_data()
x_train, x_test = x_train[..., np.newaxis] / 255.0, x_test[..., np.newaxis] / 255.0
if binarize:
x_train[x_train > threshold] = 1.0
x_train[x_train <= threshold] = 0.0
x_test[x_test > threshold] = 1.0
x_test[x_test <= threshold] = 0.0
x_train, y_train = filter_pair(x_train, y_train, a, b)
x_test, y_test = filter_pair(x_test, y_test, a, b)
x_train = tf.image.pad_to_bounding_box(x_train, 2, 2, 32, 32)
x_test = tf.image.pad_to_bounding_box(x_test, 2, 2, 32, 32)
x_train_q = amplitude_encoding(x_train, 10, index=index, index_func=index_func)
x_test_q = amplitude_encoding(x_test, 10, index=index, index_func=index_func)
return (x_train_q, y_train), (x_test_q, y_test)
[文档]def mnist_generator(
x_train: Tensor, y_train: Tensor, batch: int = 1, random: bool = True
) -> Iterator[Any]:
if isinstance(x_train, tf.Tensor):
x_train = x_train.numpy()
if isinstance(y_train, tf.Tensor):
y_train = y_train.numpy()
i = np.array(list(range(batch)))
while True:
if random:
i = np.random.randint(low=0, high=x_train.shape[0], size=batch)
else:
i += batch
i = i % x_train.shape[0]
yield tf.constant(x_train[i]), tf.constant(y_train[i])
[文档]def generate_random_circuit(
inputs: Tensor, nqubits: int = 10, epochs: int = 3, layouts: Tensor = None
) -> Circuit:
inputs = tf.cast(inputs, dtype=tf.complex64)
c = Circuit(nqubits, inputs=inputs)
if layouts is None:
layouts = np.random.choice([0, 1, 2], size=[epochs, nqubits])
layouts = tf.reshape(layouts, shape=[epochs, nqubits])
for epoch in range(epochs):
for i in range(nqubits):
flg = layouts[epoch, i]
if flg == 0:
c.rx(i, theta=num_to_tensor(np.pi / 2)) # type: ignore
elif flg == 1:
c.ry(i, theta=num_to_tensor(np.pi / 2)) # type: ignore
else:
c.wroot(i) # type: ignore
for i in range(nqubits): # entangling layer
c.swap(i, (i + 1) % nqubits) # type: ignore
return c
# the following vags are not the recommended fashion to write qml tasks in tensorcircuit now.
# also the hardcoded circuit structure is a bad practice, they are here for backward compatibility
# instead use vvag or keras paradigm now
[文档]def naive_qml_vag(
gdata: Graph,
nnp: Tensor,
preset: Sequence[int],
nqubits: int = 10,
epochs: int = 3,
target: int = 0,
) -> Tuple[Tensor, Tensor]:
# used for structure searching?
xs, ys = gdata
loss = 0.0
for x, y in zip(xs, ys):
circuit = generate_random_circuit(
x, nqubits=nqubits, epochs=epochs, layouts=preset
)
value = circuit.expectation(
(
G.z(), # type: ignore
[
target,
],
)
)
y = tf.cast(y, dtype=tf.complex64)
y = 2 * y - 1
loss += (value - y) ** 2
return tf.constant(tf.cast(loss, dtype=tf.float32)), tf.zeros_like(nnp) # MSE loss
[文档]def train_qml_vag(
gdata: Graph,
nnp: Tensor,
preset: Optional[Sequence[int]] = None,
nqubits: int = 10,
epochs: int = 3,
batch: int = 64,
validation: bool = False,
) -> Any:
xs, ys = gdata
with tf.GradientTape() as tape:
tape.watch(nnp)
cnnp = tf.cast(nnp, dtype=tf.complex64)
c = Circuit(nqubits, inputs=np.ones([1024], dtype=np.complex64) / 2**5)
for epoch in range(epochs):
for i in range(nqubits):
c.rz(i, theta=cnnp[3 * epoch, i]) # type: ignore
for i in range(nqubits):
c.ry(i, theta=cnnp[3 * epoch + 1, i]) # type: ignore
for i in range(0, nqubits, 2):
# c.swap(i, (i + 1) % 10) # type: ignore
c.exp( # type: ignore
i,
(i + 1) % 10,
unitary=array_to_tensor(G._swap_matrix),
theta=cnnp[3 * epoch + 2, i],
)
for i in range(1, nqubits, 2):
# c.swap(i, (i + 1) % 10) # type: ignore
c.exp( # type: ignore
i,
(i + 1) % 10,
unitary=array_to_tensor(G._swap_matrix),
theta=cnnp[3 * epoch + 2, i],
)
for i in range(nqubits):
c.rx(i, theta=cnnp[3 * epochs, i]) # type: ignore
count = 0
loss = 0
for x, y in zip(xs, ys):
y = tf.cast(y, dtype=tf.float32) # [1, 0] binary
c.replace_inputs(tf.cast(x, dtype=tf.complex64))
flg = False
yp = 0.0
for i in range(nqubits):
yp += cnnp[3 * epochs + 1, i] * c.expectation(
(
G.z(), # type: ignore
[
i,
],
),
reuse=flg,
)
flg = True
yp = tf.cast(yp, dtype=tf.float32)
yp = tf.math.sigmoid(
(yp + tf.math.real(cnnp[3 * epochs + 2, 0])) * 15
) ## [0,1] intepreted as probability
if tf.math.abs(yp - y) < 0.5:
count += 1
loss += (yp - y) ** 2
# print("accuracy:", count / batch)
gr = tape.gradient(loss, cnnp)
if not validation:
return tf.constant(count / batch, dtype=tf.float32), gr
else:
return count / batch # evaluate as validation kernel
[文档]def validate_qml_vag(
gdata: Graph,
nnp: Tensor,
preset: Optional[Sequence[int]] = None,
nqubits: int = 10,
epochs: int = 3,
batch: int = 64,
) -> Any:
xs, ys = gdata
cnnp = tf.cast(nnp, dtype=tf.complex64)
c = Circuit(nqubits, inputs=np.ones([1024], dtype=np.complex64) / 2**5)
for epoch in range(epochs):
for i in range(nqubits):
c.rz(i, theta=cnnp[3 * epoch, i]) # type: ignore
for i in range(nqubits):
c.ry(i, theta=cnnp[3 * epoch + 1, i]) # type: ignore
for i in range(0, nqubits, 2):
# c.swap(i, (i + 1) % 10) # type: ignore
c.exp( # type: ignore
i,
(i + 1) % 10,
unitary=array_to_tensor(G._swap_matrix),
theta=cnnp[3 * epoch + 2, i],
)
for i in range(1, nqubits, 2):
# c.swap(i, (i + 1) % 10) # type: ignore
c.exp( # type: ignore
i,
(i + 1) % 10,
unitary=array_to_tensor(G._swap_matrix),
theta=cnnp[3 * epoch + 2, i],
)
for i in range(nqubits):
c.rx(i, theta=cnnp[3 * epochs, i]) # type: ignore
count = 0
loss = 0
for x, y in zip(xs, ys):
y = tf.cast(y, dtype=tf.float32) # [1, 0] binary
c.replace_inputs(tf.cast(x, dtype=tf.complex64))
flg = False
yp = 0.0
for i in range(nqubits):
yp += cnnp[3 * epochs + 1, i] * c.expectation(
(
G.z(), # type: ignore
[
i,
],
),
reuse=flg,
)
flg = True
yp = tf.cast(yp, dtype=tf.float32)
yp = tf.math.sigmoid(
(yp + tf.math.real(cnnp[3 * epochs + 2, 0])) * 15
) ## [0,1] intepreted as probability
if tf.math.abs(yp - y) < 0.5:
count += 1
loss += (yp - y) ** 2
return {
"val_loss": loss / batch,
"val_accuracy": count / batch,
} # evaluate as validation kernel
##############
## really small and irrelavant pieces below
##############
[文档]def color_svg(circuit: cirq.Circuit, *coords: Tuple[int, int]) -> Any:
"""
color cirq circuit SVG for given gates,
a small tool to hack the cirq SVG
:param circuit:
:param coords: integer coordinate which gate is colored
:return:
"""
import xml
from cirq.contrib.svg import SVGCircuit
svg_str = SVGCircuit(circuit)._repr_svg_()
DOMTree = xml.dom.minidom.parseString(svg_str) # type: ignore
xpos = []
ypos = []
for r in DOMTree.getElementsByTagName("rect"): # [0].setAttribute("fill", "gray")
xpos.append(int(float(r.getAttribute("x"))))
ypos.append(int(float(r.getAttribute("y"))))
xpos = sorted(list(set(xpos)))
ypos = sorted(list(set(ypos)))
# xpos_dict = dict(zip(range(len(xpos)), xpos))
# ypos_dict = dict(zip(range(len(ypos)), ypos))
i_xpos_dict = dict(zip(xpos, range(len(xpos))))
i_ypos_dict = dict(zip(ypos, range(len(ypos))))
for r in DOMTree.getElementsByTagName("rect"):
x, y = int(float(r.getAttribute("x"))), int(float(r.getAttribute("y")))
if (i_xpos_dict[x], i_ypos_dict[y]) in coords:
r.setAttribute("fill", "gray")
return DOMTree.toxml()
[文档]def repr2array(inputs: str) -> Array:
"""
transform repr form of an array to real numpy array
:param inputs:
:return:
"""
# used to transform np.array in print form into live np.array
inputs = inputs.split("]") # type: ignore
inputs = [l.strip().strip("[") for l in inputs if l.strip()] # type: ignore
outputs = []
for l in inputs:
o = [float(c.strip()) for c in l.split(" ") if c.strip()]
outputs.append(o)
return np.array(outputs)
# duplicated, will remove later
# use the version in applications.physics.baseline.py
[文档]def TFIM1Denergy(
L: int, Jzz: float = 1.0, Jx: float = 1.0, Pauli: bool = True
) -> float:
# PBC
# nice tutorial: https://arxiv.org/pdf/2009.09208.pdf
# further investigation on 1 TFIM solution structure is required
# will fail on AFM phase Jzz>Jx and Jzz>0 and odd sites (frustration in boundary)
e = 0
if Pauli:
Jx *= 2
Jzz *= 4
for i in range(L):
q = np.pi * (2 * i - (1 + (-1) ** L) / 2) / L
e -= np.abs(Jx) / 2 * np.sqrt(1 + Jzz**2 / 4 / Jx**2 - Jzz / Jx * np.cos(q))
return e
[文档]def Heisenberg1Denergy(L: int, Pauli: bool = True, maxiters: int = 1000) -> float:
# PBC
error = 1e-15
eps = 1e-20 # avoid zero division
phi = np.zeros([L // 2, L // 2])
phi2 = np.zeros([L // 2, L // 2])
lamb = np.array([2 * i + 1 for i in range(L // 2)])
for _ in range(maxiters):
k = 1 / L * (2 * np.pi * lamb + np.sum(phi, axis=-1) - np.diag(phi))
for i in range(L // 2):
for j in range(L // 2):
phi2[i, j] = (
np.arctan(
2
/ (
1 / (np.tan(k[i] / 2) + eps)
- 1 / (np.tan(k[j] / 2) + eps)
+ eps
)
)
* 2
)
if np.allclose(phi, phi2, rtol=error): # converged
break
phi = phi2.copy()
else:
raise ValueError(
"the maxiters %s is too small for bethe ansatz to converge" % maxiters
)
e = -np.sum(1 - np.cos(k)) + L / 4
if Pauli is True:
e *= 4
return e # type: ignore