import logging
import itertools
from copy import deepcopy
from typing import List, Union
import numpy as np
import scipy
import scipy.sparse
from renormalizer.model import Model, HolsteinModel
from renormalizer.mps.backend import xp
from renormalizer.mps.matrix import moveaxis, tensordot
from import MatrixProduct
from renormalizer.mps.svd_qn import add_outer
from renormalizer.mps import svd_qn
from renormalizer.mps.lib import update_cv
from renormalizer.mps.symbolic_mpo import construct_symbolic_mpo, _terms_to_table, symbolic_mo_to_numeric_mo, swap_site
from renormalizer.utils import Quantity
from renormalizer.model.op import Op
from renormalizer.utils.elementop import (
logger = logging.getLogger(__name__)
[docs]class Mpo(MatrixProduct):
Matrix product operator (MPO)
[docs] @classmethod
def exact_propagator(cls, model: HolsteinModel, x, space="GS", shift=0.0):
construct the GS space propagator e^{xH} exact MPO
H=\\sum_{in} \\omega_{in} b^\\dagger_{in} b_{in}
fortunately, the H is local. so e^{xH} = e^{xh1}e^{xh2}...e^{xhn}
the bond dimension is 1
shift is the a constant for H+shift
assert space in ["GS", "EX"]
mpo = cls()
if np.iscomplex(x):
mpo.model = model
for imol, mol in enumerate(model):
if model.scheme < 4:
mo = np.eye(2).reshape(1, 2, 2, 1)
elif model.scheme == 4:
if len(mpo) == model.order[0]:
n = model.mol_num
mpo.append(np.eye(n+1).reshape(1, n+1, n+1, 1))
assert False
for ph in mol.ph_list:
if space == "EX":
ph_pbond = ph.pbond
# construct the matrix exponential by diagonalize the matrix first
phop = construct_ph_op_dict(ph_pbond)
h_mo = (
phop[r"b^\dagger b"] *[0]
+ phop[r"b^\dagger + b"] * ph.term10
w, v = scipy.linalg.eigh(h_mo)
h_mo = np.diag(np.exp(x * w))
h_mo =
h_mo =
mo = h_mo.reshape(1, ph_pbond, ph_pbond, 1)
elif space == "GS":
# for the ground state space
ph_pbond = ph.pbond
d = np.exp(
* np.arange(ph_pbond)
mo = np.diag(d).reshape(1, ph_pbond, ph_pbond, 1)
assert False
# shift the H by plus a constant
mpo.qn = [np.zeros((1, model.qn_size), dtype=int)] * (len(mpo) + 1)
mpo.qnidx = len(mpo) - 1
mpo.qntot = np.zeros(model.qn_size, dtype=int)
# np.exp(shift * x) is usually very large
mpo = mpo.scale(np.exp(shift * x), inplace=True)
return mpo
[docs] @classmethod
def onsite(cls, model: Model, opera, dipole=False, dof_set=None):
if dof_set is None:
if model.n_edofs == 0:
raise ValueError("No electronic DoF present in the model.")
dof_set = model.e_dofs
ops = []
for idx in dof_set:
if dipole:
factor = model.dipole[idx]
factor = 1.
ops.append(Op(opera, idx, factor))
return cls(model, ops)
[docs] @classmethod
def ph_onsite(cls, model: HolsteinModel, opera: str, mol_idx:int, ph_idx=0):
assert opera in ["b", r"b^\dagger", r"b^\dagger b"]
if not isinstance(model, HolsteinModel):
raise TypeError("ph_onsite only supports HolsteinModel")
return cls(model, Op(opera, (mol_idx, ph_idx)))
[docs] @classmethod
def intersite(cls, model: HolsteinModel, e_opera: dict, ph_opera: dict, scale:
r""" construct the inter site MPO
model : HolsteinModel
the molecular information
the electronic operators. {imol: operator}, such as {1:"a", 3:r"a^\dagger"}
the vibrational operators. {(imol, iph): operator}, such as {(0,5):"b"}
scale: Quantity
scalar to scale the mpo
the operator index starts from 0,1,2...
ops = []
for e_key, e_op in e_opera.items():
ops.append(Op(e_op, e_key))
for v_key, v_op in ph_opera.items():
ops.append(Op(v_op, v_key))
op = scale.as_au() * Op.product(ops)
return cls(model, op)
[docs] @classmethod
def finiteT_cv(cls, model, nexciton, m_max, spectratype, percent=1.0):
# np.random.seed(0)
X = cls()
X.model = model
if spectratype == "abs":
# quantum number index, |1><0|
tag_1, tag_2 = 0, 1
elif spectratype == "emi":
# quantum number index, |0><1|
tag_1, tag_2 = 1, 0
X.qn = [[[0, 0]]]
for ix in range(model.nsite - 1):
X.qn.append([[0, 0]])
dim_list = [1]
for ix in range(model.nsite - 1):
sigmaqn = model.basis[ix].sigmaqn
sigmaqn = np.array(list(itertools.product(sigmaqn, repeat=2)))
qn1 = np.add.outer(np.array(X.qn[ix])[:, 0], sigmaqn[:, 0]).ravel()
qn2 = np.add.outer(np.array(X.qn[ix])[:, 1], sigmaqn[:, 1]).ravel()
qnbig = np.stack([qn1, qn2], axis=1)
# print('qnbig', qnbig)
u_set = []
s_set = []
qnset = []
if spectratype != "conductivity":
fq = list(itertools.chain.from_iterable([y[tag_1]] for y in qnbig))
for iblock in range(min(fq), nexciton+1):
indices = [i for i, y in enumerate(qnbig) if
((y[tag_1] == iblock) and (y[tag_2] == 0))]
if len(indices) != 0:
a: np.ndarray = np.random.random([len(indices), len(indices)]) - 0.5
a = a + a.T
s, u = scipy.linalg.eigh(a=a)
u_set.append(svd_qn.blockrecover(indices, u, len(qnbig)))
if spectratype == "abs":
qnset += [iblock, 0] * len(indices)
elif spectratype == "emi":
qnset += [0, iblock] * len(indices)
fq1 = list(itertools.chain.from_iterable([y[0]] for y in qnbig))
fq2 = list(itertools.chain.from_iterable([y[1]] for y in qnbig))
# print('fq1, fq2', fq1, fq2)
for iblock in range(min(fq1), nexciton+1):
for jblock in range(min(fq2), nexciton+1):
# print('iblock', iblock, jblock)
indices = [i for i, y in enumerate(qnbig) if
((y[0] == iblock) and (y[1] == jblock))]
# print('indices', indices)
if len(indices) != 0:
a: np.ndarray = np.random.random([len(indices), len(indices)]) - 0.5
a = a + a.T
s, u = scipy.linalg.eigh(a=a)
u_set.append(svd_qn.blockrecover(indices, u, len(qnbig)))
qnset += [iblock, jblock] * len(indices)
# print('iblock', iblock)
list_qnset = []
for i in range(0, len(qnset), 2):
list_qnset.append([qnset[i], qnset[i + 1]])
qnset = list_qnset
# print('qnset', qnset)
u_set = np.concatenate(u_set, axis=1)
s_set = np.concatenate(s_set)
# print('uset', u_set.shape)
# print('s_set', s_set.shape)
x, xdim, xqn, compx = update_cv(u_set, s_set, qnset, None, nexciton, m_max, spectratype, percent=percent)
X.qn[ix + 1] = xqn
x = x.reshape(dim_list[-2], model.pbond_list[ix], model.pbond_list[ix], dim_list[ix + 1])
X.append(np.random.random([dim_list[-2], model.pbond_list[-1],
model.pbond_list[-1], dim_list[-1]]))
X.qnidx = len(X) - 1
X.to_right = False
X.qntot = nexciton
# print('dim', [X[i].shape for i in range(len(X))])
return X
[docs] @classmethod
def identity(cls, model: Model):
mpo = cls()
mpo.model = model
for p in model.pbond_list:
mpo.append(np.eye(p).reshape(1, p, p, 1))
return mpo
def __init__(self, model: Model = None, terms: Union[Op, List[Op]] = None, offset: Quantity = Quantity(0), algo = "qr"):
todo: document
super(Mpo, self).__init__()
# leave the possibility to construct MPO by hand
if model is None:
if not isinstance(offset, Quantity):
raise ValueError(f"offset must be Quantity object. Got {offset} of {type(offset)}.")
self.offset = offset.as_au()
if terms is None:
terms = model.ham_terms
elif isinstance(terms, Op):
terms = [terms]
if len(terms) == 0:
raise ValueError("Terms contain nothing.")
terms = model.check_operator_terms(terms)
if len(terms) == 0:
raise ValueError("Terms all have factor 0.")
table, primary_ops, factor = _terms_to_table(model, terms, -self.offset)
self.dtype = factor.dtype
self.symbolic_mpo, self.qn, self.qntot, self.qnidx, self.symbolic_out_ops_list, self.primary_ops \
= construct_symbolic_mpo(table, primary_ops, factor, algo=algo)
# from renormalizer.mps.symbolic_mpo import _format_symbolic_mpo
# print(_format_symbolic_mpo(mpo_symbol))
self.model = model
self.to_right = False
# evaluate the symbolic mpo
assert model.basis is not None
for impo, mo in enumerate(self.symbolic_mpo):
mo_mat = symbolic_mo_to_numeric_mo(model.basis[impo], mo, self.dtype)
def _get_sigmaqn(self, idx):
array_up = self.model.basis[idx].sigmaqn
return add_outer(array_up, -array_up)
def is_mps(self):
return False
def is_mpo(self):
return True
def is_mpdm(self):
return False
def dummy_qn(self):
return [np.zeros((dim, self.model.qn_size), dtype=int) for dim in self.bond_dims]
def digest(self):
return np.array([mt.var() for mt in self]).var()
[docs] def apply(self, mp: MatrixProduct, canonicalise: bool=False) -> MatrixProduct:
# todo: use meta copy to save time, could be subtle when complex type is involved
# todo: inplace version (saved memory and can be used in `hybrid_exact_propagator`)
# the model is the same as the mps.model
assert self.site_num == mp.site_num
new_mps = self.promote_mt_type(mp.copy())
if mp.is_mps:
# mpo x mps
for i, (mt_self, mt_other) in enumerate(zip(self, mp)):
assert mt_self.shape[2] == mt_other.shape[1]
# mt=np.einsum("apqb,cqd->acpbd",mpo[i],mps[i])
mt = xp.moveaxis(
tensordot(mt_self.array, mt_other.array, axes=([2], [1])), 3, 1
mt = mt.reshape(
mt_self.shape[0] * mt_other.shape[0],
mt_self.shape[-1] * mt_other.shape[-1],
new_mps[i] = mt
elif mp.is_mpo or mp.is_mpdm:
# mpo x mpo
for i, (mt_self, mt_other) in enumerate(zip(self, mp)):
assert mt_self.shape[2] == mt_other.shape[1]
# mt=np.einsum("apqb,cqrd->acprbd",mt_s,mt_o)
mt = xp.moveaxis(
tensordot(mt_self.array, mt_other.array, axes=([2], [1])),
[-3, -2],
[1, 3],
mt = mt.reshape(
mt_self.shape[0] * mt_other.shape[0],
mt_self.shape[-1] * mt_other.shape[-1],
new_mps[i] = mt
assert False
orig_idx = new_mps.qnidx
new_mps.qn = [
add_outer(np.array(qn_o), np.array(qn_m)).reshape(-1, qn_o.shape[1])
for qn_o, qn_m in zip(self.qn, new_mps.qn)
new_mps.qntot += self.qntot
# concerns about whether to canonicalise:
# * canonicalise helps to keep mps in a truly canonicalised state
# * canonicalise comes with a cost. Unnecessary canonicalise (for example in P&C evolution and
# expectation calculation) hampers performance.
if canonicalise:
return new_mps
[docs] def contract(self, mps, algo="svd"):
r""" an approximation of mpo @ mps/mpdm/mpo
mps : `Mps`, `Mpo`, `MpDm`
algo: str, optional
The algorithm to compress mpo @ mps/mpdm/mpo. It could be ``svd``
(default) and ``variational``.
new_mps : `Mps`
an approximation of mpo @ mps/mpdm/mpo. The input ``mps`` is not
See Also
-------- : svd compression. : variational
if algo == "svd":
# mapply->canonicalise->compress
new_mps = self.apply(mps)
elif algo == "variational":
new_mps = mps.variational_compress(self)
assert False
return new_mps
[docs] def try_swap_site(self, new_model: Model, swap_jw: bool, algo="Hopcroft-Karp"):
# in place swapping.
# if swap_jw is set to True, then self.primary_ops is modified in place
diffs = []
for i, (b1, b2) in enumerate(zip(self.model.basis, new_model.basis)):
if b1.dofs != b2.dofs:
if len(diffs) == 0:
logger.debug("MPO: No need to swap")
assert len(diffs) == 2
i, j = min(diffs), max(diffs)
assert j - i == 1
logger.debug(f"MPO: swaping {i} and {j}")
# although usually the `model` of MPO does not store `mpos`
out_ops2, out_ops3, mo1, mo2, qn = \
swap_site(self.symbolic_out_ops_list[i:i+3], self.primary_ops, swap_jw, algo=algo)
self.symbolic_out_ops_list[i+1] = out_ops2
self.symbolic_out_ops_list[i+2] = out_ops3
self.model = new_model
self.qn[i+1] = qn
for impo, mo in zip([i, j], [mo1, mo2]):
self[impo] = symbolic_mo_to_numeric_mo(new_model.basis[impo], mo, self.dtype)
[docs] def conj_trans(self):
new_mpo = self.metacopy()
for i in range(new_mpo.site_num):
new_mpo[i] = moveaxis(self[i], (1, 2), (2, 1)).conj()
new_mpo.qn = [np.array([-i for i in mt_qn]) for mt_qn in new_mpo.qn]
return new_mpo
[docs] def todense(self):
dim =
if 20000 < dim:
raise ValueError("operator too large")
res = np.ones((1, 1, 1, 1))
for mt in self:
dim1 = res.shape[1] * mt.shape[1]
dim2 = res.shape[2] * mt.shape[2]
dim3 = mt.shape[-1]
res = np.tensordot(res, mt.array, axes=1).transpose((0, 1, 3, 2, 4, 5)).reshape(1, dim1, dim2, dim3)
return res[0, :, :, 0]
[docs] def is_hermitian(self):
full = self.todense()
return np.allclose(full.conj().T, full, atol=1e-7)
def __matmul__(self, other):
return self.apply(other)
class StackedMpo:
An effective sparse representation of MPO in the block diagonal form.
When it enters into the optimization, the Hamiltonian is calculated as
the sum of Hamiltonians generated by each MPO, then the Hamiltonian is
diagonalized and the MPS is updated.
optimize_mps(mps, StackedMpo([mpo1, mpo2, ...]))
def __init__(self, mpos: List[Mpo]):
self.mpos = mpos