import time
import torch
from torch.utils.checkpoint import checkpoint
import config as cfg
from config import _torch_version_check
from ipeps.ipeps import IPEPS
from ctm.generic.env import *
from ctm.generic.ctm_components import *
from ctm.generic.ctm_projectors import *
from tn_interface import contract, einsum
from tn_interface import conj
from tn_interface import contiguous, view, permute
[docs]def run(state, env, conv_check=None, ctm_args=cfg.ctm_args, global_args=cfg.global_args):
r"""
:param state: wavefunction
:param env: environment
:param conv_check: function which determines the convergence of CTM algorithm. If ``None``,
the algorithm performs ``ctm_args.ctm_max_iter`` iterations.
:param ctm_args: CTM algorithm configuration
:param global_args: global configuration
:type state: IPEPS
:type env: ENV
:type conv_check: function(IPEPS,ENV,list[float],CTMARGS)->bool
:type ctm_args: CTMARGS
:type global_args: GLOBALARGS
Executes directional CTM algorithm for generic iPEPS starting from the intial environment ``env``.
To establish the convergence of CTM before the maximal number of iterations
is reached a ``conv_check`` function is invoked. Its expected signature is
``conv_check(IPEPS,ENV,Object,CTMARGS)`` where ``Object`` is an arbitary argument. For
example it can be a list or dict used for storing CTM data from previous steps to
check convergence.
"""
# 0) Create double-layer (DL) tensors, preserving the same convention
# for order of indices
#
# / /
# --A^dag-- = --a--
# /| /
# |/
# --A--
# /
#
sitesDL=dict()
for coord,A in state.sites.items():
dimsA = A.size()
a = contiguous(einsum('mefgh,mabcd->eafbgchd',A,conj(A)))
a= view(a, (dimsA[1]**2,dimsA[2]**2, dimsA[3]**2, dimsA[4]**2))
sitesDL[coord]=a
stateDL = IPEPS(sitesDL,state.vertexToSite)
# 1) perform CTMRG
t_obs=t_ctm=0.
history=None
for i in range(ctm_args.ctm_max_iter):
t0_ctm= time.perf_counter()
for direction in ctm_args.ctm_move_sequence:
diagnostics={"ctm_i": i, "ctm_d": direction} if ctm_args.verbosity_projectors>0 else None
ctm_MOVE(direction, stateDL, env, ctm_args=ctm_args, global_args=global_args, \
verbosity=ctm_args.verbosity_ctm_move,diagnostics=diagnostics)
t1_ctm= time.perf_counter()
t0_obs= time.perf_counter()
if conv_check is not None:
# evaluate convergence of the CTMRG procedure
converged, history = conv_check(state, env, history, ctm_args=ctm_args)
if ctm_args.verbosity_ctm_convergence>1: print(history)
if converged:
if ctm_args.verbosity_ctm_convergence>0:
print(f"CTMRG converged at iter= {i}, history= {history[-1]}")
break
t1_obs= time.perf_counter()
t_ctm+= t1_ctm-t0_ctm
t_obs+= t1_obs-t0_obs
return env, history, t_ctm, t_obs
def run_overlap(state1, state2, env, conv_check=None, ctm_args=cfg.ctm_args, global_args=cfg.global_args):
# r"""
# :param state: wavefunction
# :param env: environment
# :param conv_check: function which determines the convergence of CTM algorithm. If ``None``,
# the algorithm performs ``ctm_args.ctm_max_iter`` iterations.
# :param ctm_args: CTM algorithm configuration
# :param global_args: global configuration
# :type state: IPEPS
# :type env: ENV
# :type conv_check: function(IPEPS,ENV,list[float],CTMARGS)->bool
# :type ctm_args: CTMARGS
# :type global_args: GLOBALARGS
# Executes directional CTM algorithm for generic iPEPS starting from the intial environment ``env``.
# TODO add reference
# """
# 0) Create double-layer (DL) tensors, preserving the same convention
# for order of indices
#
# / /
# --A1^dag-- = --a--
# /| /
# |/
# --A2--
# /
#
sitesDL=dict()
for coord,site in state1.sites.items():
A1 = state1.site((coord[0], coord[1]))
A2 = state2.site((coord[0], coord[1]))
dimsA = A1.size()
a = contiguous(einsum('mefgh,mabcd->eafbgchd',A1,conj(A2)))
a = view(a, (dimsA[1]**2,dimsA[2]**2, dimsA[3]**2, dimsA[4]**2))
sitesDL[coord]=a
stateDL = IPEPS(sitesDL,state1.vertexToSite)
# 1) perform CTMRG
t_obs=t_ctm=0.
history=None
for i in range(ctm_args.ctm_max_iter):
t0_ctm= time.perf_counter()
for direction in ctm_args.ctm_move_sequence:
ctm_MOVE(direction, stateDL, env, ctm_args=ctm_args, global_args=global_args, \
verbosity=ctm_args.verbosity_ctm_move)
t1_ctm= time.perf_counter()
t0_obs= time.perf_counter()
if conv_check is not None:
# evaluate convergence of the CTMRG procedure
converged, history = conv_check(state1, state2, env, history, ctm_args=ctm_args)
if ctm_args.verbosity_ctm_convergence>1: print(history)
if converged:
if ctm_args.verbosity_ctm_convergence>0:
print(f"CTMRG converged at iter= {i}, history= {history[-1]}")
break
t1_obs= time.perf_counter()
t_ctm+= t1_ctm-t0_ctm
t_obs+= t1_obs-t0_obs
return env, history, t_ctm, t_obs
# performs
#
[docs]def ctm_MOVE(direction, state, env, ctm_args=cfg.ctm_args, global_args=cfg.global_args, \
verbosity=0, diagnostics=None):
r"""
:param direction: one of Up=(0,-1), Left=(-1,0), Down=(0,1), Right=(1,0)
:type direction: tuple(int,int)
:param state: wavefunction
:param env: environment
:param ctm_args: CTM algorithm configuration
:param global_args: global configuration
:type state: IPEPS
:type env: ENV
:type ctm_args: CTMARGS
:type global_args: GLOBALARGS
Executes a single directional CTM move in one of the directions. First, build
projectors for each non-equivalent bond (to be truncated) in the unit cell of iPEPS.
Second, construct enlarged environment tensors and then truncate them
to obtain updated environment tensors.
"""
# select projector function
if ctm_args.projector_method=='4X4':
ctm_get_projectors=ctm_get_projectors_4x4
elif ctm_args.projector_method=='4X2':
ctm_get_projectors=ctm_get_projectors_4x2
else:
raise ValueError("Invalid Projector method: "+str(ctm_args.projector_method))
# EXPERIMENTAL
# 0) extract raw tensors as tuple
tensors= tuple(state.sites[key] for key in state.sites.keys()) \
+ tuple(env.C[key] for key in env.C.keys()) + tuple(env.T[key] for key in env.T.keys())
def move_normalize_c(nC1, nC2, nT, norm_type=ctm_args.ctm_absorb_normalization,\
verbosity= ctm_args.verbosity_ctm_move):
_ord=2
if norm_type=='inf':
_ord= float('inf')
if _torch_version_check("1.9.0"):
scale_nC1= torch.linalg.vector_norm(nC1,ord=_ord)
scale_nC2= torch.linalg.vector_norm(nC2,ord=_ord)
scale_nT= torch.linalg.vector_norm(nT,ord=_ord)
else:
scale_nC1= nC1.norm(p=_ord)
scale_nC2= nC2.norm(p=_ord)
scale_nT= nT.norm(p=_ord)
if verbosity>0:
print(f"nC1 {scale_nC1} nC2 {scale_nC2} nT {scale_nT}")
nC1 = nC1/scale_nC1
nC2 = nC2/scale_nC2
nT = nT/scale_nT
return nC1, nC2, nT
# function wrapping up the core of the CTM MOVE segment of CTM algorithm
def ctm_MOVE_c(*tensors):
if global_args.device=='cpu' and global_args.offload_to_gpu != 'None':
tensors= tuple( t.to(global_args.offload_to_gpu) for t in tensors )
# 1) wrap raw tensors back into IPEPS and ENV classes
sites_loc= dict(zip(state.sites.keys(),tensors[0:len(state.sites)]))
state_loc= IPEPS(sites_loc, vertexToSite=state.vertexToSite)
env_loc= ENV(env.chi)
env_loc.C= dict(zip(env.C.keys(),tensors[len(state.sites):len(state.sites)+len(env.C)]))
env_loc.T= dict(zip(env.T.keys(),tensors[len(state.sites)+len(env.C):]))
# Loop over all non-equivalent sites of ipeps
# and compute projectors P(coord), P^tilde(coord)
P = dict()
Pt = dict()
for coord,site in state_loc.sites.items():
# TODO compute isometries
if not (diagnostics is None): diagnostics["coord"]= coord
P[coord], Pt[coord] = ctm_get_projectors(direction, coord, state_loc, env_loc,\
ctm_args, global_args, diagnostics=diagnostics)
if verbosity>0:
print("P,Pt RIGHT "+str(coord)+" P: "+str(P[coord].size())+" Pt: "+str(Pt[coord].size()))
if verbosity>1:
print(P[coord])
print(Pt[coord])
# Loop over all non-equivalent sites of ipeps
# and perform absorption and truncation
nC1 = dict()
nC2 = dict()
nT = dict()
for coord in state_loc.sites.keys():
if direction==(0,-1):
nC1[coord], nC2[coord], nT[coord] = absorb_truncate_CTM_MOVE_UP(coord, state_loc, env_loc, P, Pt)
elif direction==(-1,0):
nC1[coord], nC2[coord], nT[coord] = absorb_truncate_CTM_MOVE_LEFT(coord, state_loc, env_loc, P, Pt)
elif direction==(0,1):
nC1[coord], nC2[coord], nT[coord] = absorb_truncate_CTM_MOVE_DOWN(coord, state_loc, env_loc, P, Pt)
elif direction==(1,0):
nC1[coord], nC2[coord], nT[coord] = absorb_truncate_CTM_MOVE_RIGHT(coord, state_loc, env_loc, P, Pt)
else:
raise ValueError("Invalid direction: "+str(direction))
nC1[coord], nC2[coord], nT[coord]= move_normalize_c(nC1[coord], nC2[coord], nT[coord])
# 2) Return raw new tensors
ret_list= tuple(nC1[key] for key in nC1.keys()) + tuple(nC2[key] for key in nC2.keys()) \
+ tuple(nT[key] for key in nT.keys())
if global_args.device=='cpu' and global_args.offload_to_gpu != 'None':
ret_list= tuple( t.to(global_args.device) for t in ret_list )
return ret_list
# Call the core function, allowing for checkpointing
if ctm_args.fwd_checkpoint_move:
new_tensors= checkpoint(ctm_MOVE_c,*tensors)
else:
new_tensors= ctm_MOVE_c(*tensors)
# 3) warp the returned raw tensor in dictionary
tmp_coords= state.sites.keys()
count_coord= len(tmp_coords)
nC1 = dict(zip(tmp_coords, new_tensors[0:count_coord]))
nC2 = dict(zip(tmp_coords, new_tensors[count_coord:2*count_coord]))
nT = dict(zip(tmp_coords, new_tensors[2*count_coord:]))
# Assign new nC1,nT,nC2 to appropriate environment tensors
rel_CandT_vecs = dict()
# specify relative vectors identifying the environment tensors
# with respect to the direction
if direction==(0,-1):
rel_CandT_vecs = {"nC1": (1,-1), "nC2": (-1,-1), "nT": direction}
elif direction==(-1,0):
rel_CandT_vecs = {"nC1": (-1,-1), "nC2": (-1,1), "nT": direction}
elif direction==(0,1):
rel_CandT_vecs = {"nC1": (-1,1), "nC2": (1,1), "nT": direction}
elif direction==(1,0):
rel_CandT_vecs = {"nC1": (1,1), "nC2": (1,-1), "nT": direction}
else:
raise ValueError("Invalid direction: "+str(direction))
for coord,site in state.sites.items():
new_coord = state.vertexToSite((coord[0]-direction[0], coord[1]-direction[1]))
# print("coord: "+str(coord)+" + "+str(direction)+" -> "+str(new_coord))
env.C[(new_coord,rel_CandT_vecs["nC1"])] = nC1[coord]
env.C[(new_coord,rel_CandT_vecs["nC2"])] = nC2[coord]
env.T[(new_coord,rel_CandT_vecs["nT"])] = nT[coord]
#####################################################################
# functions performing absorption and truncation step
#####################################################################
def absorb_truncate_CTM_MOVE_UP(coord, state, env, P, Pt, verbosity=0):
vec = (1,0)
coord_shift_left= state.vertexToSite((coord[0]-vec[0], coord[1]-vec[1]))
coord_shift_right = state.vertexToSite((coord[0]+vec[0], coord[1]+vec[1]))
tensors= env.C[(coord,(1,-1))], env.T[(coord,(1,0))], env.T[(coord,(0,-1))], \
env.T[(coord,(-1,0))], env.C[(coord,(-1,-1))], state.site(coord), \
view(P[coord], (env.chi,state.site(coord_shift_left).size()[3],env.chi)), \
view(Pt[coord], (env.chi,state.site(coord).size()[1],env.chi)), \
view(P[coord_shift_right], (env.chi,state.site(coord).size()[3],env.chi)), \
view(Pt[coord_shift_right], (env.chi,state.site(coord_shift_right).size()[1],env.chi))
if cfg.ctm_args.fwd_checkpoint_absorb:
return checkpoint(absorb_truncate_CTM_MOVE_UP_c,*tensors)
else:
return absorb_truncate_CTM_MOVE_UP_c(*tensors)
def absorb_truncate_CTM_MOVE_UP_c(*tensors):
C1, T1, T, T2, C2, A, P2, Pt2, P1, Pt1= tensors
# 0--C1
# 1
# 0
# 1--T1
# 2
nC1 = contract(C1,T1,([1],[0]))
# --0 0--C1
# | |
# 0<-2--Pt1 |
# | |
# --1 1--T1
# 2->1
nC1 = contract(Pt1, nC1,([0,1],[0,1]))
# C2--1->0
# 0
# 0
# T2--2
# 1
nC2 = contract(C2, T2,([0],[0]))
# C2--0 0--
# | |
# | P2--2->1
# | |
# T2--2 1--
# 1->0
nC2 = contract(nC2, P2,([0,2],[0,1]))
# --0 0--T--2->3
# | 1->2
# 1<-2--Pt2
# |
# --1->0
nT = contract(Pt2, T, ([0],[0]))
# -------T--3->1
# | 2
# 0<-1--Pt2 |
# | 0
# --0 1--A--3
# 2
nT = contract(nT, A,([0,2],[1,0]))
# -------T--1 0--
# | | |
# 0--Pt2 | P1--2
# | | |
# -------A--3 1--
# 2->1
nT = contract(nT, P1,([1,3],[0,1]))
nT = contiguous(nT)
# Assign new C,T
#
# C(coord,(-1,-1))-- --T(coord,(0,-1))-- --C(coord,(1,-1))
# | P2-- --Pt2 | P1-- -Pt1 |
# T(coord,(-1,0))--- --A(coord)--------- --T(coord,(1,0))
# | | |
#
# =>
#
# C^new(coord+(0,1),(-1,-1))-- --T^new(coord+(0,1),(0,-1))-- --C^new(coord+(0,1),(1,-1))
# | | |
return nC1, nC2, nT
def absorb_truncate_CTM_MOVE_LEFT(coord, state, env, P, Pt, verbosity=0):
vec = (0,-1)
coord_shift_up= state.vertexToSite((coord[0]+vec[0], coord[1]+vec[1]))
coord_shift_down= state.vertexToSite((coord[0]-vec[0], coord[1]-vec[1]))
tensors = env.C[(coord,(-1,-1))], env.T[(coord,(0,-1))], env.T[(coord,(-1,0))], \
env.T[(coord,(0,1))], env.C[(coord,(-1,1))], state.site(coord), \
view(P[coord], (env.chi,state.site(coord_shift_down).size()[0],env.chi)), \
view(Pt[coord], (env.chi,state.site(coord).size()[2],env.chi)), \
view(P[coord_shift_up], (env.chi,state.site(coord).size()[0],env.chi)), \
view(Pt[coord_shift_up], (env.chi,state.site(coord_shift_up).size()[2],env.chi))
if cfg.ctm_args.fwd_checkpoint_absorb:
return checkpoint(absorb_truncate_CTM_MOVE_LEFT_c,*tensors)
else:
return absorb_truncate_CTM_MOVE_LEFT_c(*tensors)
def absorb_truncate_CTM_MOVE_LEFT_c(*tensors):
C1, T1, T, T2, C2, A, P2, Pt2, P1, Pt1= tensors
# C1--1 0--T1--2
# | |
# 0 1
nC1 = contract(C1,T1,([1],[0]))
# C1--1 0--T1--2->1
# | |
# 0 1
# 0 1
# |___Pt1__|
# 2->0
nC1 = contract(Pt1, nC1,([0,1],[0,1]))
# 0 0->1
# C2--1 1--T2--2
nC2 = contract(C2, T2,([1],[1]))
# 2->0
# ___P2___
# 0 1
# 0 1
# C2-----T2--2->1
nC2 = contract(P2, nC2,([0,1],[0,1]))
# 2->1
# ___P1__
# 0 1->0
# 0
# T--2->3
# 1->2
nT = contract(P1, T,([0],[0]))
# 1->0
# ___P1____
# | 0
# | 0
# T--3 1--A--3
# 2->1 2
nT = contract(nT, A,([0,3],[0,1]))
# 0
# ___P1___
# | |
# | |
# T-------A--3->1
# 1 2
# 0 1
# |___Pt2_|
# 2
nT = contract(nT, Pt2,([1,2],[0,1]))
nT = contiguous(permute(nT, (0,2,1)))
# Assign new C,T
#
# C(coord,(-1,-1))--T(coord,(0,-1))-- => C^new(coord+(1,0),(-1,-1))--
# |________ ______| |
# Pt1
# |
#
# |
# _________P1______
# | | |
# T(coord,(-1,0))--A(coord)-- T^new(coord+(1,0),(-1,0))--
# |________ _____| |
# Pt2
# |
#
# |
# ________P2_______
# | | |
# C(coord,(-1,1))--T(coord,(0,1))-- C^new(coord+(1,0),(-1,1))
return nC1, nC2, nT
def absorb_truncate_CTM_MOVE_DOWN(coord, state, env, P, Pt, verbosity=0):
vec = (-1,0)
coord_shift_right= state.vertexToSite((coord[0]-vec[0], coord[1]-vec[1]))
coord_shift_left = state.vertexToSite((coord[0]+vec[0], coord[1]+vec[1]))
tensors= env.C[(coord,(-1,1))], env.T[(coord,(-1,0))], env.T[(coord,(0,1))], \
env.T[(coord,(1,0))], env.C[(coord,(1,1))], state.site(coord), \
view(P[coord], (env.chi,state.site(coord_shift_right).size()[1],env.chi)), \
view(Pt[coord], (env.chi,state.site(coord).size()[3],env.chi)), \
view(P[coord_shift_left], (env.chi,state.site(coord).size()[1],env.chi)), \
view(Pt[coord_shift_left], (env.chi,state.site(coord_shift_left).size()[3],env.chi))
if cfg.ctm_args.fwd_checkpoint_absorb:
return checkpoint(absorb_truncate_CTM_MOVE_DOWN_c,*tensors)
else:
return absorb_truncate_CTM_MOVE_DOWN_c(*tensors)
def absorb_truncate_CTM_MOVE_DOWN_c(*tensors):
C1, T1, T, T2, C2, A, P2, Pt2, P1, Pt1= tensors
# 0->1
# T1--2->2
# 1
# 0
# C1--1->0
nC1 = contract(C1,T1,([0],[1]))
# 1->0
# T1--2 1--
# | |
# | Pt1--2->1
# | |
# C1--0 0--
nC1 = contract(nC1, Pt1, ([0,2],[0,1]))
# 1<-0
# 2<-1--T2
# 2
# 0
# 0<-1--C2
nC2 = contract(C2, T2,([0],[2]))
# 0<-1
# --1 2--T2
# | |
# 1<-2--P2 |
# | |
# --0 0--C2
nC2 = contract(nC2, P2, ([0,2],[0,1]))
# --1->0
# |
# 1<-2--P1
# | 0->2
# --0 1--T--2->3
nT = contract(P1, T, ([0],[1]))
# 0->2
# --0 1--A--3
# | 2
# 0<-1--P1 |
# | 2
# -------T--3->1
nT = contract(nT, A,([0,2],[1,2]))
# 2->1
# -------A--3 1--
# | | |
# 0--P1 | Pt2--2
# | | |
# -------T--1 0--
nT = contract(nT, Pt2,([1,3],[0,1]))
nT = contiguous(permute(nT, (1,0,2)))
# Assign new C,T
#
# | | |
# T(coord,(-1,0))-- --A(coord)-------- --T(coord,(1,0))
# | Pt1-- --P1 | Pt2-- --P2 |
# C(coord,(-1,1))-- --T(coord,(0,1))-- --C(coord,(1,1))
#
# =>
#
# | | |
# C^new(coord+(0,-1),(-1,1))-- --T^new(coord+(0,-1),(0,1))-- --C^new(coord+(0,-1),(1,1))
return nC1, nC2, nT
def absorb_truncate_CTM_MOVE_RIGHT(coord, state, env, P, Pt, verbosity=0):
vec = (0,1)
coord_shift_down = state.vertexToSite((coord[0]+vec[0], coord[1]+vec[1]))
coord_shift_up = state.vertexToSite((coord[0]-vec[0], coord[1]-vec[1]))
tensors= env.C[(coord,(1,1))], env.T[(coord,(0,1))], env.T[(coord,(1,0))], \
env.T[(coord,(0,-1))], env.C[(coord,(1,-1))], state.site(coord), \
view(P[coord], (env.chi,state.site(coord_shift_up).size()[2],env.chi)), \
view(Pt[coord], (env.chi,state.site(coord).size()[0],env.chi)), \
view(P[coord_shift_down], (env.chi,state.site(coord).size()[2],env.chi)), \
view(Pt[coord_shift_down], (env.chi,state.site(coord_shift_down).size()[0],env.chi))
if cfg.ctm_args.fwd_checkpoint_absorb:
return checkpoint(absorb_truncate_CTM_MOVE_RIGHT_c,*tensors)
else:
return absorb_truncate_CTM_MOVE_RIGHT_c(*tensors)
def absorb_truncate_CTM_MOVE_RIGHT_c(*tensors):
C1, T1, T, T2, C2, A, P2, Pt2, P1, Pt1= tensors
# 0->1 0
# 2<-1--T1--2 1--C1
nC1 = contract(C1, T1,([1],[2]))
# 2->0
# __Pt1_
# 1 0
# 1 0
# 1<-2--T1----C1
nC1 = contract(Pt1, nC1,([0,1],[0,1]))
# 1<-0--T2--2 0--C2
# 2<-1 0<-1
nC2 = contract(C2,T2,([0],[2]))
# 0<-1--T2----C2
# 2 0
# 1 0
# |__P2_|
# 2->1
nC2 = contract(nC2, P2,([0,2],[0,1]))
# 1<-2
# ___Pt2__
# 0<-1 0
# 0
# 2<-1--T
# 3<-2
nT = contract(Pt2, T,([0],[0]))
# 0<-1
# ___Pt2__
# 0 |
# 0 |
# 2<-1--A--3 2--T
# 3<-2 1<-3
nT = contract(nT, A,([0,2],[0,3]))
# 0
# ___Pt2__
# | |
# | |
# 1<-2--A-------T
# 3 1
# 1 0
# |___P1__|
# 2
nT = contract(nT, P1,([1,3],[0,1]))
nT = contiguous(nT)
# Assign new C,T
#
# --T(coord,(0,-1))--C(coord,(1,-1)) =>--C^new(coord+(-1,0),(1,-1))
# |______ ________| |
# P2
# |
#
# |
# ______Pt2
# | | |
# --A(coord)--T(coord,(1,0)) --T^new(coord+(-1,0),(1,0))
# |______ _| |
# P1
# |
#
# |
# ______Pt1______
# | | |
# --T(coord,(0,1))--C(coord,(1,1)) --C^new(coord+(-1,0),(1,1))
return nC1, nC2, nT