import time
import warnings
from types import SimpleNamespace
import config as cfg
import yast.yast as yast
from tn_interface_abelian import contract, permute
from ctm.one_site_c4v_abelian.ctm_components_c4v import *
try:
from torch.utils.checkpoint import checkpoint
except ImportError as e:
warnings.warn("torch not available", Warning)
import logging
log = logging.getLogger(__name__)
def run(state, env, conv_check=None, ctm_args=cfg.ctm_args, global_args=cfg.global_args):
r"""
:param state: wavefunction
:param env: initial C4v symmetric environment
:param conv_check: function which determines the convergence of CTM algorithm. If ``None``,
the algorithm performs ``ctm_args.ctm_max_iter`` iterations.
:param ctm_args: CTM algorithm configuration
:param global_args: global configuration
:type state: IPEPS_C4V_ABELIAN
:type env: ENV_C4V_ABELIAN
    :type conv_check: function(IPEPS_C4V_ABELIAN,ENV_C4V_ABELIAN,Object,CTMARGS)->(bool,Object)
:type ctm_args: CTMARGS
:type global_args: GLOBALARGS
    Executes the specialized CTM algorithm for abelian-symmetric 1-site C4v-symmetric iPEPS,
    starting from the initial environment ``env``.
    To establish convergence of the CTM before the maximal number of iterations
    is reached, the ``conv_check`` function is invoked.
    Its expected signature is ``conv_check(IPEPS_C4V_ABELIAN,ENV_C4V_ABELIAN,Object,CTMARGS)``,
    where ``Object`` is an arbitrary argument, for example a list or dict used
    for storing CTM data from previous steps to check convergence.
    If :attr:`config.CTMARGS.ctm_force_dl` is ``True``, the double-layer tensor is
    precomputed and used throughout the CTMRG. Otherwise, the `ket` and `bra` layers
    of the on-site tensor are contracted sequentially when building the enlarged corner.
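
    Example of a minimal convergence check (a sketch; the fixed iteration budget
    below is purely illustrative, not a library default)::

        def conv_check(state, env, history, ctm_args=cfg.ctm_args):
            if history is None:
                history = {"iter": 0}
            history["iter"] += 1
            # hypothetical criterion: stop after 50 CTM steps
            return history["iter"] >= 50, history

        env, history, t_ctm, t_obs = run(state, env, conv_check=conv_check)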
"""
    if ctm_args.projector_svd_method in ('DEFAULT', 'GESDD'):
def truncation_f(S):
return yast.linalg.truncation_mask_multiplets(S,keep_multiplets=True, D_total=env.chi,\
tol=ctm_args.projector_svd_reltol, tol_block=ctm_args.projector_svd_reltol_block, \
eps_multiplet=ctm_args.projector_eps_multiplet)
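        # Illustration (a sketch, not part of the algorithm): for a spectrum
        # S = (1.0, 0.5, 0.5, 0.3, ...) and D_total=2, a plain cut would split
        # the degenerate pair (0.5, 0.5); with keep_multiplets=True the mask
        # keeps or drops each multiplet as a whole (up to eps_multiplet),
        # here keeping only (1.0). Note that chi enters through env.chi
        # captured in truncation_f, so the chi argument below is unused.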
def truncated_decomp(M, chi, legs=(0,1), sU=1, diagnostics=None):
return yast.linalg.svd_with_truncation(M, legs, sU=sU, mask_f=truncation_f, diagnostics=diagnostics)
# elif ctm_args.projector_svd_method=='SYMARP':
# def truncated_decomp(M, chi, legs=(0,1), sU=1):
# return M.split_svd_eigsh_2C(axes=legs, tol=ctm_args.projector_svd_reltol, D_total=chi, \
# sU=sU, keep_multiplets=True)
else:
raise Exception(f"Projector eig/svd method \"{cfg.ctm_args.projector_svd_method}\" not implemented")
a= state.site()
if ctm_args.ctm_force_dl:
# 0) Create double-layer (DL) tensors, preserving the same convention
# for order of indices
#
# / /(+1)
# --a--- = (+1)--A--(+1)
# /| /
# |/ (+1)
# --a*--
# /
#
        a_dl= contract(a, a, ([0],[0]), conj=(0,1))  # mefgh,mabcd->efghabcd
        a_dl= a_dl.fuse_legs( axes=((0,4),(1,5),(2,6),(3,7)) )  # efghabcd->(ea)(fb)(gc)(hd)
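        # Each fused leg of the double-layer tensor pairs the corresponding
        # ket and bra auxiliary indices of `a`, preserving their order, so the
        # bond dimension of each fused leg is the square of that of `a`.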
# 1) perform CTMRG
    t_obs=t_ctm=0.
history=None
past_steps_data=dict() # possibly store some data throughout the execution of CTM
for i in range(ctm_args.ctm_max_iter):
t0_ctm= time.perf_counter()
if ctm_args.ctm_force_dl:
ctm_MOVE_dl(a_dl, env, truncated_decomp, ctm_args=ctm_args, global_args=global_args)
else:
ctm_MOVE_sl(a, env, truncated_decomp, ctm_args=ctm_args, global_args=global_args)
t1_ctm= time.perf_counter()
t0_obs= time.perf_counter()
if conv_check is not None:
# evaluate convergence of the CTMRG procedure
converged, history= conv_check(state, env, history, ctm_args=ctm_args)
if converged:
if ctm_args.verbosity_ctm_convergence>0:
print(f"CTMRG converged at iter= {i}")
break
t1_obs= time.perf_counter()
t_ctm+= t1_ctm-t0_ctm
t_obs+= t1_obs-t0_obs
return env, history, t_ctm, t_obs
# performs a single CTM move using the precomputed double-layer tensor
def ctm_MOVE_dl(a_dl, env, f_c2x2_decomp, ctm_args=cfg.ctm_args, global_args=cfg.global_args):
r"""
:param a_dl: double-layer on-site C4v symmetric tensor
:param env: C4v symmetric environment
    :param f_c2x2_decomp: function performing the truncated spectral decomposition (eigenvalue/svd)
                          of the enlarged corner. ``f_c2x2_decomp`` returns a tuple ``(P, S, Vh)``:
                          the projector on the leading ``chi`` spectral values, the spectral
                          values themselves, and the right factor of the decomposition.
:param ctm_args: CTM algorithm configuration
:param global_args: global configuration
:type a_dl: yast.Tensor
:type env: ENV_C4V_ABELIAN
:type f_c2x2_decomp: function(yast.Tensor, int)->yast.Tensor, yast.Tensor, yast.Tensor
:type ctm_args: CTMARGS
:type global_args: GLOBALARGS
    Executes a single step of the C4v-symmetric CTM algorithm for a 1-site C4v-symmetric iPEPS.
    This variant of the CTM step uses a pre-built double-layer on-site tensor.
"""
# 0) compress abelian symmetric tensor into 1D representation for the purposes of
# checkpointing
metadata_store= {}
tmp= tuple([a_dl.compress_to_1d(), env.C[env.keyC].compress_to_1d(), \
env.T[env.keyT].compress_to_1d()])
tensors, metadata_store["in"]= list(zip(*tmp))
    # function wrapping the core of the CTM MOVE, to allow for gradient checkpointing
def ctm_MOVE_dl_c(*tensors):
#
# keep inputs for autograd stored on cpu, move to gpu for the core
# of the computation if desired
if global_args.offload_to_gpu != 'None' and global_args.device=='cpu':
tensors= tuple(r1d.to(global_args.offload_to_gpu) for r1d in tensors)
for meta in metadata_store["in"]:
meta['config']= meta['config']._replace(default_device=global_args.offload_to_gpu)
tensors= tuple(yast.decompress_from_1d(r1d, meta) \
for r1d,meta in zip(tensors,metadata_store["in"]))
A, C, T= tensors
# Fuse on-site tensor indices into single index
# 0(+)
# T--2(-),3(+)->2(-)
# 1(+)
T= T.fuse_legs(axes=(0,1,(2,3)))
# 1) build enlarged corner upper left corner
C2X2= c2x2_dl(A, C, T, verbosity=ctm_args.verbosity_projectors)
# 2) build projector
# (+1)--M--(+1) = (+1)0--P--1(+1)(-1)--S--(+1)--(-1)0--Vh--1(+1)
P, S, Vh = f_c2x2_decomp(C2X2, env.chi)
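        # Only P (the isometry of the decomposition) and S (for normalization)
        # are used below; Vh is not needed, since the enlarged corner is
        # hermitian up to numerical noise for a C4v-symmetric state and is
        # explicitly re-symmetrized in step 4.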
# 3) absorb and truncate
# __
# C2X2 |--1(+1) => C2X2---1(+1) (1->-1)0---P--1(1->-1) => C2X2--1(-1)
# |_____| | |
# | 0(-1) 0(-1)
# 0(+1)
# 0(-1)
# |
# P*--
# 1->0(-1)
        # NOTE changing the signature <=> hermitian conjugation, since C2X2= U D U^\dag with U=U^\dag ?
C2X2= contract(P.conj(), C2X2, ([0],[0]))
C2X2= contract(C2X2, P.flip_signature(), ([1],[0]))
# 2->1(+1->-1)
# ______P___
# 0(+1->-1) 1->0(+1->-1)
# 0(+1)
# T--2->3(-1)
# 1->2(+1)
P= P.unfuse_legs(axes=0)
nT= contract(P.flip_signature(), T,([0],[0]))
# 1->0(-1)
# __P____________
# | 0(-1)
# | 0(+1)
# T--3(-1) (+1)1--A--3(+1)
# 2->1(+1) 2(+1)
        # TODO is it necessary to "ungroup" the index connecting T to the site ?
nT= contract(nT, A,([0,3],[0,1]))
# 0(-1)
# __P____
# | |
# | | 0(-1)
# T-------A--3->1(+1) => T--1(+1)
# 1(+1) 2(+1) 2->1(-1)
# 0(-1) 1(-1)
# |___P*__|
# 2(-1)
# TODO do we conjugate here ?
nT= contract(nT, P.conj(),([1,2],[0,1]))
nT= nT.transpose((0,2,1)).flip_signature()
# 4) symmetrize, normalize and assign new C,T
C2X2= 0.5*( C2X2 + C2X2.transpose((1,0)).conj_blocks() )
nT= 0.5*(nT + nT.transpose((1,0,2)).conj_blocks() )
C2X2= C2X2/S.norm(p='inf')
nT= nT/nT.norm(p='inf')
# 0(-)
# T--2(+)->2(-),3(+)
# 1(-)
nT= nT.unfuse_legs(axes=2)
        # 5) return raw new tensors
tmp_loc= tuple([C2X2.compress_to_1d(), nT.compress_to_1d()])
tensors_loc, metadata_store["out"]= list(zip(*tmp_loc))
# move back to (default) cpu if offload_to_gpu is specified
if global_args.offload_to_gpu != 'None' and global_args.device=='cpu':
tensors_loc= tuple(r1d.to(global_args.device) for r1d in tensors_loc)
for meta in metadata_store["out"]:
meta['config']= meta['config']._replace(default_device=global_args.device)
return tensors_loc
# Call the core function, allowing for checkpointing
if ctm_args.fwd_checkpoint_move:
new_tensors= checkpoint(ctm_MOVE_dl_c,*tensors)
else:
new_tensors= ctm_MOVE_dl_c(*tensors)
new_tensors= tuple(yast.decompress_from_1d(r1d, meta) \
for r1d,meta in zip(new_tensors,metadata_store["out"]))
env.C[env.keyC]= new_tensors[0]
env.T[env.keyT]= new_tensors[1]
# performs a single CTM move contracting the ket and bra layers sequentially
def ctm_MOVE_sl(a, env, f_c2x2_decomp, ctm_args=cfg.ctm_args, global_args=cfg.global_args):
r"""
:param a: on-site C4v symmetric tensor
:param env: C4v symmetric environment
    :param f_c2x2_decomp: function performing the truncated spectral decomposition (eigenvalue/svd)
                          of the enlarged corner. ``f_c2x2_decomp`` returns a tuple ``(P, S, Vh)``:
                          the projector on the leading ``chi`` spectral values, the spectral
                          values themselves, and the right factor of the decomposition.
:param ctm_args: CTM algorithm configuration
:param global_args: global configuration
:type a: yast.Tensor
    :type env: ENV_C4V_ABELIAN
:type f_c2x2_decomp: function(yast.Tensor, int)->yast.Tensor, yast.Tensor, yast.Tensor
:type ctm_args: CTMARGS
:type global_args: GLOBALARGS
    Executes a single step of the C4v-symmetric CTM algorithm for a 1-site C4v-symmetric iPEPS.
    This variant of the CTM step does not explicitly build the double-layer on-site tensor.
"""
# 0) compress abelian symmetric tensor into 1D representation for the purposes of
# checkpointing
#
# keep inputs for autograd stored on cpu, move to gpu for the core
# of the computation if desired
metadata_store= {}
tmp= tuple([a.compress_to_1d(), \
env.C[env.keyC].compress_to_1d(), env.T[env.keyT].compress_to_1d()])
tensors, metadata_store["in"] = list(zip(*tmp))
    # function wrapping the core of the CTM MOVE, to allow for gradient checkpointing
def ctm_MOVE_sl_c(*tensors):
#
# keep inputs for autograd stored on cpu, move to gpu for the core
# of the computation if desired
if global_args.offload_to_gpu != 'None' and global_args.device=='cpu':
tensors= tuple(r1d.to(global_args.offload_to_gpu) for r1d in tensors)
for meta in metadata_store["in"]:
meta['config']= meta['config']._replace(default_device=global_args.offload_to_gpu)
a,C,T= tuple(yast.decompress_from_1d(r1d, meta) \
for r1d,meta in zip(tensors,metadata_store["in"]))
# 1) build enlarged corner upper left corner
# C----T---3(+)
# | |
# T---a*a--4(+),5(-)
# | |
# 0(+) 1(+),2(-)
C2X2= c2x2_sl(a, C, T, verbosity=ctm_args.verbosity_projectors)
# 2) build projector
# (+)0--C2X2--(+)3 = (+)0--P--3(+1)(-1)--S--(+1)--(-1)0--Vh--3->1(+)
# (+)1-- --(+)4 (+)1-- --4->2(+)
# (-)2-- --(-)5 (-)2-- --5->3(-)
P, S, Vh = f_c2x2_decomp(C2X2, env.chi, legs=([0,1,2],[3,4,5]))
# 3) absorb and truncate
# C2X2 |--3->1(+1) => C2X2_|--1(+) (+1->-)0---P--3->1(+1->-) => C2X2--1(-1)
# | |--4->2(+) | \--2(+) (+1->-)1--/ |
# |_____|--5->3(-) 0(-) --3(-) (-1->+)2-- 0(-1)
# | |
# 0(+1) 1(+),2(-)
# 0(-) 1(-),2(+)
# | /
# P*--
# 2->0(-1)
C2X2= contract(P.conj(), C2X2, ([0,1,2],[0,1,2]))
C2X2= contract(C2X2, P.flip_signature(), ([1,2,3],[0,1,2]))
        # The absorption step for C is done with T placed on the B-sublattice,
        # and hence the half-row/column tensor absorption is done with T'
        # corresponding to the A-sublattice.
#
# C--T--C => C---T--T'--T--C => C--T-- & --T'--
# T--A--T T---A--B---A--T T--A-- --B---
# C--T--C T'--B--A---B--T | | |
# T---A--B---A--T
# C---T--T'--T--C
#
# 0(+1->-1)
# T--2(-)->2(-),3(+)->2(-1->+1),3(+1->-1)
# 1(+1->-1)
T= T.flip_signature()
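        # flip_signature reverses all leg signatures (and the total charge),
        # which matches the convention of tensors on the B-sublattice of the
        # bipartite covering sketched above.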
# 3(+)->2(+) 0
# ______P___ _____P___
# 0(+) 1(+),2(-)->0(+),1(-) => | 2(+),3(-)
# 0(-) |
# T--2(+),3(-)->4(+),5(-) T--4(+),5(-)
# 1(-)->3(-) 1(-)
nT= contract(P, T,([0],[0]))
nT= permute(nT,(2,3,0,1,4,5))
# Half-row/column tensor absorbs B-sublattice version of on-site tensor
#
# 0(+) 0(+)
# _______P__________ => _____P_________
# | (+)2 \3->2(-) | | \
# | (-)1 T-------------a-------6->3(-)
# T----4(+)(-)2----a--4->6(-) |\ (-)4/| 2(-)
# |\5->3(-) |\0->4(-) | \ (+)0-|--\1(+)
# | 3->5(-) | \(-)3(+)2-----a*---4->5(+)
# 1(-) | | |
# 1(-) (-)2<-5 3->4(+)
#
_a= a.flip_signature()
nT= contract(nT, _a, ([2,4], [1,2]))
nT= contract(nT, _a.conj(), ([2,3,4], [1,2,0]))
# 0(+) 0(+)
# __P__ _P__
# | | => | |
# T'---a*a--3(-),5(+)->4(-),5(+) T'--a*a--4(-),5(+)
# | | | |
# 1(-1) 2(-),4(+)->2(-),3(+) 1(-) 2(-),3(+)
nT= permute(nT, (0,1,2,4,3,5))
# 0(+)
# __P____
# | |
# | | 0(+)
# T'-----a*a--4(-),5(+)->1(-),2(+) => T--1(-),2(+)->2(-),3(+)
# 1(-1) 2(-),3(+) 2->1(+)
# 0(+1) 1(+),2(-)
# |___P__|
# 3(+)
# TODO do we conjugate here ?
nT= contract(nT, P,([1,2,3],[0,1,2]))
nT= nT.transpose((0,3,1,2))
# 4) symmetrize, normalize and assign new C,T
C2X2= 0.5*( C2X2 + C2X2.transpose((1,0)).conj_blocks() )
nT= 0.5*(nT + nT.transpose((1,0,2,3)).conj_blocks() )
C2X2= C2X2/S.norm(p='inf')
nT= nT/nT.norm(p='inf')
        # 5) return raw new tensors
tmp_loc= tuple([C2X2.compress_to_1d(), nT.compress_to_1d()])
tensors_loc, metadata_store["out"]= list(zip(*tmp_loc))
# move back to (default) cpu if offload_to_gpu is specified
if global_args.offload_to_gpu != 'None' and global_args.device=='cpu':
tensors_loc= tuple(r1d.to(global_args.device) for r1d in tensors_loc)
for meta in metadata_store["out"]:
meta['config']= meta['config']._replace(default_device=global_args.device)
return tensors_loc
# Call the core function, allowing for checkpointing
if ctm_args.fwd_checkpoint_move:
new_tensors= checkpoint(ctm_MOVE_sl_c,*tensors)
else:
new_tensors= ctm_MOVE_sl_c(*tensors)
new_tensors= tuple(yast.decompress_from_1d(r1d, meta) \
for r1d,meta in zip(new_tensors,metadata_store["out"]))
env.C[env.keyC]= new_tensors[0]
env.T[env.keyT]= new_tensors[1]