Coverage for gpaw/lcaotddft/linedensity.py: 20%

86 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-09 00:21 +0000

1import numpy as np 

2 

3import ase.io.ulm as ulm 

4 

5from gpaw.io import Writer 

6from gpaw.lcaotddft.observer import TDDFTObserver 

7 

8 

9class LineDensityReader: 

10 def __init__(self, filename): 

11 self.reader = ulm.Reader(filename) 

12 tag = self.reader.get_tag() 

13 if tag != LineDensityWriter.ulmtag: 

14 raise RuntimeError('Unknown tag %s' % tag) 

15 self.filename = filename 

16 

17 def __getattr__(self, attr): 

18 return getattr(self.reader, attr) 

19 

20 @property 

21 def kick_strength(self): 

22 for ri in range(1, len(self.reader)): 

23 r = self.reader[ri] 

24 if r.action == 'kick': 

25 return r.kick_strength 

26 

27 def read(self, skip_duplicates=True, zero_pad='both'): 

28 assert zero_pad in ['both', False, None] 

29 time_t = [] 

30 rho_tx = [] 

31 time0 = -np.inf 

32 for ri in range(1, len(self.reader)): 

33 r = self.reader[ri] 

34 time = r.time 

35 if time > time0 or not skip_duplicates: 

36 time_t.append(time) 

37 rho_x = r.rho_x 

38 if zero_pad == 'both': 

39 rho_x = np.concatenate(([0], rho_x, [0])) 

40 rho_tx.append(rho_x) 

41 time0 = time 

42 time_t = np.array(time_t) 

43 rho_tx = np.array(rho_tx) 

44 return time_t, rho_tx 

45 

46 def close(self): 

47 self.reader.close() 

48 

49 

50class LineDensityWriter(TDDFTObserver): 

51 version = 1 

52 ulmtag = 'LineDensity' 

53 

54 def __init__(self, paw, filename, c=0, density_type='comp', interval=1): 

55 TDDFTObserver.__init__(self, paw, interval) 

56 if paw.niter == 0: 

57 self.density_type = density_type 

58 self.c = c 

59 density = paw.density 

60 if 'coarse' in density_type: 

61 gd = density.gd 

62 else: 

63 gd = density.finegd 

64 h_v = gd.h_cv[self.c] 

65 

66 self.writer = Writer(filename, paw.world, mode='w', 

67 tag=self.__class__.ulmtag) 

68 self.writer.write(version=self.__class__.version, 

69 density_type=self.density_type, 

70 c=self.c, h_v=h_v) 

71 self.writer.sync() 

72 else: 

73 # Read settings 

74 reader = LineDensityReader(filename) 

75 self.density_type = reader.density_type 

76 self.c = reader.c 

77 h_v = reader.h_v 

78 

79 # Append to earlier file 

80 self.writer = Writer(filename, paw.world, mode='a', 

81 tag=self.__class__.ulmtag) 

82 self.dx = np.linalg.norm(h_v) 

83 assert self.density_type in ['comp', 'pseudo', 'pseudocoarse'], \ 

84 f'Unknown density type: {self.density_type}' 

85 

86 def _update(self, paw): 

87 # Write metadata to main writer 

88 self.writer.write(niter=paw.niter, time=paw.time, action=paw.action) 

89 if paw.action == 'kick': 

90 self.writer.write(kick_strength=paw.kick_strength) 

91 

92 density = paw.density 

93 

94 if self.density_type == 'comp': 

95 rho_g = density.rhot_g 

96 gd = density.finegd 

97 elif self.density_type == 'pseudo': 

98 rho_g = density.nt_sg.sum(axis=0) 

99 gd = density.finegd 

100 elif self.density_type == 'pseudocoarse': 

101 rho_g = density.nt_sG.sum(axis=0) 

102 gd = density.gd 

103 else: 

104 raise RuntimeError('Unknown density type: %s' % self.density_type) 

105 

106 rho_g = gd.collect(rho_g) 

107 # TODO: instead of collecting on master, 

108 # do integration in each domain and gather/sum to master 

109 if gd.comm.rank == 0: 

110 sum_axis = tuple({0, 1, 2}.difference({self.c})) 

111 rho_x = rho_g.sum(axis=sum_axis) * (gd.dv / self.dx) 

112 else: 

113 rho_x = None 

114 self.writer.write(rho_x=rho_x) 

115 

116 # Sync the main writer 

117 self.writer.sync() 

118 

119 def __del__(self): 

120 self.writer.close()