Quantum Machine Learning with PyTorch#

Overview#

In this tutorial, we show the MNIST binary classification QML example with the same setup as mnist_qml. This time, we use a PyTorch machine learning pipeline to build the QML model. As before, this tutorial is not about best QML practices or best PyTorch pipeline practices; it is a demonstration of the integration between PyTorch and TensorCircuit.

Setup#

[1]:
import time
import numpy as np
import tensorflow as tf
import torch

import tensorcircuit as tc

K = tc.set_backend("tensorflow")

# use TensorFlow as the backend, while wrapping the quantum function in a PyTorch interface
[2]:
# we use the same dataset and preprocessing as in the previous notebook

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train[..., np.newaxis] / 255.0


def filter_pair(x, y, a, b):
    keep = (y == a) | (y == b)
    x, y = x[keep], y[keep]
    y = y == a
    return x, y


x_train, y_train = filter_pair(x_train, y_train, 1, 5)
x_train_small = tf.image.resize(x_train, (3, 3)).numpy()
x_train_bin = np.array(x_train_small > 0.5, dtype=np.float32)
x_train_bin = np.squeeze(x_train_bin).reshape([-1, 9])
y_train_torch = torch.tensor(y_train, dtype=torch.float32)
x_train_torch = torch.tensor(x_train_bin)
x_train_torch.shape, y_train_torch.shape
[2]:
(torch.Size([12163, 9]), torch.Size([12163]))
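
Before building the model, it is worth sanity-checking the preprocessed tensors. The following lines (a quick check reusing only names defined above) print the shapes, the label balance, and one binarized 3x3 digit:

print(x_train_torch.shape, x_train_torch.dtype)  # torch.Size([12163, 9]) torch.float32
print(torch.unique(y_train_torch, return_counts=True))  # label balance after filtering digits 1 and 5
print(x_train_torch[0].reshape(3, 3))  # one downsampled, binarized digit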

Wrapping the Quantum Function with torch_interface#

[3]:
n = 9
nlayers = 3

# we define the quantum function;
# note that this function runs on the TensorFlow backend


def qpred(x, weights):
    c = tc.Circuit(n)
    # encode the 9 binary pixels as rx rotations
    for i in range(n):
        c.rx(i, theta=x[i])
    for j in range(nlayers):
        # entangling layer of nearest-neighbor CNOTs
        for i in range(n - 1):
            c.cnot(i, i + 1)
        # variational single-qubit rotations
        for i in range(n):
            c.rx(i, theta=weights[2 * j, i])
            c.ry(i, theta=weights[2 * j + 1, i])
    # read out <Z> on the middle qubit and squash it into (0, 1)
    ypred = c.expectation_ps(z=[n // 2])
    ypred = K.real(ypred)
    return K.sigmoid(ypred)


# wrap the function into PyTorch form, but with TensorFlow speed!
qpred_torch = tc.interfaces.torch_interface(qpred, jit=True)
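
Since qpred_torch now behaves like a native PyTorch function, gradients flow through it with the usual autograd calls. A minimal sketch of such a check (the random weights here are for illustration only):

w0 = torch.randn([2 * nlayers, n], requires_grad=True)
y0 = qpred_torch(x_train_torch[0], w0)
y0.backward()
print(y0.item())  # a scalar prediction in (0, 1) thanks to the sigmoid
print(w0.grad.shape)  # torch.Size([6, 9]): gradients for all variational angles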

Once we have the differentiable function in PyTorch format, we can further wrap it as a Torch module (a network layer).

[4]:
class QuantumNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.q_weights = torch.nn.Parameter(torch.randn([2 * nlayers, n]))

    def forward(self, inputs):
        ypred = qpred_torch(inputs, self.q_weights)
        return ypred
[5]:
net = QuantumNet()
net(x_train_torch[0])
[5]:
tensor(0.4539, grad_fn=<FunBackward>)
[6]:
criterion = torch.nn.BCELoss()
opt = torch.optim.Adam(net.parameters(), lr=1e-2)
nepochs = 500
nbatch = 32
times = []

for epoch in range(nepochs):
    # sample a batch from the first 100 training examples (a small demo subset)
    index = np.random.randint(low=0, high=100, size=nbatch)
    # index = np.arange(nbatch)
    inputs, labels = x_train_torch[index], y_train_torch[index]
    opt.zero_grad()

    with torch.set_grad_enabled(True):
        time0 = time.time()
        yps = []
        for i in range(nbatch):
            yp = net(inputs[i])
            yps.append(yp)
        yps = torch.stack(yps)
        loss = criterion(
            torch.reshape(yps, [nbatch, 1]), torch.reshape(labels, [nbatch, 1])
        )
        loss.backward()
        if epoch % 100 == 0:
            print(loss)
        opt.step()
        time1 = time.time()
        times.append(time1 - time0)

print("training time per step: ", np.mean(time1 - time0))
tensor(0.7287, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.5947, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.5804, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6358, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6503, grad_fn=<BinaryCrossEntropyBackward0>)
training time per step:  0.12587213516235352
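
After training we can evaluate the per-sample network in the same fashion. A minimal accuracy check might look like the sketch below (the 0.5 decision threshold and the 200-sample evaluation slice are our own choices, not part of the original pipeline):

with torch.no_grad():
    ys = torch.stack([net(x) for x in x_train_torch[:200]])
acc = ((ys > 0.5).float() == y_train_torch[:200]).float().mean()
print("train accuracy on the first 200 samples:", acc.item())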

Batched Version#

Now let's try the vectorized version to speed up processing of batched inputs. Note that it is essentially tf.vectorized_map that powers the batching pipeline.

[7]:
qpred_vmap = K.vmap(qpred, vectorized_argnums=0)

# `qpred_vmap` is a TensorFlow function with vectorization capability

qpred_batch = tc.interfaces.torch_interface(qpred_vmap, jit=True)

# we further wrap the function as a PyTorch function
[8]:
# test the AD support of the PyTorch function

w = torch.ones([2 * nlayers, n])
w.requires_grad_()
with torch.set_grad_enabled(True):
    yps = qpred_batch(x_train_torch[:3], w)
    loss = torch.sum(yps)
    loss.backward()
print(w.grad)
tensor([[-6.2068e-03, -3.0100e-05, -1.0997e-02, -1.8381e-02, -9.1800e-02,
          1.2481e-01, -6.5200e-02,  1.1176e-08,  7.4506e-09],
        [-3.2353e-03,  3.4989e-03, -1.1344e-02, -1.6136e-02,  1.9075e-02,
          2.1119e-02,  2.6881e-02, -1.1176e-08,  0.0000e+00],
        [-1.1777e-02, -1.1572e-03, -5.0570e-03,  6.4838e-03, -5.5077e-02,
         -3.4250e-02, -7.4506e-09, -1.1176e-08,  3.7253e-09],
        [-1.4748e-02, -2.3818e-02, -4.3567e-02, -4.7879e-02,  1.2331e-01,
          1.4314e-01,  3.7253e-09,  1.1176e-08,  3.7253e-09],
        [-3.7253e-09,  3.7253e-09,  0.0000e+00,  0.0000e+00, -2.7574e-02,
          7.4506e-09,  7.4506e-09, -1.1176e-08,  0.0000e+00],
        [ 3.7253e-09,  3.7253e-09,  1.4901e-08, -7.4506e-09,  7.1655e-02,
         -7.4506e-09,  3.7253e-09,  1.4901e-08,  0.0000e+00]])
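
To cross-check the automatic gradient above, a crude finite-difference probe on a single weight entry suffices (a sketch; the step size eps = 1e-3 and the probed index [0, 0] are arbitrary choices):

eps = 1e-3
w_plus, w_minus = w.detach().clone(), w.detach().clone()
w_plus[0, 0] += eps
w_minus[0, 0] -= eps
fd = (
    torch.sum(qpred_batch(x_train_torch[:3], w_plus))
    - torch.sum(qpred_batch(x_train_torch[:3], w_minus))
) / (2 * eps)
print(fd.item(), w.grad[0, 0].item())  # the two estimates should agree to a few decimals
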
[9]:
class QuantumNetV2(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.q_weights = torch.nn.Parameter(torch.randn([2 * nlayers, n]))

    def forward(self, inputs):
        ypred = qpred_batch(inputs, self.q_weights)
        return ypred
[10]:
net2 = QuantumNetV2()
net2(x_train_torch[:3])
[10]:
tensor([0.4706, 0.4706, 0.4767], grad_fn=<FunBackward>)

With the vmap infrastructure borrowed from TensorFlow, the training performance is greatly improved!

[11]:
criterion = torch.nn.BCELoss()
opt = torch.optim.Adam(net2.parameters(), lr=1e-2)
nepochs = 500
nbatch = 32
times = []
for epoch in range(nepochs):
    index = np.random.randint(low=0, high=100, size=nbatch)
    # index = np.arange(nbatch)
    inputs, labels = x_train_torch[index], y_train_torch[index]
    opt.zero_grad()

    with torch.set_grad_enabled(True):
        time0 = time.time()
        yps = net2(inputs)
        loss = criterion(
            torch.reshape(yps, [nbatch, 1]), torch.reshape(labels, [nbatch, 1])
        )
        loss.backward()
        if epoch % 100 == 0:
            print(loss)
        opt.step()
        time1 = time.time()
        times.append(time1 - time0)
print("training time per step: ", np.mean(times[1:]))
tensor(0.6973, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6421, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6419, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6498, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6466, grad_fn=<BinaryCrossEntropyBackward0>)
training time per step:  0.009107916531916371
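
The recorded step times quantify the gain directly: with roughly 0.126 s per step for the Python loop versus 0.009 s for the vmapped pipeline, the batched version is about an order of magnitude faster on this 9-qubit circuit:

loop_time, vmap_time = 0.126, 0.0091  # mean step times printed by the two training cells above
print("vmap speedup: ~%.0fx" % (loop_time / vmap_time))  # roughly 14x here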

Hybrid Model with Classical Post-processing#

We now build a hybrid quantum-classical machine learning pipeline, where the measurement outputs are further fed into a classical fully connected layer.

[12]:
def qpreds(x, weights):
    c = tc.Circuit(n)
    for i in range(n):
        c.rx(i, theta=x[i])
    for j in range(nlayers):
        for i in range(n - 1):
            c.cnot(i, i + 1)
        for i in range(n):
            c.rx(i, theta=weights[2 * j, i])
            c.ry(i, theta=weights[2 * j + 1, i])

    # read out <Z> on every qubit as a 9-dimensional feature vector
    return K.stack([K.real(c.expectation_ps(z=[i])) for i in range(n)])


qpreds_vmap = K.vmap(qpreds, vectorized_argnums=0)
qpreds_batch = tc.interfaces.torch_interface(qpreds_vmap, jit=True)

qpreds_batch(x_train_torch[:2], torch.ones([2 * nlayers, n]))
[12]:
tensor([[ 0.2839,  0.3786,  0.0158,  0.1512,  0.1395,  0.1364,  0.1403,  0.1423,
         -0.1285],
        [ 0.2839,  0.3786,  0.0158,  0.1512,  0.1395,  0.1364,  0.1403,  0.1423,
         -0.1285]])
[13]:
class QuantumNetV3(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.q_weights = torch.nn.Parameter(torch.randn([2 * nlayers, n]))

    def forward(self, inputs):
        ypred = qpreds_batch(inputs, self.q_weights)
        return ypred
[14]:
net3 = QuantumNetV3()
net3(x_train_torch[:2])
[14]:
tensor([[ 0.2931,  0.5393, -0.0369, -0.0450,  0.0511, -0.0121,  0.0156, -0.0406,
         -0.1330],
        [ 0.2931,  0.5393, -0.0369, -0.0450,  0.0511, -0.0121,  0.0156, -0.0406,
         -0.1330]], grad_fn=<FunBackward>)

We now build a hybrid model with a quantum layer (as in net3) followed by a linear layer.

[15]:
model = torch.nn.Sequential(QuantumNetV3(), torch.nn.Linear(9, 1), torch.nn.Sigmoid())

model(x_train_torch[:2])
[15]:
tensor([[0.5500],
        [0.5500]], grad_fn=<SigmoidBackward0>)
[16]:
criterion = torch.nn.BCELoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-2)
nepochs = 500
nbatch = 32
times = []
for epoch in range(nepochs):
    index = np.random.randint(low=0, high=100, size=nbatch)
    # index = np.arange(nbatch)
    inputs, labels = x_train_torch[index], y_train_torch[index]
    opt.zero_grad()

    with torch.set_grad_enabled(True):
        time0 = time.time()
        yps = model(inputs)
        loss = criterion(
            torch.reshape(yps, [nbatch, 1]), torch.reshape(labels, [nbatch, 1])
        )
        loss.backward()
        if epoch % 100 == 0:
            print(loss)
        opt.step()
        time1 = time.time()
        times.append(time1 - time0)
print("training time per step: ", np.mean(times[1:]))
tensor(0.6460, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.6086, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.5199, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.5697, grad_fn=<BinaryCrossEntropyBackward0>)
tensor(0.5248, grad_fn=<BinaryCrossEntropyBackward0>)
training time per step:  0.020270218113381304
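
Finally, the held-out test split loaded at the top of the notebook can be pushed through the same preprocessing to get an end-to-end accuracy number for the hybrid model. This is a sketch under the same filter/resize/binarize steps as the training data; the 0.5 threshold is again our own choice:

x_test, y_test = filter_pair(x_test[..., np.newaxis] / 255.0, y_test, 1, 5)
x_test_small = tf.image.resize(x_test, (3, 3)).numpy()
x_test_bin = np.squeeze(np.array(x_test_small > 0.5, dtype=np.float32)).reshape([-1, 9])
x_test_torch = torch.tensor(x_test_bin)
y_test_torch = torch.tensor(y_test, dtype=torch.float32)

with torch.no_grad():
    yps = model(x_test_torch)
acc = ((yps[:, 0] > 0.5).float() == y_test_torch).float().mean()
print("test accuracy:", acc.item())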