Source code for renormalizer.mps.mps

# -*- encoding: utf-8 -*-

import logging
from collections import Counter, deque
from functools import wraps, reduce
from typing import Union, List, Dict
import itertools


import scipy
from scipy import stats

from renormalizer.lib import solve_ivp, expm_krylov
from renormalizer.model import Model, Op, OpSum, basis as ba
from renormalizer.mps import svd_qn
from renormalizer.mps.svd_qn import add_outer, get_qn_mask
from renormalizer.mps.backend import backend, np, xp
from renormalizer.mps.lib import (
    Environ,
    select_basis,
    compressed_sum,
    contract_one_site,
    cvec2cmat
)
from renormalizer.mps.matrix import (
    multi_tensor_contract,
    ones,
    tensordot,
    Matrix,
    asnumpy,
    asxp)
from renormalizer.mps.mp import MatrixProduct
from renormalizer.mps.hop_expr import hop_expr
from renormalizer.mps.mpo import Mpo
from renormalizer.utils import (
    OptimizeConfig,
    CompressCriteria,
    EvolveConfig,
    EvolveMethod
)
from renormalizer.utils.utils import calc_vn_entropy, calc_vn_entropy_dm

logger = logging.getLogger(__name__)


def adaptive_tdvp(fun):
    # evolve t/2 (twice) and t to obtain the O(dt^3) error term in 2nd-order Trotter decomposition
    # J. Chem. Phys. 146, 174107 (2017)

    @wraps(fun)
    def adaptive_fun(self: "Mps", mpo, evolve_target_t):

        if not self.evolve_config.adaptive:
            return fun(self, mpo, evolve_target_t)
        config: EvolveConfig = self.evolve_config.copy()
        config.check_valid_dt(evolve_target_t)

        cur_mps = self
        # prevent bug
        del self

        # setup some constant
        p_restart = 0.5  # restart threshold
        p_min = 0.1  # safeguard for minimal allowed p
        p_max = 2.  # safeguard for maximal allowed p

        evolved_t = 0

        while True:

            dt = min_abs(config.guess_dt, evolve_target_t - evolved_t)
            logger.debug(
                    f"guess_dt: {config.guess_dt}, try time step size: {dt}"
            )
            
            mps_half1 = fun(cur_mps, mpo, dt / 2)
            mps_half2 = fun(mps_half1, mpo, dt / 2)
            mps = fun(cur_mps, mpo, dt)
            dis = mps.distance(mps_half2)

            # prevent bug. save "some" memory.
            del mps_half1, mps

            p = (0.75 * config.adaptive_rtol / (dis/mps_half2.mp_norm + 1e-30)) ** (1./3)
            logger.debug(f"distance: {dis}, enlarge p parameter: {p}")
            if p < p_min:
                p = p_min
            if p_max < p:
                p = p_max

            # rejected
            if p < p_restart:
                config.guess_dt = dt * p
                logger.debug(
                    f"evolution not converged, new guess_dt: {config.guess_dt}"
                )
                continue

            # accepted
            evolved_t += dt
            if np.allclose(evolved_t, evolve_target_t):
                # normal exit. Note that `dt` could be much less than actually tolerated for the last step
                # so use `guess_dt` for the last step. Slight inaccuracy won't harm.
                mps_half2.evolve_config.guess_dt = config.guess_dt
                logger.debug(
                    f"evolution converged, new guess_dt: {mps_half2.evolve_config.guess_dt}"
                )
                return mps_half2
            else:
                # in this case `config.guess_dt == dt`
                config.guess_dt *= p
                logger.debug(f"sub-step {dt} further, evolved: {evolved_t}, new guess_dt: {config.guess_dt}")
                cur_mps = mps_half2

    return adaptive_fun


