Source code for renormalizer.mps.tda

# -*- coding: utf-8 -*-

import logging
from collections import defaultdict

import numpy as np
import scipy
import opt_einsum as oe

from renormalizer.mps.matrix import tensordot, multi_tensor_contract, asnumpy, asxp
from renormalizer.mps.backend import xp, USE_GPU, primme, IMPORT_PRIMME_EXCEPTION
from renormalizer.mps import Mps
from renormalizer.mps.lib import Environ, compressed_sum
from renormalizer.lib import davidson

logger = logging.getLogger(__name__)

class TDA(object):
    r"""
    Tamm–Dancoff approximation (also known as CIS) to calculate the excited
    states based on MPS. TDA uses the first-order tangent space to do the
    excitation. The implementation is similar to
    J. Chem. Phys. 140, 024108 (2014).

    Parameters
    ----------
    model: renormalizer.model.Model
        Model of the system.
    hmpo: renormalizer.mps.Mpo
        MPO of the Hamiltonian.
    mps: renormalizer.mps.Mps
        ground state MPS (will be overwritten).
    nroots: int, optional
        number of roots to be calculated. Default is ``1``.
    algo: str, optional
        iterative diagonalization solver. Default is ``"primme"`` if ``primme``
        is installed. Valid options are ``davidson`` and ``primme``.

    Note
    ----
    Quantum number is not used, thus the conservation is not guaranteed.
    """
    def __init__(self, model, hmpo, mps, nroots=1, algo=None):
        self.model = model
        self.hmpo = hmpo
        self.mps = mps  # mps will be overwritten inplace
        self.nroots = nroots
        if algo is None:
            if primme is not None:
                self.algo = "primme"
            else:
                self.algo = "davidson"
        else:
            self.algo = algo

        self.e = None
        # wavefunction: [mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list]
        self.wfn = None
        self.configs = defaultdict(list)

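    # Construction example (a minimal sketch; ``model``, ``hmpo`` and the
    # ground-state ``mps`` are assumed to come from a preceding DMRG run and
    # are not defined in this module):
    #
    #     tda = TDA(model, hmpo, mps, nroots=3)                     # primme if available
    #     tda_dav = TDA(model, hmpo, mps, nroots=3, algo="davidson")
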
    def kernel(self, restart=False, include_psi0=False):
        r"""calculate the roots

        Parameters
        ----------
        restart: bool, optional
            if restart from the former converged root. Default is ``False``.
            If ``restart = True``, ``include_psi0`` must be the same as the
            former calculation.
        include_psi0: bool, optional
            if the basis of Hamiltonian includes the ground state
            :math:`\Psi_0`. Default is ``False``.

        Returns
        -------
        e: np.ndarray
            the energy of the states. If ``include_psi0 = True``, the first
            element is the ground state energy, otherwise it is the energy of
            the first excited state.
        """
        # right canonical mps
        mpo = self.hmpo
        nroots = self.nroots
        algo = self.algo
        site_num = mpo.site_num

        if not restart:
            # make sure that M is not redundant near the edge
            mps = self.mps.ensure_right_canonical().canonicalise().normalize("mps_and_coeff").canonicalise()
            logger.debug(f"reference mps shape, {mps}")
            mps_r_cano = mps.copy()
            assert mps.to_right

            tangent_u = []

            for ims in range(len(mps)):

                shape = list(mps[ims].shape)
                u, s, vt = scipy.linalg.svd(mps[ims].l_combine(), full_matrices=True)
                rank = len(s)
                if include_psi0 and ims == site_num-1:
                    tangent_u.append(u.reshape(shape[:-1]+[-1]))
                else:
                    if rank < u.shape[1]:
                        tangent_u.append(u[:, rank:].reshape(shape[:-1]+[-1]))
                    else:
                        tangent_u.append(None)  # the tangent space is None

                mps[ims] = u[:, :rank].reshape(shape[:-1]+[-1])

                vt = xp.einsum("i, ij -> ij", asxp(s), asxp(vt))
                if ims == site_num-1:
                    assert vt.size == 1 and xp.allclose(vt, 1)
                else:
                    mps[ims+1] = asnumpy(tensordot(vt, mps[ims+1], ([-1], [0])))

            mps_l_cano = mps.copy()
            mps_l_cano.to_right = False
            mps_l_cano.qnidx = site_num-1

        else:
            mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list = self.wfn
            cguess = []
            for iroot in range(len(tda_coeff_list)):
                tda_coeff = tda_coeff_list[iroot]
                x = [c.flatten() for c in tda_coeff if c is not None]
                x = np.concatenate(x, axis=None)
                cguess.append(x)
            cguess = np.stack(cguess, axis=1)

        xshape = []
        xsize = 0
        for ims in range(site_num):
            if tangent_u[ims] is None:
                xshape.append((0, 0))
            else:
                if ims == site_num-1:
                    xshape.append((tangent_u[ims].shape[-1], 1))
                else:
                    xshape.append((tangent_u[ims].shape[-1], mps_r_cano[ims+1].shape[0]))
            xsize += np.prod(xshape[-1])

        logger.debug(f"DMRG-TDA H dimension: {xsize}")

        if USE_GPU:
            oe_backend = "cupy"
        else:
            oe_backend = "numpy"

        mps_tangent = mps_r_cano.copy()
        environ = Environ(mps_tangent, mpo, "R")
        hdiag = []
        for ims in range(site_num):
            ltensor = environ.GetLR(
                "L", ims-1, mps_tangent, mpo, itensor=None, method="System"
            )
            rtensor = environ.GetLR(
                "R", ims+1, mps_tangent, mpo, itensor=None, method="Enviro"
            )
            if tangent_u[ims] is not None:
                u = asxp(tangent_u[ims])
                tmp = oe.contract("abc, ded, bghe, agl, chl -> ld", ltensor, rtensor,
                                  asxp(mpo[ims]), u, u, backend=oe_backend)
                hdiag.append(asnumpy(tmp))
            mps_tangent[ims] = mps_l_cano[ims]
        hdiag = np.concatenate(hdiag, axis=None)

        count = 0

        # recover the vector-like x back to the ndarray tda_coeff
        def reshape_x(x):
            tda_coeff = []
            offset = 0
            for shape in xshape:
                if shape == (0, 0):
                    tda_coeff.append(None)
                else:
                    size = np.prod(shape)
                    tda_coeff.append(x[offset:size+offset].reshape(shape))
                    offset += size

            assert offset == xsize
            return tda_coeff

        def hop(x):
            # H*X
            nonlocal count
            count += 1

            assert len(x) == xsize
            tda_coeff = reshape_x(x)

            res = [np.zeros_like(coeff) if coeff is not None else None for coeff in tda_coeff]

            # fix ket and sweep bra and accumulate into res
            for ims in range(site_num):
                if tda_coeff[ims] is None:
                    assert tangent_u[ims] is None
                    continue

                # mix-canonical mps
                mps_tangent = merge(mps_l_cano, mps_r_cano, ims+1)
                mps_tangent[ims] = tensordot(tangent_u[ims], tda_coeff[ims], (-1, 0))

                mps_tangent_conj = mps_r_cano.copy()
                environ = Environ(mps_tangent, mpo, "R", mps_conj=mps_tangent_conj)

                for ims_conj in range(site_num):
                    ltensor = environ.GetLR(
                        "L", ims_conj-1, mps_tangent, mpo, itensor=None,
                        mps_conj=mps_tangent_conj, method="System"
                    )
                    rtensor = environ.GetLR(
                        "R", ims_conj+1, mps_tangent, mpo, itensor=None,
                        mps_conj=mps_tangent_conj, method="Enviro"
                    )
                    if tda_coeff[ims_conj] is not None:
                        # S-a   l-S
                        #     d
                        # O-b-O-f-O
                        #     e
                        # S-c   k-S

                        path = [
                            ([0, 1], "abc, cek -> abek"),
                            ([2, 0], "abek, bdef -> akdf"),
                            ([1, 0], "akdf, lfk -> adl"),
                        ]
                        out = multi_tensor_contract(
                            path, ltensor, asxp(mps_tangent[ims_conj]),
                            asxp(mpo[ims_conj]), rtensor
                        )
                        res[ims_conj] += asnumpy(tensordot(tangent_u[ims_conj], out, ([0, 1], [0, 1])))

                    # mps_conj combine
                    mps_tangent_conj[ims_conj] = mps_l_cano[ims_conj]

            res = [mat for mat in res if mat is not None]

            return np.concatenate(res, axis=None)

        if algo == "davidson":
            if restart:
                cguess = [cguess[:, i] for i in range(cguess.shape[1])]
            else:
                cguess = [np.random.random(xsize) - 0.5]

            precond = lambda x, e, *args: x / (hdiag - e + 1e-4)

            e, c = davidson(
                hop, cguess, precond, max_cycle=100,
                nroots=nroots, max_memory=64000
            )
            if nroots == 1:
                c = [c]
            c = np.stack(c, axis=1)

        elif algo == "primme":
            if primme is None:
                logger.error("can not import primme")
                raise IMPORT_PRIMME_EXCEPTION

            if not restart:
                cguess = None

            def multi_hop(x):
                if x.ndim == 1:
                    return hop(x)
                elif x.ndim == 2:
                    return np.stack([hop(x[:, i]) for i in range(x.shape[1])], axis=1)
                else:
                    assert False

            def precond(x):
                if x.ndim == 1:
                    return np.einsum("i, i -> i", 1/(hdiag+1e-4), x)
                elif x.ndim == 2:
                    return np.einsum("i, ij -> ij", 1/(hdiag+1e-4), x)
                else:
                    assert False

            A = scipy.sparse.linalg.LinearOperator((xsize, xsize),
                                                   matvec=multi_hop, matmat=multi_hop)
            M = scipy.sparse.linalg.LinearOperator((xsize, xsize),
                                                   matvec=precond, matmat=precond)
            e, c = primme.eigsh(A, k=min(nroots, xsize), which="SA", v0=cguess,
                                OPinv=M, method="PRIMME_DYNAMIC", tol=1e-6)
        else:
            assert False

        logger.debug(f"H*C times: {count}")

        tda_coeff_list = []
        for iroot in range(nroots):
            tda_coeff_list.append(reshape_x(c[:, iroot]))

        self.e = np.array(e)
        self.wfn = [mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list]

        return self.e

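    # Kernel usage (a sketch, continuing the construction example above;
    # ``tda`` is the TDA instance built from ``model``, ``hmpo`` and ``mps``):
    #
    #     e = tda.kernel()                  # np.ndarray of excited-state energies
    #     e = tda.kernel(restart=True)      # refine starting from the converged roots
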
    def dump_wfn(self):
        r"""
        Dump the wavefunction for restart and analysis.

        Note
        ----
        mps_l_cano.npz: left-canonical form of the initial mps
        mps_r_cano.npz: right-canonical form of the initial mps
        tangent_u.npz: the tangent space u of the mixed-canonical mps
        tda_coeff_{iroot}.npz: the tda_coeff of the i-th root
        """
        mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list = self.wfn
        # store mps_l_cano and mps_r_cano
        mps_l_cano.dump("mps_l_cano.npz")
        mps_r_cano.dump("mps_r_cano.npz")
        # store tangent_u
        tangent_u_dict = {f"{i}": mat for i, mat in enumerate(tangent_u) if mat is not None}
        np.savez("tangent_u.npz", **tangent_u_dict)
        # store tda coeff
        for iroot, tda_coeff in enumerate(tda_coeff_list):
            tda_coeff_dict = {f"{i}": mat for i, mat in enumerate(tda_coeff) if mat is not None}
            np.savez(f"tda_coeff_{iroot}.npz", **tda_coeff_dict)

    def load_wfn(self, model):
        r"""Load tda wavefunction
        """
        mps_l_cano = Mps.load(model, "mps_l_cano.npz")
        mps_r_cano = Mps.load(model, "mps_r_cano.npz")
        tangent_u_dict = np.load("tangent_u.npz")
        tangent_u = [tangent_u_dict[str(i)] if str(i) in tangent_u_dict.keys() else None
                     for i in range(mps_l_cano.site_num)]
        tda_coeff_list = []
        for iroot in range(self.nroots):
            tda_coeff_dict = np.load(f"tda_coeff_{iroot}.npz")
            tda_coeff = [tda_coeff_dict[str(i)] if str(i) in tda_coeff_dict.keys() else None
                         for i in range(mps_l_cano.site_num)]
            tda_coeff_list.append(tda_coeff)

        self.wfn = [mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list]

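    # Restart workflow (a minimal sketch; ``model``, ``hmpo`` and ``mps`` are
    # assumed to exist, and ``nroots``/``include_psi0`` must match the original
    # calculation):
    #
    #     tda = TDA(model, hmpo, mps, nroots=2)
    #     tda.kernel(include_psi0=False)
    #     tda.dump_wfn()                       # writes mps_l_cano.npz, mps_r_cano.npz, ...
    #
    #     tda_new = TDA(model, hmpo, mps, nroots=2)
    #     tda_new.load_wfn(model)              # populates tda_new.wfn
    #     tda_new.kernel(restart=True, include_psi0=False)
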
    def analysis_1ordm(self):
        r"""
        calculate one-orbital reduced density matrix of each tda root
        """
        mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list = self.wfn

        for iroot in range(self.nroots):
            tda_coeff = tda_coeff_list[iroot]
            rdm = None
            for ims in range(mps_l_cano.site_num):
                if tangent_u[ims] is None:
                    assert tda_coeff[ims] is None
                    continue
                mps_tangent = merge(mps_l_cano, mps_r_cano, ims+1)
                mps_tangent[ims] = tensordot(tangent_u[ims], tda_coeff[ims], [-1, 0])
                rdm_increment = mps_tangent.calc_1ordm()

                if rdm is None:
                    rdm = rdm_increment
                else:
                    rdm = [mat1+mat2 for mat1, mat2 in zip(rdm, rdm_increment)]

            dominant_config = {}
            for isite, mat in enumerate(rdm):
                quanta = np.argmax(np.diag(mat))
                dominant_config[isite] = (quanta, np.diag(mat)[quanta])
            logger.info(f"root: {iroot}, config: {dominant_config}")

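    # Example (a sketch): after ``kernel`` has converged, ``analysis_1ordm``
    # logs, for every root and site, the basis state with the largest diagonal
    # element of the one-orbital reduced density matrix and its population:
    #
    #     tda.kernel()
    #     tda.analysis_1ordm()   # logs "root: 0, config: {0: (quanta, population), ...}"
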
    def analysis_dominant_config(self, thresh=0.8, alias=None, tda_m_trunc=20,
                                 return_compressed_mps=False):
        r"""
        Analyze the dominant configuration of each tda root.
        The algorithm is to compress the tda wavefunction to a rank-1 Hartree
        state and get the ci coefficient of the largest configuration.
        Then, the configuration is subtracted from the tda wavefunction and the
        first step is repeated to get the second largest configuration.
        The two steps continue until the threshold is reached.

        Parameters
        ----------
        thresh: float, optional
            the threshold to stop the analysis procedure of each root.
            :math:`\sum_i |c_i|^2 > thresh`. Default is 0.8.
        alias: dict, optional
            The alias of each site. For example,
            ``alias={0:"v_0", 1:"v_2", 2:"v_1"}``. Default is ``None``.
        tda_m_trunc: int, optional
            the ``m`` to compress a tda wavefunction. Default is 20.
        return_compressed_mps: bool, optional
            If ``True``, return the tda excited state as a single compressed
            mps. Default is ``False``.

        Returns
        -------
        configs: dict
            The dominant configuration of each root.
            ``configs = {0:[(config0, config_name0, ci_coeff0),(config1, config_name1, ci_coeff1),...], 1:...}``
        compressed_mps: List[renormalizer.mps.Mps]
            see the description in ``return_compressed_mps``.

        Note
        ----
        The compressed_mps is an approximation of the tda wavefunction with
        ``m=tda_m_trunc``.
        """

        mps_l_cano, mps_r_cano, tangent_u, tda_coeff_list = self.wfn

        if alias is not None:
            assert len(alias) == mps_l_cano.site_num

        compressed_mps = []
        for iroot in range(self.nroots):
            logger.info(f"iroot: {iroot}")
            tda_coeff = tda_coeff_list[iroot]

            mps_tangent_list = []
            weight = []
            for ims in range(mps_l_cano.site_num):
                if tangent_u[ims] is None:
                    assert tda_coeff[ims] is None
                    continue
                weight.append(np.sum(tda_coeff[ims]**2))
                mps_tangent = merge(mps_l_cano, mps_r_cano, ims+1)
                mps_tangent[ims] = asnumpy(tensordot(tangent_u[ims], tda_coeff[ims], [-1, 0]))
                mps_tangent_list.append(mps_tangent)

            assert np.allclose(np.sum(weight), 1)
            # sort the mps_tangent from large weight to small weight
            mps_tangent_list = [mps_tangent_list[i] for i in np.argsort(weight, axis=None)[::-1]]

            coeff_square_sum = 0
            mps_delete = None

            config_visited = []
            while coeff_square_sum < thresh:
                if mps_delete is None:
                    # first compress it to M=tda_m_trunc
                    mps_rank1 = compressed_sum(mps_tangent_list, batchsize=5,
                                               temp_m_trunc=tda_m_trunc)
                else:
                    mps_rank1 = compressed_sum([mps_delete] + mps_tangent_list,
                                               batchsize=5, temp_m_trunc=tda_m_trunc)

                if coeff_square_sum == 0 and return_compressed_mps:
                    compressed_mps.append(mps_rank1.copy())

                mps_rank1 = mps_rank1.canonicalise().compress(temp_m_trunc=1)

                # get config with the largest coeff
                config = []
                for ims, ms in enumerate(mps_rank1):
                    ms = ms.array.flatten()**2
                    quanta = int(np.argmax(ms))
                    config.append(quanta)

                # check if the config has been visited
                if config in config_visited:
                    break

                config_visited.append(config)

                ci_coeff_list = []
                for mps_tangent in mps_tangent_list:
                    sentinel = xp.ones((1, 1))
                    for ims, ms in enumerate(mps_tangent):
                        sentinel = sentinel.dot(asxp(ms[:, config[ims], :]))
                    ci_coeff_list.append(float(sentinel[0, 0]))
                ci_coeff = np.sum(ci_coeff_list)
                coeff_square_sum += ci_coeff**2

                if alias is not None:
                    config_name = [f"{quanta}"+f"{alias[isite]}" for isite, quanta
                                   in enumerate(config) if quanta != 0]
                    config_name = " ".join(config_name)
                    self.configs[iroot].append((config, config_name, ci_coeff))
                    logger.info(f"config: {config}, {config_name}")
                else:
                    self.configs[iroot].append((config, ci_coeff))
                    logger.info(f"config: {config}")
                logger.info(f"ci_coeff: {ci_coeff}, weight:{ci_coeff**2}")

                condition = {dof: config[idof] for idof, dof in enumerate(self.model.dofs)}
                mps_delete_increment = Mps.hartree_product_state(self.model, condition).scale(-ci_coeff)
                if mps_delete is None:
                    mps_delete = mps_delete_increment
                else:
                    mps_delete = mps_delete + mps_delete_increment

            logger.info(f"coeff_square_sum: {coeff_square_sum}")

        return self.configs, compressed_mps

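    # Example (a sketch, assuming a converged ``tda`` with three sites; the
    # alias keys are site indices):
    #
    #     configs, _ = tda.analysis_dominant_config(
    #         thresh=0.9, alias={0: "v_0", 1: "v_1", 2: "v_2"}
    #     )
    #     # configs[0] -> [(config, config_name, ci_coeff), ...] for the first root
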
def merge(mpsl, mpsr, idx):
    """
    Merge two mps (mpsl, mpsr): the sites with index >= idx are taken from
    mpsr, while the other sites and attributes are the same as mpsl.
    """
    mps = mpsl.copy()
    for imps in range(idx, mpsr.site_num):
        mps[imps] = mpsr[imps]
    return mps
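

# Illustration of ``merge`` (a minimal sketch, not part of the module): with
# idx = 2 and two 4-site chains, the result takes sites 0-1 from ``mpsl`` and
# sites 2-3 from ``mpsr``, i.e. a mixed-canonical chain when ``mpsl`` is
# left-canonical and ``mpsr`` is right-canonical:
#
#     mps_mix = merge(mps_l_cano, mps_r_cano, 2)
#     # mps_mix[0], mps_mix[1] come from mps_l_cano
#     # mps_mix[2], mps_mix[3] come from mps_r_cano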