Coverage for gpaw/lcaotddft/linedensity.py: 20%
86 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-09 00:21 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-09 00:21 +0000
1import numpy as np
3import ase.io.ulm as ulm
5from gpaw.io import Writer
6from gpaw.lcaotddft.observer import TDDFTObserver
9class LineDensityReader:
10 def __init__(self, filename):
11 self.reader = ulm.Reader(filename)
12 tag = self.reader.get_tag()
13 if tag != LineDensityWriter.ulmtag:
14 raise RuntimeError('Unknown tag %s' % tag)
15 self.filename = filename
17 def __getattr__(self, attr):
18 return getattr(self.reader, attr)
20 @property
21 def kick_strength(self):
22 for ri in range(1, len(self.reader)):
23 r = self.reader[ri]
24 if r.action == 'kick':
25 return r.kick_strength
27 def read(self, skip_duplicates=True, zero_pad='both'):
28 assert zero_pad in ['both', False, None]
29 time_t = []
30 rho_tx = []
31 time0 = -np.inf
32 for ri in range(1, len(self.reader)):
33 r = self.reader[ri]
34 time = r.time
35 if time > time0 or not skip_duplicates:
36 time_t.append(time)
37 rho_x = r.rho_x
38 if zero_pad == 'both':
39 rho_x = np.concatenate(([0], rho_x, [0]))
40 rho_tx.append(rho_x)
41 time0 = time
42 time_t = np.array(time_t)
43 rho_tx = np.array(rho_tx)
44 return time_t, rho_tx
46 def close(self):
47 self.reader.close()
50class LineDensityWriter(TDDFTObserver):
51 version = 1
52 ulmtag = 'LineDensity'
54 def __init__(self, paw, filename, c=0, density_type='comp', interval=1):
55 TDDFTObserver.__init__(self, paw, interval)
56 if paw.niter == 0:
57 self.density_type = density_type
58 self.c = c
59 density = paw.density
60 if 'coarse' in density_type:
61 gd = density.gd
62 else:
63 gd = density.finegd
64 h_v = gd.h_cv[self.c]
66 self.writer = Writer(filename, paw.world, mode='w',
67 tag=self.__class__.ulmtag)
68 self.writer.write(version=self.__class__.version,
69 density_type=self.density_type,
70 c=self.c, h_v=h_v)
71 self.writer.sync()
72 else:
73 # Read settings
74 reader = LineDensityReader(filename)
75 self.density_type = reader.density_type
76 self.c = reader.c
77 h_v = reader.h_v
79 # Append to earlier file
80 self.writer = Writer(filename, paw.world, mode='a',
81 tag=self.__class__.ulmtag)
82 self.dx = np.linalg.norm(h_v)
83 assert self.density_type in ['comp', 'pseudo', 'pseudocoarse'], \
84 f'Unknown density type: {self.density_type}'
86 def _update(self, paw):
87 # Write metadata to main writer
88 self.writer.write(niter=paw.niter, time=paw.time, action=paw.action)
89 if paw.action == 'kick':
90 self.writer.write(kick_strength=paw.kick_strength)
92 density = paw.density
94 if self.density_type == 'comp':
95 rho_g = density.rhot_g
96 gd = density.finegd
97 elif self.density_type == 'pseudo':
98 rho_g = density.nt_sg.sum(axis=0)
99 gd = density.finegd
100 elif self.density_type == 'pseudocoarse':
101 rho_g = density.nt_sG.sum(axis=0)
102 gd = density.gd
103 else:
104 raise RuntimeError('Unknown density type: %s' % self.density_type)
106 rho_g = gd.collect(rho_g)
107 # TODO: instead of collecting on master,
108 # do integration in each domain and gather/sum to master
109 if gd.comm.rank == 0:
110 sum_axis = tuple({0, 1, 2}.difference({self.c}))
111 rho_x = rho_g.sum(axis=sum_axis) * (gd.dv / self.dx)
112 else:
113 rho_x = None
114 self.writer.write(rho_x=rho_x)
116 # Sync the main writer
117 self.writer.sync()
119 def __del__(self):
120 self.writer.close()