[docs]class Mps(MatrixProduct):
[docs] @classmethod def random(cls, model: Model, qntot, m_max, percent=1.0) -> "Mps": # a high percent makes the result more random # sometimes critical for getting correct optimization result mps = cls() mps.model = model if isinstance(qntot, int): qntot = np.array([qntot]) qn_size = len(qntot) assert qn_size == model.qn_size mps.qn = [np.zeros((1, qn_size), dtype=int)] dim_list = [1] for imps in range(model.nsite - 1): # quantum number qnbig = add_outer(mps.qn[imps], mps._get_sigmaqn(imps)).reshape(-1, qn_size) u_set = [] s_set = [] qnset = [] for iblock in set([tuple(t) for t in qnbig]): if np.all(np.array(qntot) < np.array(iblock)): continue # find the quantum number index indices = [i for i, x in enumerate(qnbig) if tuple(x) == iblock] assert len(indices) != 0 a = np.random.random([len(indices), len(indices)]) - 0.5 a = a + a.T s, u = scipy.linalg.eigh(a=a) u_set.append(svd_qn.blockrecover(indices, u, len(qnbig))) s_set.append(s) qnset += [iblock] * len(indices) u_set = np.concatenate(u_set, axis=1) s_set = np.concatenate(s_set) if isinstance(m_max, (list, tuple, np.ndarray)): m_max2 = m_max[imps + 1] else: m_max2 = m_max mt, mpsdim, mpsqn, nouse = select_basis( u_set, s_set, qnset, u_set, m_max2, percent=percent ) # add the next mpsdim dim_list.append(mpsdim) mps.append(mt.reshape((dim_list[imps], -1, dim_list[imps + 1]))) mps.qn.append(mpsqn) # the last site mps.qn.append(np.zeros((1, qn_size), dtype=int)) dim_list.append(1) last_mt = ( xp.random.random([dim_list[-2], mps.pbond_list[-1], dim_list[-1]]) - 0.5 ) # set elements with wrong qn to zero qnmat = add_outer(mps.qn[-2], model.basis[-1].sigmaqn) qnmask = get_qn_mask(qnmat, qntot) last_mt[~qnmask] = 0 # normalize the mt so that the whole mps is normalized last_mt /= xp.linalg.norm(last_mt.flatten()) mps.append(last_mt) mps.qnidx = len(mps) - 1 mps.to_right = False mps.qntot = np.array(qntot) return mps
[docs] @classmethod def hartree_product_state(cls, model, condition:Dict = None, qn_idx:int = None): r""" Construct a Hartree product state Args: model (:class:`~renormalizer.model.Model`): Model information. condition (Dict): Dict with format ``{dof:local_state}``. The default local state for dofs not contained in the condition dict is the "0" state. An example is ``{"e_1":1, "v_0":2, "v_3":[0, 0.707, 0.707]}``. Note: For basis sets containing multiple DoFs (like :class:`~renormalizer.model.BasisMultiElectron` and :class:`~renormalizer.model.BasisMultiElectronVac`), the key in the condition dict must be ONE OF THE ACTUAL DOF NAMES or ALL OF THE ACTUAL DOF NAMES from the basis set. In both cases, the value specifies the state for ALL DoFs in that basis. For :class:`~renormalizer.model.BasisMultiElectron` with dof_names = ["S0", "S1", "S2"]: - The basis order is [S0, S1, S2] - ``{"S0": 0}`` or ``{"S0": [1, 0, 0]}`` means S0 is occupied - ``{"S1": 1}`` or ``{"S1": [0, 1, 0]}`` means S1 is occupied - ``{"S2": 2}`` or ``{"S2": [0, 0, 1]}`` means S2 is occupied - The key can be ANY of the DoF names: "S0", "S1", or "S2" - they all refer to the same basis - Alternatively, ``{("S0", "S1", "S2"): 1}`` means S1 is occupied. For :class:`~renormalizer.model.BasisMultiElectronVac` with dof_names = ["S0", "S1"]: - The basis order is [vacuum, S0, S1] - ``{"S0": 0}`` or ``{"S0": [1, 0, 0]}`` means vacuum state - ``{"S1": 1}`` or ``{"S1": [0, 1, 0]}`` means S0 is occupied - ``{"S0": 2}`` or ``{"S0": [0, 0, 1]}`` means S1 is occupied qn_idx (int): the site index of the quantum number center. Returns: Constructed mps (:class:`Mps`) """ if condition is None: condition = {} mps = cls() mps.model = model mps.build_empty_mp(model.nsite) qn_size = model.qn_size mps.qn = [np.zeros((1, qn_size), dtype=int)] # check that the condition is not duplicated # each site has at most 1 single key to assign the occupation index = [model.dof_to_siteidx[key] for key in condition.keys()] if len(index) != len(set(index)): raise ValueError("Each site has at most 1 single key to assign the occupation") # replace the dof_name key to site_index key condition = {model.dof_to_siteidx[key]:value for key, value in condition.items()} for isite, local_basis in enumerate(model.basis): pdim = local_basis.nbas ms = np.zeros((1, pdim, 1)) local_state = condition.pop(isite,0) if isinstance(local_state, int): ms[0, local_state, 0] = 1. qn = local_basis.sigmaqn[local_state] else: ms[0, :, 0] = local_state # quantum numbers for all states occupied all_qn = np.array(local_basis.sigmaqn)[np.nonzero(local_state)] if not np.allclose(all_qn.std(axis=0), 0): raise ValueError("Quantum numbers are mixed in the condition.") qn = all_qn[0] mps[isite] = ms mps.qn.append(mps.qn[-1] + qn.reshape(1, qn_size)) if len(condition) != 0: raise ValueError(f"Condition not completely used: {condition}") mps.qntot = mps.qn[-1][0] mps.qnidx = model.nsite if qn_idx is None: qn_idx = model.nsite - 1 mps.move_qnidx(qn_idx) mps.to_right = False return mps
[docs] @classmethod def ground_state(cls, model: Model, max_entangled: bool, normalize:bool=True, condition:Dict=None): r""" Obtain ground state at :math:`T = 0` or :math:`T = \infty` (maximum entangled). Electronic DOFs are always at ground state. and vibrational DOFs depend on ``max_entangled``. For Spin-Boson model the electronic DOF also depends on ``max_entangled``. Parameters ---------- model : :class:`~renormalizer.model.Model` system information. max_entangled : bool temperature of the vibrational DOFs. If set to ``True``, :math:`T = \infty` and if set to ``False``, :math:`T = 0`. normalize: bool, optional if the returned Mps are normalized when ``max_entangled=True``. Default is True. If ``normalize=False``, the vibrational part is identity. condition: dict, optional the same as `hartree_product_state`. only used in ba.BasisMultiElectron cases to define the occupation. Default is ``None``. Returns ------- mps : renormalizer.mps.Mps """ mps = cls() mps.model = model mps.qn = [np.zeros((1, model.qn_size), dtype=int)] * (model.nsite + 1) mps.qnidx = model.nsite - 1 mps.to_right = False mps.qntot = np.zeros(model.qn_size, dtype=int) mps.build_empty_mp(model.nsite) if condition is not None: # check that the condition is not duplicated # each site has at most 1 single key to assign the occupation index = [model.dof_to_siteidx[key] for key in condition.keys()] assert len(index) == len(set(index)) # replace the dof_name key to site_index key condition = {model.dof_to_siteidx[key]:value for key, value in condition.items()} for isite, local_basis in enumerate(model.basis): pdim = local_basis.nbas ms = np.zeros((1, pdim, 1)) if local_basis.is_phonon: if max_entangled: if normalize: ms[0, :, 0] = 1.0 / np.sqrt(pdim) else: ms[0, :, 0] = 1.0 else: ms[0, 0, 0] = 1.0 mps[isite] = ms elif local_basis.is_electron or local_basis.is_spin: if isinstance(local_basis, ba.BasisSimpleElectron): # simple electron site ms[0,0,0] = 1. elif isinstance(local_basis, ba.BasisMultiElectron): assert condition is not None local_state = condition.pop(isite) if isinstance(local_state, int): ms[0, local_state, 0] = 1. qn = local_basis.sigmaqn[local_state] else: ms[0, :, 0] = local_state qn = local_basis.sigmaqn[np.nonzero(local_state)] assert np.allclose(qn, 0) if max_entangled and normalize: ms /= np.linalg.norm(ms) elif isinstance(local_basis, ba.BasisMultiElectronVac): ms[0,0,0] = 1. elif isinstance(local_basis, ba.BasisHalfSpin): if max_entangled: if normalize: ms[0,:,0] = 1. / np.sqrt(2.) else: ms[0,:,0] = 1. else: ms[0,0,0] = 1. else: raise NotImplementedError mps[isite] = ms for ms in mps: assert ms is not None return mps
[docs] @classmethod def load(cls, model: Model, fname: str): npload = np.load(fname, allow_pickle=True) mp = cls() mp.model = model nsites = int(npload["nsites"]) for i in range(nsites): mt = npload[f"mt_{i}"] if np.iscomplexobj(mt): mp.dtype = backend.complex_dtype else: mp.dtype = backend.real_dtype mp.append(mt) version = npload["version"] mp.qn = npload["qn"] mp.qnidx = int(npload["qnidx"]) mp.qntot = npload["qntot"].astype(int) if version == "0.1": mp.to_right = bool(npload["left"]) # in this protocol, TDH and coeff is not dumped logger.warning("Using old dump/load protocol. TD Hartree part will be lost") mp.coeff = 1 elif version == "0.2": mp.to_right = bool(npload["to_right"]) # in this protocol, TDH is dumped, but it's not useful anymore logger.warning("Using old dump/load protocol. TD Hartree part will be lost") mp.coeff = npload["tdh_wfns"][-1] elif version in ["0.3", "0.4"]: mp.to_right = bool(npload["to_right"]) mp.coeff = npload["coeff"].item(0) else: raise ValueError(f"Unknown dump version: {version}") return mp
[docs] @classmethod def from_dense(cls, model, wfn: np.ndarray): # for debugging mp = cls() mp.model = model if np.iscomplexobj(wfn): mp.dtype = backend.complex_dtype else: mp.dtype = backend.real_dtype residual_wfn = wfn.reshape([1] + [b.nbas for b in model.basis] + [1]) for i in range(len(model.basis) - 1): wfn_2d = residual_wfn.reshape(residual_wfn.shape[0] * residual_wfn.shape[1], -1) q, r = np.linalg.qr(wfn_2d) mp.append(q.reshape(residual_wfn.shape[0], residual_wfn.shape[1], q.shape[1])) residual_wfn = r.reshape([r.shape[0]] + list(residual_wfn.shape[2:])) assert residual_wfn.ndim == 3 mp.append(residual_wfn) mp.build_empty_qn() return mp
def __init__(self): super().__init__() self.coeff: Union[float, complex] = 1 self.optimize_config: OptimizeConfig = OptimizeConfig() self.evolve_config: EvolveConfig = EvolveConfig()
[docs] def conj(self) -> "Mps": new_mps = super().conj() new_mps.coeff = new_mps.coeff.conjugate() return new_mps
[docs] def to_complex(self, inplace=False) -> "Mps": new_mp = super(Mps, self).to_complex(inplace=inplace) new_mp.coeff = complex(new_mp.coeff) return new_mp
def _get_sigmaqn(self, idx): return self.model.basis[idx].sigmaqn @property def is_mps(self): return True @property def is_mpo(self): return False @property def is_mpdm(self): return False @property def nexciton(self): return self.qntot @property def norm(self): '''the norm of the total wavefunction ''' return np.linalg.norm(self.coeff) * self.mp_norm def _expectation_path(self): # S--a--S--e--S # | | | # | d | # | | | # O--b--O--g--O # | | | # | f | # | | | # S--c--S--h--S path = [ ([0, 1], "abc, cfh -> abfh"), ([3, 0], "abfh, bdfg -> ahdg"), ([2, 0], "ahdg, ade -> hge"), ([1, 0], "hge, egh -> "), ] return path def _expectation_conj(self): return self.conj()
[docs] def expectation(self, mpo: Union[Mpo, Op, OpSum], self_conj: "Mps"=None) -> Union[float, complex]: r""" Calculate the expectation value of the operator ``mpo`` with respect to the MPS. For the expectation value :math:`\langle \psi | \hat O | \psi \rangle`, ``self`` is :math:`|\psi\rangle` and ``mpo`` is :math:`\hat O`. For the transition amplitude :math:`\langle \phi | \hat O | \psi \rangle`, ``self_conj`` should be set to :math:`|\phi\rangle`. Parameters ---------- mpo : Union[MPO, Op, OpSum] The operator. Can be provided as: - MPO: Matrix Product Operator - Op: Single symbolic operator - OpSum: Sum of symbolic operators self_conj : :class:`~renormalizer.mps.Mps`, optional The MPS for the bra vector. If not provided, uses the conjugate of ``self``. Returns ------- expectation: Union[float, complex] The expectation value or transition amplitude. Returns a float if the imaginary part is negligible. Examples -------- >>> from renormalizer import Mps, Mpo, Model, Op, BasisHalfSpin >>> model = Model([BasisHalfSpin(0)], []) >>> mps = Mps.hartree_product_state(model, condition={}) >>> mpo = Mpo(model, Op("X", 0)) >>> mps.expectation(mpo) 0.0 >>> mps.expectation(Op("X", 0)) # direct Op input 0.0 >>> mps2 = Mps.hartree_product_state(model, condition={0: [0,1]}) >>> mps.expectation(mpo, self_conj=mps2) 1.0 """ # Convert Op/OpSum to MPO if needed if isinstance(mpo, (Op, OpSum)): mpo = Mpo(self.model, mpo) if self_conj is None: self_conj = self._expectation_conj() environ = Environ(self, mpo, "R", mps_conj=self_conj) l = xp.ones((1, 1, 1), dtype=self.dtype) r = environ.read("R", 1) path = self._expectation_path() val = multi_tensor_contract(path, l, self[0], mpo[0], self_conj[0], r) if np.isclose(float(val.imag), 0): return float(val.real) else: return complex(val)
# This is time and memory consuming # return self_conj.dot(mpo.apply(self)).real
[docs] def expectations(self, mpos: Union[List[Union[Mpo, Op, OpSum]]], self_conj:"Mps"=None, opt:bool=True) -> np.ndarray: new_mpos = [] for mpo in mpos: # Convert Op/OpSum to MPO if needed if isinstance(mpo, (Op, OpSum)): mpo = Mpo(self.model, mpo) new_mpos.append(mpo) mpos = new_mpos if not opt: # the naive way, slow and time consuming. Yet predictable and reliable return np.array([self.expectation(mpo, self_conj) for mpo in mpos]) # optimized way, cache for intermediates # hash is used as indices of the matrices. # The chance for collision (the same hash for two different matrices) is # about 1-0.99999999999997 in 1000 matrices. # In which case a RuntimeError is raised and rerun the job should solve the problem hash_to_obj = dict() mpos_hash: List[List] = [] for mpo in mpos: mpo_hash = [] for m in mpo: m_hash = hash(m) if m_hash not in hash_to_obj: hash_to_obj[m_hash] = m else: if not np.allclose(hash_to_obj[m_hash], m.array): raise RuntimeError("Rare hash collision") mpo_hash.append(m_hash) mpos_hash.append(mpo_hash) if self_conj is None: self_conj = self._expectation_conj() l_environ_dict = _construct_freq_environ(mpos_hash, hash_to_obj, self, "L", self_conj) r_environ_dict = _construct_freq_environ(mpos_hash, hash_to_obj, self, "R", self_conj) results = [] for mpo in mpos: l_environ, l_idx = _get_freq_environ(l_environ_dict, mpo, "L", np.inf) r_environ, r_idx = _get_freq_environ(r_environ_dict, mpo, "R", len(mpo)-l_idx-1) for i in range(l_idx+1, r_idx): l_environ = contract_one_site(l_environ, self[i], mpo[i], "L", self_conj[i]) results.append(complex(l_environ.flatten() @ r_environ.flatten())) # cast to python type results = np.array(results) if np.allclose(results.imag, 0): return results.real else: return results
@property def ph_occupations(self): r""" phonon occupations :math:`b^\dagger_i b_i` for each electronic DoF. The order is defined by :attr:`~renormalizer.model.model.v_dofs`. """ key = "ph_occupations" # ph_occupations is actually the occupation of the basis if key not in self.model.mpos: mpos = [] for dof in self.model.v_dofs: mpos.append(Mpo(self.model, Op("n", dof))) self.model.mpos[key] = mpos else: mpos = self.model.mpos[key] return self.expectations(mpos) @property def e_occupations(self): r""" Electronic occupations :math:`a^\dagger_i a_i` for each electronic DoF. The order is defined by :attr:`~renormalizer.model.model.e_dofs`. """ key = "e_occupations" if key not in self.model.mpos: mpos = [] for dof in self.model.e_dofs: mpos.append(Mpo(self.model, Op(r"a^\dagger a", dof))) self.model.mpos[key] = mpos else: mpos = self.model.mpos[key] return self.expectations(mpos)
[docs] def metacopy(self) -> "Mps": new: Mps = super().metacopy() new.coeff = self.coeff new.optimize_config = self.optimize_config.copy() # evolve_config has its own data new.evolve_config = self.evolve_config.copy() return new
[docs] def normalize(self, kind): r''' normalize the wavefunction Parameters ---------- kind: str "mps_only": the mps part is normalized and coeff is not modified; "mps_norm_to_coeff": the mps part is normalized and the norm is multiplied to coeff; "mps_and_coeff": both mps and coeff is normalized Returns ------- ``self`` is overwritten. ''' return normalize(self, kind)
[docs] def expand_bond_dimension(self, hint_mpo=None, coef=1e-10, include_ex=True): """ expand bond dimension as required in compress_config """ return expand_bond_dimension(self, hint_mpo, coef, include_ex)
[docs] def evolve(self, mpo, evolve_dt, normalize=True) -> "Mps": method = { EvolveMethod.prop_and_compress: self._evolve_prop_and_compress, EvolveMethod.prop_and_compress_tdrk4: self._evolve_prop_and_compress_tdrk4, EvolveMethod.prop_and_compress_tdrk: self._evolve_prop_and_compress_tdrk, EvolveMethod.tdvp_mu_vmf: self._evolve_tdvp_mu_vmf, EvolveMethod.tdvp_vmf: self._evolve_tdvp_mu_vmf, EvolveMethod.tdvp_mu_cmf: self._evolve_tdvp_mu_cmf, EvolveMethod.tdvp_ps: self._evolve_tdvp_ps, EvolveMethod.tdvp_ps2: self._evolve_tdvp_ps2 }[self.evolve_config.method] new_mps = method(mpo, evolve_dt) if normalize: if np.iscomplex(evolve_dt): new_mps.normalize("mps_and_coeff") else: new_mps.normalize("mps_only") return new_mps
def _evolve_prop_and_compress_tdrk4(self, mpo, evolve_dt) -> "Mps": """ classical 4th order Runge-Kutta solver for time-dependent Hamiltonian """ if isinstance(mpo, Mpo): def mpo_t(t, *args, **kwargs): return mpo elif callable(mpo): # mpo can be a function of time, the range is 0 -> evolve_dt mpo_t = mpo else: raise TypeError(f"unsupported mpo type: {mpo}") k1 = mpo_t(0).contract(self).scale(-1j) logger.debug(f"k1:{k1}") tmp_mps = self + k1.scale(0.5*evolve_dt) tmp_mps.canonicalise().compress() k2 = mpo_t(0.5*evolve_dt).contract(tmp_mps).scale(-1j) logger.debug(f"k2:{k2}") tmp_mps = self + k2.scale(0.5*evolve_dt) tmp_mps.canonicalise().compress() k3 = mpo_t(0.5*evolve_dt).contract(tmp_mps).scale(-1j) logger.debug(f"k3:{k3}") tmp_mps = self + k3.scale(evolve_dt) tmp_mps.canonicalise().compress() k4 = mpo_t(evolve_dt).contract(tmp_mps).scale(-1j) logger.debug(f"k4:{k4}") new_mps = compressed_sum([self, k1.scale(1/6*evolve_dt), k2.scale(2/6*evolve_dt), k3.scale(2/6*evolve_dt), k4.scale(1/6*evolve_dt)]) logger.info(f"new_mps:{new_mps}") return new_mps def _evolve_prop_and_compress_tdrk(self, mpo, evolve_dt) -> "Mps": """ The most general Runge-Kutta solver for both time-dependent and time-independnet Hamiltonian and adaptive or unadaptive time-step size evolution """ if isinstance(mpo, Mpo): def mpo_t(t, *args, **kwargs): return mpo elif callable(mpo): mpo_t = mpo else: raise TypeError(f"unsupported mpo type: {mpo}") rk_config = self.evolve_config.rk_config a,b,c = rk_config.tableau def sub_time_step_evolve(y,tau,t0): # error is relative error k_list = [] for istage in range(rk_config.stage): k = compressed_sum([y]+[k_list[i].scale(a[istage,i]*tau) for i in range(istage) if a[istage,i] != 0], batchsize=6) k = mpo_t(c[istage]*tau+t0, mps=k).contract(k).scale(-1j) logger.debug(f"k_{istage}: {k}") k_list.append(k) new_mps = compressed_sum([y] + [k_list[istage].scale(b[0,istage]*tau) \ for istage in range(rk_config.stage) if b[0,istage]!=0], batchsize=6) logger.debug(f"order_{rk_config.order[0]}: {new_mps}") if self.evolve_config.adaptive: assert len(rk_config.order) == 2 assert rk_config.order[0] - rk_config.order[1] == 1 error = reduce(lambda mps1, mps2: mps1.add(mps2), [k_list[istage].scale((b[0,istage]-b[1,istage])*tau) \ for istage in range(rk_config.stage) if not \ np.allclose(b[0,istage],b[1,istage])]) error = error.norm / new_mps.norm else: assert len(rk_config.order) == 1 error = 0 return new_mps, error self.evolve_config.check_valid_dt(evolve_dt) if self.evolve_config.adaptive: p_restart = 0.5 # restart threshold p_min = 0.1 # safeguard for minimal allowed p p_max = 2.0 # safeguard for maximal allowed p evolved_dt = 0 new_mps = self while True: dt = min_abs(new_mps.evolve_config.guess_dt, evolve_dt-evolved_dt) logger.debug(f"guess_dt: {new_mps.evolve_config.guess_dt}, try time step size: {dt}") new_mps, error = sub_time_step_evolve(new_mps, dt, evolved_dt) p = (new_mps.evolve_config.adaptive_rtol / (error + 1e-30)) ** (1/rk_config.order[0]) logger.debug(f"RKsolver:{rk_config.method} relative error: {error}, enlarge p parameter: {p}") if p < p_restart: # not accurate, will restart new_mps.evolve_config.guess_dt = dt * max(p_min, p) logger.debug( f"evolution not converged, new guess_dt: {new_mps.evolve_config.guess_dt}" ) else: if xp.allclose(dt+evolved_dt, evolve_dt): new_mps.evolve_config.guess_dt = min_abs( dt * p, new_mps.evolve_config.guess_dt ) # normal exit logger.debug( f"evolution converged, new guess_dt: {new_mps.evolve_config.guess_dt}" ) break else: new_mps.evolve_config.guess_dt *= min(p, p_max) evolved_dt += dt logger.debug( f"evolution converged, new guess_dt: {new_mps.evolve_config.guess_dt}" ) logger.debug(f"sub-step {dt} further, remaining: {evolve_dt-evolved_dt}") else: new_mps, _ = sub_time_step_evolve(self, evolve_dt, 0) return new_mps def _evolve_prop_and_compress(self, mpo, evolve_dt) -> "Mps": """ The global propagation & compression evolution scheme only for time-independent Hamiltonian Taylor expansion approximation to the formal propagator """ config = self.evolve_config assert evolve_dt is not None propagation_c = config.taylor_config.coeff order = len(propagation_c)-1 termlist = [self] # don't let bond dim grow when contracting orig_compress_config = self.compress_config contract_compress_config = self.compress_config.copy() if contract_compress_config.criteria is CompressCriteria.threshold: contract_compress_config.criteria = CompressCriteria.both # contract_compress_config.max_dims = np.array(self.bond_dims) + 4 self.compress_config = contract_compress_config while len(termlist) < len(propagation_c): termlist.append(mpo.contract(termlist[-1])) # bond dim can grow after adding for t in termlist: t.compress_config = orig_compress_config if config.adaptive: config.check_valid_dt(evolve_dt) p_restart = 0.5 # restart threshold p_min = 0.1 # safeguard for minimal allowed p p_max = 2.0 # safeguard for maximal allowed p while True: scaled_termlist = [] dt = min_abs(config.guess_dt, evolve_dt) logger.debug(f"guess_dt: {config.guess_dt}, try time step size: {dt}") for idx, term in enumerate(termlist): scale = (-1.0j * dt) ** idx * propagation_c[idx] scaled_termlist.append(term.scale(scale)) del term new_mps1 = compressed_sum(scaled_termlist[:-1]) new_mps2 = compressed_sum( [new_mps1, scaled_termlist[-1]] ) dis = new_mps1.distance(new_mps2) p = (config.adaptive_rtol / (dis/new_mps2.mp_norm + 1e-30)) ** (1/order) logger.debug(f"RK45 error distance: {dis}, enlarge p parameter: {p}") if xp.allclose(dt, evolve_dt): # approahes the end if p < p_restart: # not accurate in this final sub-step will restart config.guess_dt = dt * max(p_min, p) logger.debug( f"evolution not converged, new guess_dt: {config.guess_dt}" ) else: # normal exit new_mps2.evolve_config.guess_dt = min_abs( dt * p, config.guess_dt ) logger.debug( f"evolution converged, new guess_dt: {new_mps2.evolve_config.guess_dt}" ) return new_mps2 else: # sub-steps if p < p_restart: config.guess_dt *= max(p_min, p) logger.debug( f"evolution not converged, new guess_dt: {config.guess_dt}" ) else: new_dt = evolve_dt - dt config.guess_dt *= min(p, p_max) new_mps2.evolve_config.guess_dt = config.guess_dt # memory consuming and not useful anymore del new_mps1, termlist, scaled_termlist logger.debug( f"evolution converged, new guess_dt: {config.guess_dt}" ) logger.debug(f"sub-step {dt} further, remaining: {new_dt}") return new_mps2._evolve_prop_and_compress(mpo, new_dt) else: for idx, term in enumerate(termlist): term.scale( (-1.0j * evolve_dt) ** idx * propagation_c[idx], inplace=True ) return compressed_sum(termlist) def _evolve_tdvp_mu_vmf(self, mpo, evolve_dt) -> "Mps": """ variable mean field see the difference between VMF and CMF, refer to Z. Phys. D 42, 113–129 (1997) the matrix unfolding algorithm, see arXiv:1907.12044 only the RKF45 integration is used. The default RKF45 local step error tolerance is rtol:1e-5, atol:1e-8 regulation of S is 1e-10, these default parameters could be changed in /utils/configs.py """ if isinstance(mpo, Mpo): def mpo_t(t, *args, **kwargs): return mpo elif callable(mpo): mpo_t = mpo else: raise TypeError(f"unsupported mpo type: {mpo}") # a workaround for https://github.com/scipy/scipy/issues/10164 imag_time = np.iscomplex(evolve_dt) if imag_time: evolve_dt = -evolve_dt.imag # used in calculating derivatives coef = -1 else: coef = 1j # only not canonicalise when force_ovlp=True and to_right=False if not (self.evolve_config.force_ovlp and not self.to_right): self.ensure_left_canonical() # `self` should not be modified during the evolution if imag_time: mps = self.copy() else: mps = self.to_complex() # the quantum number symmetry is used qn_mask_list = [] position = [0] qntot = mps.qntot for imps in range(mps.site_num): mps.move_qnidx(imps) qnbigl, qnbigr, qnmat = mps._get_big_qn([imps]) qn_mask = get_qn_mask(qnmat, mps.qntot) qn_mask_list.append(qn_mask) position.append(position[-1]+np.sum(qn_mask)) sw_min_list = [] def func_vmf(t,y): sw_min_list.clear() # update mps: from left to right for imps in range(mps.site_num): mps[imps] = cvec2cmat(asnumpy(y[position[imps]:position[imps + 1]]), qn_mask_list[imps]) mpo = mpo_t(t, mps=mps) if self.evolve_config.method == EvolveMethod.tdvp_mu_vmf: environ_mps = mps.copy() elif self.evolve_config.method == EvolveMethod.tdvp_vmf: environ_mps = mps # the first S_R S_R = np.ones([1, 1], dtype=mps.dtype) else: assert False environ = Environ(environ_mps, mpo, "L") if self.evolve_config.force_ovlp: # construct the S_L list (type: Matrix) and S_L_inv list (type: xp.array) # len: mps.site_num+1 S_L_list = [ np.ones([1, 1], dtype=mps.dtype), ] for imps in range(mps.site_num): S_L_list.append( transferMat(mps, None, "L", imps, S_L_list[imps]) ) S_L_inv_list = [] for imps in range(mps.site_num + 1): w, u = scipy.linalg.eigh(S_L_list[imps]) S_L_inv = u.dot(np.diag(1.0 / w)).dot(u.T.conj()) S_L_inv_list.append(S_L_inv) else: S_L_list = [None,] * (mps.site_num + 1) S_L_inv_list = [None,] * (mps.site_num + 1) # calculate hop_y: from right to left hop_y = xp.empty_like(y) for imps in mps.iter_idx_list(full=True): shape = list(mps[imps].shape) ltensor = asxp(environ.read("L", imps - 1)) if imps == self.site_num - 1: # the coefficient site rtensor = xp.ones((1, 1, 1), dtype=mps.dtype) hop =hop_expr(ltensor, rtensor, [asxp(mpo[imps])], shape) S_inv = xp.diag(xp.ones(1,dtype=mps.dtype)) func = integrand_func_factory(shape, hop, True, S_inv, True, coef, ovlp_inv1=S_L_inv_list[imps+1], ovlp_inv0=S_L_inv_list[imps], ovlp0=S_L_list[imps]) hop_y[position[imps]:position[imps+1]] = func(0, mps[imps].array.ravel()).reshape(mps[imps].shape)[qn_mask_list[imps]] continue if self.evolve_config.method == EvolveMethod.tdvp_mu_vmf: # perform qr on the environment mps qnbigl, qnbigr, _ = environ_mps._get_big_qn([imps + 1]) u, s, qnlset, v, s, qnrset = svd_qn.svd_qn( environ_mps[imps + 1].array, qnbigl, qnbigr, environ_mps.qntot, system="R", full_matrices=False) vt = v.T environ_mps[imps + 1] = vt.reshape(environ_mps[imps + 1].shape) rtensor = environ.GetLR( "R", imps + 1, environ_mps, mpo, itensor=None, method="System" ) sw_min_list.append(s.min()) regular_s = _mu_regularize(s, epsilon=self.evolve_config.reg_epsilon) u = asxp(u) us = u.dot(xp.diag(s)) rtensor = xp.tensordot(rtensor, us, axes=(-1, -1)) environ_mps[imps] = xp.tensordot(asxp(environ_mps[imps]), us, axes=(-1, 0)) environ_mps.qn[imps + 1] = qnrset environ_mps.qnidx = imps S_inv = u.conj().dot(xp.diag(1.0 / regular_s)).T elif self.evolve_config.method == EvolveMethod.tdvp_vmf: rtensor = environ.GetLR( "R", imps + 1, environ_mps, mpo, itensor=None, method="System") # regularize density matrix # Note that S_R is (#.conj, #) S_R = transferMat(environ_mps, None, "R", imps + 1, S_R) w, u = scipy.linalg.eigh(asnumpy(S_R)) # discard the negative eigenvalues due to numerical error w = np.where(w>0, w, 0) sw_min_list.append(w.min()) epsilon = self.evolve_config.reg_epsilon w = w + epsilon * np.exp(-w / epsilon) u = asxp(u) # S_inv is (#.conj, #) S_inv = u.dot(xp.diag(1.0 / w)).dot(u.T.conj()).T hop = hop_expr(ltensor, rtensor, [asxp(mpo[imps])], shape) func = integrand_func_factory(shape, hop, False, S_inv, True, coef, ovlp_inv1=S_L_inv_list[imps+1], ovlp_inv0=S_L_inv_list[imps], ovlp0=S_L_list[imps]) hop_y[position[imps]:position[imps+1]] = func(0, asxp(mps[imps].array.ravel())).reshape(mps[imps].shape)[qn_mask_list[imps]] return hop_y init_y = xp.concatenate([asxp(ms.array[qn_mask_list[ims]]) for ims, ms in enumerate(mps)]) # the ivp local error, please refer to the Scipy default setting sol = solve_ivp( func_vmf, (0, evolve_dt), init_y, method="RK45", rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) # update mps: from left to right for imps in range(mps.site_num): mps[imps] = cvec2cmat(asnumpy(sol.y[:, -1][position[imps]:position[imps + 1]]), qn_mask_list[imps]) logger.info(f"{self.evolve_config.method} VMF func called: {sol.nfev}. RKF steps: {len(sol.t)}") sw_min_list = xp.array(sw_min_list) # auto-switch between tdvp_mu_vmf and tdvp_vmf if self.evolve_config.vmf_auto_switch: if sw_min_list.min() > np.sqrt(self.evolve_config.reg_epsilon*10.) and \ mps.evolve_config.method == EvolveMethod.tdvp_mu_vmf: logger.debug(f"sw.min={sw_min_list.min()}, Switch to tdvp_vmf") mps.evolve_config.method = EvolveMethod.tdvp_vmf elif sw_min_list.min() < self.evolve_config.reg_epsilon and \ mps.evolve_config.method == EvolveMethod.tdvp_vmf: logger.debug(f"sw.min={sw_min_list.min()}, Switch to tdvp_mu_vmf") mps.evolve_config.method = EvolveMethod.tdvp_mu_vmf # The caller do not want to deal with an MPS that is not canonicalised return mps.canonicalise() @adaptive_tdvp def _evolve_tdvp_mu_cmf(self, mpo, evolve_dt) -> "Mps": """ evolution scheme: TDVP + constant mean field + matrix-unfolding regularization MPS : LLLLLLC 1st / 2nd order(default) CMF for 2nd order CMF: L is evolved with midpoint scheme C is evolved with midpoint(default) / trapz scheme """ if self.evolve_config.tdvp_cmf_c_trapz: assert self.evolve_config.tdvp_cmf_midpoint imag_time = np.iscomplex(evolve_dt) # a workaround for https://github.com/scipy/scipy/issues/10164 if imag_time: evolve_dt = -evolve_dt.imag # used in calculating derivatives coef = -1 else: coef = 1j self.ensure_left_canonical() # `self` should not be modified during the evolution # mps: the mps to return # environ_mps: mps to construct environ if imag_time: mps = self.copy() else: mps = self.to_complex() if self.evolve_config.tdvp_cmf_midpoint: # mps at t/2 (1st order) as environment orig_config = self.evolve_config.copy() self.evolve_config.tdvp_cmf_midpoint = False self.evolve_config.tdvp_cmf_c_trapz = False self.evolve_config.adaptive = False environ_mps = self.evolve(mpo, evolve_dt / 2) self.evolve_config = orig_config else: # mps at t=0 as environment environ_mps = mps.copy() if self.evolve_config.tdvp_cmf_c_trapz: loop = 2 mps[-1] = environ_mps[-1].copy() else: loop = 1 while loop > 0: # construct the environment matrix environ = Environ(environ_mps, mpo, "L") # statistics for debug output cmf_rk_steps = [] if self.evolve_config.force_ovlp: # construct the S_L list (type: Matrix) and S_L_inv list (type: xp.array) # len: mps.site_num+1 S_L_list = [np.ones([1, 1], dtype=mps.dtype),] for imps in range(mps.site_num): S_L_list.append(transferMat(environ_mps, None, "L", imps, S_L_list[imps])) S_L_inv_list = [] for imps in range(mps.site_num+1): w, u = scipy.linalg.eigh(S_L_list[imps]) S_L_inv = xp.asarray(u.dot(np.diag(1.0 / w)).dot(u.T.conj())) S_L_inv_list.append(S_L_inv) S_L_list[imps] = S_L_list[imps] else: S_L_list = [None,] * (mps.site_num+1) S_L_inv_list = [None,] * (mps.site_num+1) for imps in mps.iter_idx_list(full=True): shape = list(mps[imps].shape) ltensor = environ.read("L", imps - 1) if imps == self.site_num - 1: if loop == 1: # the coefficient site rtensor = ones((1, 1, 1)) hop = hop_expr(ltensor, rtensor, [mpo[imps]], shape) S_inv = xp.diag(xp.ones(1,dtype=mps.dtype)) def func1(y): func = integrand_func_factory(shape, hop, True, S_inv, True, coef, ovlp_inv1=S_L_inv_list[imps+1], ovlp_inv0=S_L_inv_list[imps], ovlp0=S_L_list[imps]) return func(0, y) if self.evolve_config.ivp_solver == "krylov": ms, Lanczos_vectors = expm_krylov(func1, evolve_dt, mps[imps].ravel().array) logger.debug(f"# of Lanczos_vectors, {Lanczos_vectors}") else: sol = solve_ivp(lambda t, y: func1(y), (0, evolve_dt), mps[imps].ravel().array, method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) ms = sol.y logger.debug(f"# of Hc, {sol.nfev}") mps[imps] = ms.reshape(shape) if loop == 1 and self.evolve_config.tdvp_cmf_c_trapz: break else: continue # perform qr on the environment mps qnbigl, qnbigr, _ = environ_mps._get_big_qn([imps + 1]) u, s, qnlset, v, s, qnrset = svd_qn.svd_qn( environ_mps[imps + 1].array, qnbigl, qnbigr, environ_mps.qntot, system="R", full_matrices=False, ) vt = v.T environ_mps[imps + 1] = vt.reshape(environ_mps[imps + 1].shape) rtensor = environ.GetLR( "R", imps + 1, environ_mps, mpo, itensor=None, method="System" ) regular_s = _mu_regularize(s, epsilon=self.evolve_config.reg_epsilon) us = u.dot(np.diag(s)) rtensor = tensordot(rtensor, us, axes=(-1, -1)) environ_mps[imps] = tensordot(environ_mps[imps], us, axes=(-1, 0)) environ_mps.qn[imps + 1] = qnrset environ_mps.qnidx = imps S_inv = u.conj().dot(np.diag(1.0 / regular_s)).T hop = hop_expr(ltensor, rtensor, [mpo[imps]], shape) func = integrand_func_factory(shape, hop, False, S_inv, True, coef, ovlp_inv1=S_L_inv_list[imps+1], ovlp_inv0=S_L_inv_list[imps], ovlp0=S_L_list[imps]) sol = solve_ivp( func, (0, evolve_dt), mps[imps].ravel().array, method="RK45" ) cmf_rk_steps.append(len(sol.t)) ms = sol.y[:, -1].reshape(shape) mps[imps] = ms if len(cmf_rk_steps) > 0: steps_stat = stats.describe(cmf_rk_steps) logger.debug(f"{self.evolve_config.method} CMF steps: {steps_stat}") if loop == 2: environ_mps = mps evolve_dt /= 2. loop -= 1 # new_mps.evolve_config.stat = steps_stat return mps @adaptive_tdvp def _evolve_tdvp_ps(self, mpo, evolve_dt) -> "Mps": # PhysRevB.94.165116 # TDVP projector splitting # one-site if np.iscomplex(evolve_dt): mps = self.copy() if self.evolve_config.ivp_solver != "krylov": evolve_dt = -evolve_dt.imag # used in calculating derivatives coef = -1 else: mps = self.to_complex() if self.evolve_config.ivp_solver != "krylov": coef = 1j # construct the environment matrix # almost half is not used. Not a big deal. environ = Environ(mps, mpo) # statistics for debug output local_steps = [] # sweep for 2 rounds for i in range(2): for imps in mps.iter_idx_list(full=True): system = "L" if mps.to_right else "R" l_array = environ.read("L", imps - 1) r_array = environ.read("R", imps + 1) shape = list(mps[imps].shape) hop = hop_expr(l_array, r_array, [asxp(mpo[imps].array)], shape) if self.evolve_config.ivp_solver == "krylov": mps_t, j = expm_krylov( lambda y: hop(y.reshape(shape)).ravel(), -1j * evolve_dt / 2, mps[imps].ravel().array ) else: sol = solve_ivp( lambda t, y: hop(y.reshape(shape)).ravel() / coef, (0, evolve_dt/2), mps[imps].ravel().array, method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) mps_t, j = sol.y, sol.nfev local_steps.append(j) mps_t = mps_t.reshape(shape) qnbigl, qnbigr, _ = mps._get_big_qn([imps]) u, qnlset, v, qnrset = svd_qn.svd_qn( asnumpy(mps_t), qnbigl, qnbigr, mps.qntot, QR=True, system=system, full_matrices=False, ) vt = v.T if not mps.to_right and imps != 0: mps[imps] = vt.reshape([-1] + shape[1:]) mps.qn[imps] = qnrset mps.qnidx = imps-1 r_array = environ.GetLR( "R", imps, mps, mpo, itensor=r_array, method="System" ) # reverse update u site shape_u = u.shape hop_u = hop_expr(l_array, r_array, [], shape_u) if self.evolve_config.ivp_solver == "krylov": mps_t, j = expm_krylov( lambda y: hop_u(y.reshape(shape_u)).ravel(), 1j * evolve_dt / 2, u.ravel() ) else: sol = solve_ivp( lambda t, y: hop_u(y.reshape(shape_u)).ravel() / -coef, (0, evolve_dt/2), u.ravel(), method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) mps_t, j = sol.y, sol.nfev local_steps.append(j) mps_t = mps_t.reshape(shape_u) mps[imps - 1] = tensordot(mps[imps - 1].array, mps_t, axes=(-1, 0),) elif mps.to_right and imps != len(mps) - 1: mps[imps] = u.reshape(shape[:-1] + [-1]) mps.qn[imps + 1] = qnlset mps.qnidx = imps+1 l_array = environ.GetLR( "L", imps, mps, mpo, itensor=l_array, method="System" ) # reverse update svt site shape_svt = vt.shape hop_svt = hop_expr(l_array, r_array, [], shape_svt) if self.evolve_config.ivp_solver == "krylov": mps_t, j = expm_krylov( lambda y: hop_svt(y.reshape(shape_svt)).ravel(), 1j * evolve_dt / 2, vt.ravel() ) else: sol = solve_ivp( lambda t, y: hop_svt(y.reshape(shape_svt)).ravel() / -coef, (0, evolve_dt/2), vt.ravel(), method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) mps_t, j = sol.y, sol.nfev local_steps.append(j) mps_t = mps_t.reshape(shape_svt) mps[imps + 1] = tensordot(mps_t, mps[imps + 1].array, axes=(1, 0),) else: mps[imps] = mps_t mps._switch_direction() steps_stat = stats.describe(local_steps) logger.debug(f"TDVP-PS Krylov space: {steps_stat}") mps.evolve_config.stat = steps_stat return mps @adaptive_tdvp def _evolve_tdvp_ps2(self, mpo, evolve_dt) -> "Mps": # PhysRevB.94.165116 # TDVP projector splitting # two-site if np.iscomplex(evolve_dt): mps = self.copy() if self.evolve_config.ivp_solver != "krylov": evolve_dt = -evolve_dt.imag # used in calculating derivatives coef = -1 else: mps = self.to_complex() if self.evolve_config.ivp_solver != "krylov": coef = 1j # construct the environment matrix # almost half is not used. Not a big deal. environ = Environ(mps, mpo) # statistics for debug output local_steps = [] # sweep for 2 rounds for i in range(2): for imps in mps.iter_idx_list(full=False): if mps.to_right: lidx, cidx0, cidx1, ridx = range(imps - 1, imps + 3) # the idx of the next site cidx2 = cidx1 # the idx of the last site last_idx = len(mps) - 2 else: lidx, cidx0, cidx1, ridx = range(imps - 2, imps + 2) cidx2 = cidx0 last_idx = 1 l_array = environ.read("L", lidx) r_array = environ.read("R", ridx) # the two-site matrix state ms2 = tensordot(mps[cidx0], mps[cidx1], axes=1) hop = hop_expr(l_array, r_array, [mpo[cidx0], mpo[cidx1]], ms2.shape) if self.evolve_config.ivp_solver == "krylov": mps_t, j = expm_krylov( lambda y: hop(y.reshape(ms2.shape)).ravel(), -1j * evolve_dt / 2, ms2.ravel() ) else: sol = solve_ivp( lambda t, y: hop(y.reshape(ms2.shape)).ravel() / coef, (0, evolve_dt/2), ms2.ravel(), method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) mps_t, j = sol.y, sol.nfev local_steps.append(j) mps_t = mps_t.reshape(ms2.shape) qnbigl, qnbigr, _ = mps._get_big_qn([cidx0, cidx1]) mps._update_mps(mps_t, [cidx0, cidx1], qnbigl, qnbigr) if mps.compress_config.ofs is not None: mpo.try_swap_site(mps.model, mps.compress_config.ofs_swap_jw) if imps == last_idx: continue if mps.to_right: l_array = environ.GetLR( "L", lidx + 1, mps, mpo, itensor=l_array, method="System" ) else: r_array = environ.GetLR( "R", ridx - 1, mps, mpo, itensor=r_array, method="System" ) # reverse update the next site ms1 = mps[cidx2] hop = hop_expr(l_array, r_array, [mpo[cidx2]], ms1.shape) if self.evolve_config.ivp_solver == "krylov": mps_t, j = expm_krylov( lambda y: hop(y.reshape(ms1.shape)).ravel(), 1j * evolve_dt / 2, ms1.ravel() ) else: sol = solve_ivp( lambda t, y: hop(y.reshape(ms1.shape)).ravel() / -coef, (0, evolve_dt/2), ms1.ravel(), method=self.evolve_config.ivp_solver, rtol=self.evolve_config.ivp_rtol, atol=self.evolve_config.ivp_atol, ) mps_t, j = sol.y, sol.nfev local_steps.append(j) mps_t = mps_t.reshape(ms1.shape) mps[cidx2] = mps_t mps._push_cano(cidx2) mps._switch_direction() steps_stat = stats.describe(local_steps) logger.debug(f"TDVP-PS Krylov space: {steps_stat}") mps.evolve_config.stat = steps_stat #logger.debug(f"current mps: {mps}") return mps
[docs] def evolve_exact(self, h_mpo, evolve_dt, space): MPOprop = Mpo.exact_propagator(self.model, -1j * evolve_dt, space, -h_mpo.offset) new_mps = MPOprop.apply(self, canonicalise=True) self.coeff *= np.exp(-1j * h_mpo.offset * evolve_dt) return new_mps
@property def digest(self): # used for debugging. Mostly for quickly comparing how two MPSs differ. if 10 < self.site_num or self.is_mpdm: return None prod = np.eye(1).reshape(1, 1, 1) for ms in self: prod = np.tensordot(prod, ms, axes=1) prod = prod.reshape((prod.shape[0], -1, prod.shape[-1])) return {"var": prod.var(), "mean": prod.mean(), "ptp": prod.ptp()}
[docs] def todense(self) -> np.array: dim = np.prod(self.pbond_list) if 20000 < dim: raise ValueError("wavefunction too large") res = np.ones((1, 1, 1)) for mt in self: dim1 = res.shape[1] * mt.shape[1] dim2 = mt.shape[-1] res = np.tensordot(res, mt.array, axes=1).reshape(1, dim1, dim2) return res[0, :, 0]
[docs] def calc_1site_rdm(self, idx=None) -> Dict[int, np.ndarray]: r""" Calculate 1-site reduced density matrix :math:`\rho_i = \textrm{Tr}_{j \neq i} | \Psi \rangle \langle \Psi|` Parameters ---------- idx : int, list, tuple, optional site index of 1site_rdm. Default is None, which mean all the rdms are calculated. Returns ------- rdm: Dict :math:`\{0:\rho_0, 1:\rho_1, \cdots\}`. The key is the index of the site. """ identity = Mpo.identity(self.model) environ = Environ(self, identity, "R") if idx is None: idx = list(range(self.site_num)) elif type(idx) is int: idx = [idx] elif (type(idx) is list) or (type(idx) is tuple): idx = list(idx) else: assert False rdm = {} for ims, ms in enumerate(self): ltensor = environ.GetLR( "L", ims-1, self, identity, itensor=None, method="System" ) rtensor = environ.GetLR( "R", ims+1, self, identity, itensor=None, method="Enviro" ) if ims not in idx: continue ltensor = ltensor.reshape(ltensor.shape[0], ltensor.shape[-1]) rtensor = rtensor.reshape(rtensor.shape[0], rtensor.shape[-1]) tensor = tensordot(ltensor, ms.conj(), ([0],[0])) tensor = tensordot(tensor, rtensor, ([-1],[0])) if ms.ndim == 3: tensor = tensordot(tensor, ms, ([0,-1],[0,-1])) else: tensor = tensordot(tensor, ms, ([0,-1,-2],[0,-1,-2])) assert xp.allclose(tensor, tensor.T.conj()) rdm[ims] = asnumpy(tensor) return rdm
[docs] def calc_2site_rdm(self): r""" Calculate 2-site reduced density matrix :math:`\rho_{ij} = \textrm{Tr}_{k \neq i, k \neq j} | \Psi \rangle \langle \Psi |`. Returns ------- rdm: Dict :math:`\{(0,1):\rho_{01}, (0,2):\rho_{02}, \cdots\}`. The key is a tuple of index of the site. """ identity = Mpo.identity(self.model) environ_R = Environ(self, identity, "R") environ_L = Environ(self, identity, "L") L_component = [] R_component = [] rdm = {} # first construct 1-site environment for ims, ms in enumerate(self): ltensor = environ_L.GetLR("L", ims-1, self, identity, itensor=None, method="Enviro") ltensor = ltensor.reshape(ltensor.shape[0], ltensor.shape[-1]) tensor = tensordot(ltensor, ms.conj(), ([0],[0])) if ms.ndim == 3: tensor = tensordot(tensor, ms, ([0],[0])) elif ms.ndim == 4: tensor = tensordot(tensor, ms, ([0,2],[0,2])) L_component.append(tensor.transpose((0,2,1,3))) rtensor = environ_R.GetLR("R", ims+1, self, identity, itensor=None, method="Enviro") rtensor = rtensor.reshape(rtensor.shape[0], rtensor.shape[-1]) tensor = tensordot(ms.conj(), rtensor, ([-1],[0])) if ms.ndim == 3: tensor = tensordot(tensor, ms, ([-1],[-1])) elif ms.ndim == 4: tensor = tensordot(tensor, ms, ([2,-1],[2,-1])) R_component.append(tensor.transpose((0,2,1,3))) # merge two 1-site environment together for ims in range(self.site_num): tensor = L_component[ims] for jms in range(ims+1, self.site_num): if jms != ims+1: kms = jms - 1 tensor = tensordot(tensor, self[kms].conj(), ([2],[0])) if self[kms].ndim == 3: tensor = tensordot(tensor, self[kms], ([2,3],[0,1])) elif self[kms].ndim == 4: tensor = tensordot(tensor, self[kms], ([2,3,4],[0,1,2])) rtensor = R_component[jms] res = tensordot(tensor, rtensor, ([2,3],[0,1])).transpose(0,2,1,3) rdm[(ims, jms)] = asnumpy(res.reshape(res.shape[0]*res.shape[1],-1)) return rdm
[docs] def calc_edof_rdm(self) -> np.ndarray: r"""Calculate the reduced density matrix of electronic DoF :math:`\rho_{ij} = \langle \Psi | a_i^\dagger a_j | \Psi \rangle` .. note:: This function is designed for single-electron system. Fermionic commutation relation is not considered. """ key = "edof_reduced_density_matrix" n_e = self.model.n_edofs e_dofs = self.model.e_dofs if key not in self.model.mpos: mpos = [] for idx, dof1 in enumerate(e_dofs): for dof2 in e_dofs[idx:]: op = Op(r"a^\dagger a", [dof1, dof2]) mpo = Mpo(self.model, terms=op) mpos.append(mpo) self.model.mpos[key] = mpos else: mpos = self.model.mpos[key] expectations = deque(self.expectations(mpos)) reduced_density_matrix = np.zeros((n_e, n_e), dtype=backend.complex_dtype) for idx in range(n_e): for jdx in range(idx, n_e): reduced_density_matrix[idx, jdx] = expectations.popleft() reduced_density_matrix[jdx, idx] = np.conj(reduced_density_matrix[idx, jdx]) return reduced_density_matrix
[docs] def calc_entropy(self, entropy_type): r""" Calculate 1site, 2site, mutual and bond Von Neumann entropy :math:`\textrm{entropy} = -\textrm{Tr}(\rho \ln \rho)` where :math:`\ln` stands for natural logarithm. 1site entropy is the entropy between any site and the other ``(N-1)`` sites. 2site entropy is the entropy between any two sites and the other ``(N-2)`` sites. mutual entropy characterize the entropy between any two sites. bond entropy is the entropy between L-block and R-block. Parameters ---------- entropy_type : str "1site", "2site", "mutual", "bond" Returns ------- entropy : dict, ndarray if entropy_type = "1site" or "2site", a dictionary is returned and the key is the index or the tuple of index of mps sites, else an ndarray is returned. """ if entropy_type in ["1site", "2site"]: if entropy_type == "1site": rdm = self.calc_1site_rdm() else: rdm = self.calc_2site_rdm() entropy = {} for key, dm in rdm.items(): entropy[key] = calc_vn_entropy_dm(dm) elif entropy_type == "mutual": entropy = self.calc_2site_mutual_entropy() elif entropy_type == "bond": entropy = self.calc_bond_entropy() else: raise ValueError(f"unsupported entropy type {entropy_type}") return entropy
[docs] def calc_2site_mutual_entropy(self) -> np.ndarray: r""" Calculate mutual entropy between two sites. Also known as mutual information :math:`m_{ij} = (s_i + s_j - s_{ij})/2` See Chemical Physics 323 (2006) 519–531 Returns ------- mutual_entropy : 2d np.ndarry mutual entropy with shape (nsite, nsite) """ entropy_1site = self.calc_entropy("1site") entropy_2site = self.calc_entropy("2site") nsites = self.site_num mut_entropy = np.zeros((nsites, nsites)) for isite, jsite in itertools.combinations(range(nsites),2): key = (isite, jsite) if (isite, jsite) in entropy_2site.keys() else (jsite, isite) mut_entropy[isite, jsite] = (entropy_1site[isite] + entropy_1site[jsite] - entropy_2site[key]) / 2 mut_entropy += mut_entropy.T return mut_entropy
[docs] def calc_bond_singular_values(self) -> np.ndarray: r""" Calculate the singular values at each bond. Returns ------- Singular values: 2D array """ # Make sure that the bond entropy is from the left to the right and not # destroy the original mps mps = self.copy() mps.ensure_right_canonical() _, s_array = mps.compress(temp_m_trunc=np.inf, ret_s=True) return s_array
[docs] def calc_bond_entropy(self, s_array :np.array=None) -> np.ndarray: r""" Calculate von Neumann entropy at each bond according to :math:`S = -\textrm{Tr}(\rho \ln \rho)` where :math:`\rho` is the reduced density matrix of either block. Parameters ---------- s_array : np.ndarray, optional The singular values array. Calculated by :meth:`Mps.calc_bond_singular_values`. Returns ------- S : 1D array a NumPy array containing the entropy values. """ if s_array is None: s_array = self.calc_bond_singular_values() return np.array([calc_vn_entropy(sigma ** 2) for sigma in s_array])
[docs] def dump(self, fname): super().dump(fname, other_attrs=["coeff"])
def __setitem__(self, key, value): return super().__setitem__(key, value)
[docs] def add(self, other): if not np.allclose(self.coeff, other.coeff): self.scale(self.coeff, inplace=True) other.scale(other.coeff, inplace=True) self.coeff = 1 other.coeff = 1 return super().add(other)
[docs] def distance(self, other) -> float: if not np.allclose(self.coeff, other.coeff): self.scale(self.coeff, inplace=True) other.scale(other.coeff, inplace=True) self.coeff = 1 other.coeff = 1 return super().distance(other)
def projector( ms: xp.ndarray, left: bool, Ovlp_inv1: xp.ndarray = None, Ovlp0: xp.ndarray = None ) -> xp.ndarray: if left: axes = (-1, -1) else: axes = (0, 0) if Ovlp_inv1 is None: proj = xp.tensordot(ms, ms.conj(), axes=axes) else: # consider the case that the canonical condition is not fulfilled if left: proj = xp.tensordot(Ovlp0, ms, axes=(-1, 0)) proj = xp.tensordot(proj, Ovlp_inv1, axes=(-1, 0)) proj = xp.tensordot(proj, ms.conj(), axes=(-1, -1)) else: proj = xp.tensordot(ms, Ovlp0, axes=(-1, 0)) proj = xp.tensordot(Ovlp_inv1, proj, axes=(-1, 0)) proj = xp.tensordot(proj, ms.conj(), axes=(0, 0)) if left: sz = int(np.prod(ms.shape[:-1])) else: sz = int(np.prod(ms.shape[1:])) Iden = xp.array(xp.diag(xp.ones(sz)), dtype=backend.real_dtype).reshape(proj.shape) proj = Iden - proj return proj def integrand_func_factory( shape, hop, islast, S_inv: Union[np.ndarray, xp.ndarray], left: bool, coef: complex, ovlp_inv1: Union[xp.ndarray, np.ndarray] = None, ovlp_inv0: Union[xp.ndarray, np.ndarray] = None, ovlp0: Union[xp.ndarray, np.ndarray] = None, ): S_inv, ovlp_inv1, ovlp_inv0, ovlp0 = map(asxp, [S_inv, ovlp_inv1, ovlp_inv0, ovlp0]) # left == True: projector operate on the left side of the HC # Ovlp0 is (#.conj, #), Ovlp_inv0 = (#, #.conj), Ovlp_inv1 = (#, #.conj) # S_inv is (#.conj, #) def func(t, y): y0 = asxp(y.reshape(shape)) HC = hop(y0) if not islast: proj = projector(y0, left, ovlp_inv1, ovlp0) if y0.ndim == 3: if left: HC = tensordot(proj, HC, axes=([2, 3], [0, 1])) else: HC = tensordot(HC, proj, axes=([1, 2], [2, 3])) elif y0.ndim == 4: if left: HC = tensordot(proj, HC, axes=([3, 4, 5], [0, 1, 2])) else: HC = tensordot(HC, proj, axes=([1, 2, 3], [3, 4, 5])) if left: if ovlp_inv0 is not None: HC = tensordot(ovlp_inv0, HC, axes=(-1, 0)) return tensordot(HC, S_inv, axes=(-1, 0)).ravel() / coef else: if ovlp_inv0 is not None: HC = tensordot(HC, ovlp_inv0, axes=(-1, -1)) return tensordot(S_inv, HC, axes=(0, 0)).ravel() / coef return func def transferMat(mps, mpsconj, domain, imps, val) -> np.ndarray: """ calculate the transfer matrix from the left hand or the right hand """ if mpsconj is not None: ms, ms_conj = mps[imps].array, mpsconj[imps].array else: ms = mps[imps].array ms_conj = ms.conj() if mps[0].ndim == 3: if domain == "R": val = tensordot(ms_conj, val, axes=(2, 0)) val = tensordot(val, ms, axes=([1, 2], [1, 2])) elif domain == "L": val = tensordot(ms_conj, val, axes=(0, 0)) val = tensordot(val, ms, axes=([0, 2], [1, 0])) else: assert False elif mps[0].ndim == 4: if domain == "R": val = tensordot(ms_conj, val, axes=(3, 0)) val = tensordot(val, ms, axes=([1, 2, 3], [1, 2, 3])) elif domain == "L": val = tensordot(ms_conj, val, axes=(0, 0)) val = tensordot(val, ms, axes=([0, 3, 1], [1, 0, 2])) else: assert False else: raise ValueError(f"the dim of local mps is not correct: {mps[0].ndim}") return asnumpy(val) def _mu_regularize(s, epsilon=1e-10): """ regularization of the singular value of the reduced density matrix """ epsilon = np.sqrt(epsilon) return s + epsilon * np.exp(-s / epsilon) def expand_bond_dimension(mps, hint_mpo=None, coef=1e-10, include_ex=True): """ expand bond dimension as required in compress_config """ if hint_mpo is not None and include_ex: # fill states related to `hint_mpo` logger.debug( f"average bond dimension of hint mpo: {hint_mpo.bond_dims_mean}" ) # in case of localized `self` if mps.is_mps: ex_state: MatrixProduct = mps.ground_state(mps.model, False) # for self.qntot >= 1 assert mps.model.qn_size == 1 # otherwise not supported for i in range(mps.qntot[0]): ex_state = Mpo.onsite(mps.model, r"a^\dagger") @ ex_state elif mps.is_mpdm: assert mps.qntot == 1 ex_state: MatrixProduct = mps.max_entangled_ex(mps.model) else: assert False ex_state.compress_config = mps.compress_config ex_state.move_qnidx(mps.qnidx) ex_state.to_right = mps.to_right else: ex_state = None return expand_bond_dimension_general(mps, hint_mpo, coef, ex_state) def expand_bond_dimension_general(mps, hint_mpo=None, coef=1e-10, ex_mps=None): """ expand bond dimension as required in compress_config. works for both mps and ttns """ if hasattr(mps, "model"): # MPS random_first_arg = mps.model else: # TTNS random_first_arg = mps.basis mps.compress_config.set_bonddim(len(mps.bond_dims)) # expander m target m_target: np.ndarray = np.minimum(np.array(mps.compress_config.max_dims) - np.array(mps.bond_dims), mps.bond_dims_exact) m_target = np.array(m_target, dtype=int) # original dtype is float logger.debug(f"target for expander: {m_target.tolist()}") if hint_mpo is None: expander = mps.__class__.random(random_first_arg, mps.qntot, m_target) else: # fill states related to `hint_mpo` logger.debug(f"bond dimension of hint mpo: {hint_mpo.bond_dims}") logger.debug(f"Mean value: {hint_mpo.bond_dims_mean}") if ex_mps is None: lastone = mps else: # in case of localized `self` lastone = mps + ex_mps expander_list = [] expander_dims = np.zeros_like(m_target) while True: lastone = (hint_mpo @ lastone).normalize("mps_and_coeff") # more bond dimension for `lastone` for quick increase lastone = lastone.canonicalise().compress(np.max(m_target)) logger.debug(f"lastone bond dimension: {lastone.bond_dims}") expander_list.append(lastone) expander = compressed_sum(expander_list, temp_m_trunc=m_target) logger.debug(f"expander bond dimension: {expander.bond_dims}") if np.all(expander.bond_dims >= m_target): break if np.all(expander.bond_dims == expander_dims): logger.warning("Expander does not increase anymore. The expand target is too high") m_target2 = np.max(m_target - np.array(expander_dims)) expander2 = (hint_mpo @ lastone).canonicalise().compress(np.maximum(m_target2, 1)) expander = expander + expander2 break expander_dims = expander.bond_dims temp_m_trunc = int(np.max(m_target) / np.max(hint_mpo.bond_dims)) + 1 lastone = lastone.canonicalise().compress(temp_m_trunc) logger.debug(f"lastone bond dimension after compression: {lastone.bond_dims}") return ((mps + expander.scale(coef * mps.norm, inplace=True)).canonicalise(). compress(mps.compress_config.max_dims).normalize("mps_norm_to_coeff")) def normalize(tn, kind): r''' normalize the wavefunction Parameters ---------- kind: str "mps_only": the mps part is normalized and coeff is not modified; "mps_norm_to_coeff": the mps part is normalized and the norm is multiplied to coeff; "mps_and_coeff": both mps and coeff is normalized Returns ------- ``self`` is overwritten. ''' if hasattr(tn, "mp_norm"): tn_norm = tn.mp_norm elif hasattr(tn, "ttns_norm"): tn_norm = tn.ttns_norm else: raise ValueError(f"{type(tn)} does not have norm attribute") if kind in ["mps_only", "ttns_only"]: new_coeff = tn.coeff elif kind in ["mps_and_coeff", "ttns_and_coeff"]: new_coeff = tn.coeff / np.linalg.norm(tn.coeff) elif kind in ["mps_norm_to_coeff", "ttns_norm_to_coeff"]: new_coeff = tn.coeff * tn_norm else: raise ValueError(f"kind={kind} is not valid.") tn.scale(1.0 / tn_norm, inplace=True) tn.coeff = new_coeff return tn class BraKetPair: def __init__(self, bra_mps, ket_mps, mpo=None): self.bra_mps = bra_mps self.ket_mps = ket_mps self.mpo = mpo self.ft = self.calc_ft() def calc_ft(self): if self.mpo is None: dot = self.bra_mps.conj().dot(self.ket_mps) else: dot = self.ket_mps.expectation(self.mpo, self.bra_mps.conj()) return complex( dot * np.conjugate(self.bra_mps.coeff) * self.ket_mps.coeff ) def __str__(self): if np.iscomplexobj(self.ft): # if negative, sign is included in the imag part sign = "+" if 0 <= self.ft.imag else "" ft_str = "%g%s%gj" % (self.ft.real, sign, self.ft.imag) else: ft_str = "%g" % self.ft return "bra: %s, ket: %s, ft: %s" % (self.bra_mps, self.ket_mps, ft_str) def __iter__(self): return iter((self.bra_mps, self.ket_mps)) def min_abs(t1, t2): # t1, t2 could be int, float, complex # return the number with smaller norm assert xp.iscomplex(t1) == xp.iscomplex(t2) if xp.absolute(t1) < xp.absolute(t2): return t1 else: return t2 def _construct_freq_environ(mpos_hash: List[List[int]], hash_to_obj: Dict[int, Matrix], mps: Mps, domain: str, mps_conj): """ Construct environment tensors that are most frequently shown in the group of MPOs """ assert domain in ["L", "R"] # count mpo sequence frequency counter = Counter() for mpo_hash in mpos_hash: for i in range(1, len(mpo_hash)+1): if domain == "L": mpo_seq = mpo_hash[:i] else: mpo_seq = reversed(mpo_hash[-i:]) counter.update([tuple(mpo_seq)]) # transform the counter into a list of matrices. # The most frequent sequences first. If the same freq, then shorter sequences first # Note that shorter sequences are not less frequent than longer sequences most_common = list(counter.items()) most_common.sort(key=lambda x: (-x[1], len(x[0]))) matrices_list = [] hash_list = [] for hashes, n in most_common: # discard unique ones because they do not need to be cached if n == 1: break # cache ``len(mps)`` sequences # sequences with the same length may be treated differently. if len(mps) < len(matrices_list): break hash_list.append(hashes) matrices_list.append(list(map(hash_to_obj.get, hashes))) # contract the tensors result = {(): xp.ones((1, 1, 1), dtype=backend.real_dtype)} for m_hashes, matrices in zip(hash_list, matrices_list): environ = result[tuple(m_hashes[:-1])] if domain == "L": idx = len(matrices)-1 else: idx = -len(matrices) ms, ms_conj = mps[idx], mps_conj[idx] result[tuple(m_hashes)] = contract_one_site(environ, ms, matrices[-1], domain=domain, ms_conj=ms_conj) return result def _get_freq_environ(environ_dict, mpo, domain, max_length): assert domain in ["L", "R"] if domain == "L": it = mpo else: it = reversed(mpo) hashes = [] for mo in it: hashes.append(hash(mo)) if (not tuple(hashes) in environ_dict) or (max_length < len(hashes)): hashes.pop() break if domain == "L": i = len(hashes) - 1 else: i = len(mpo) - len(hashes) environ = environ_dict[tuple(hashes)] return environ, i