Coverage for gpaw/lrtddft/excited_state.py: 75%
320 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-08 00:17 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-08 00:17 +0000
1from pathlib import Path
2import numpy as np
3from typing import Dict, Any
5from ase.units import Hartree
6from ase.utils.timing import Timer
7from ase.calculators.calculator import Calculator
9import gpaw.mpi as mpi
10from gpaw.calculator import GPAW
11from gpaw import __version__, restart
12from gpaw.density import RealSpaceDensity
13from gpaw.lrtddft import LrTDDFT
14from gpaw.lrtddft.finite_differences import FiniteDifference
15from gpaw.lrtddft.excitation import ExcitationLogger
16from gpaw.utilities.blas import axpy
17from gpaw.wavefunctions.lcao import LCAOWaveFunctions
20class ExcitedState(GPAW):
21 nparts = 1
22 implemented_properties = ['energy', 'forces']
23 default_parameters: Dict[str, Any] = {}
25 def __init__(self, lrtddft, index, d=0.001, log=None, txt='-',
26 parallel=1, communicator=None):
27 """ExcitedState object.
28 lrtddft:
29 LrTDDFT object
30 index:
31 Excited state index
32 parallel: int
33 Can be used to parallelize the numerical force calculation
34 over images. Splits world into # parallel workers.
35 E. g. if world.size is 20 and parallel is 10, then 2 cores
36 are used per GPAW and LrTDDFT calculation.
37 Defaults to 1 (i.e. use all cores).
38 over images.
39 """
40 self.timer = Timer()
41 if isinstance(index, int):
42 self.index = UnconstraintIndex(index)
43 else:
44 self.index = index
46 if communicator is None:
47 try:
48 communicator = lrtddft.calculator.wfs.world
49 except AttributeError:
50 communicator = mpi.world
51 self.world = communicator
53 self.lrtddft = lrtddft
54 self.calculator = self.lrtddft.calculator
56 Calculator.__init__(self)
58 self.log = self.calculator.log
59 self.atoms = self.calculator.atoms
61 self.d = d
63 self.results = {}
64 self.parameters = {'d': d, 'index': self.index}
66 # set output
67 if log:
68 self.log = log
69 else:
70 self.log = ExcitationLogger(mpi.world)
71 self.log.fd = txt
73 self.log('#', self.__class__.__name__, __version__)
74 self.log('#', self.index)
75 self.log('# Force displacement:', self.d)
76 self.log
78 self.split(parallel)
80 @property
81 def name(self):
82 return 'excitedstate'
84 def __del__(self):
85 self.timer.write(self.log.fd)
87 def set(self, **kwargs):
88 self.calculator.set(**kwargs)
90 def set_positions(self, atoms):
91 """Update the positions of the atoms."""
92 self.atoms = atoms.copy()
93 self.results = {}
95 def write(self, dirname, mode=''):
96 """Write yourself to a directory
98 Paramaters
99 ----------
100 dirname: string or path
101 Write the files to the directory dirname. The directory
102 is created in case it does not exist.
103 mode: string
104 Mode for writing the calculator (GPAW object). Default ''.
105 """
106 directory = Path(dirname)
107 directory.mkdir(parents=True, exist_ok=True)
108 filename = str(directory / 'exst')
110 self.calculator.write(filename=filename, mode=mode)
111 self.lrtddft.write(filename=filename + '.lr.dat.gz')
113 if self.world.rank == 0:
114 with open(filename + '.exst', 'w') as f:
115 f.write('# ' + self.__class__.__name__ + __version__ + '\n')
116 f.write(f'Displacement: {self.d}' + '\n')
117 f.write('Index: ' + self.index.__class__.__name__ + '\n')
118 for k, v in self.index.__dict__.items():
119 f.write(f'{k}, {v}' + '\n')
120 self.world.barrier()
122 @classmethod
123 def read(cls, dirname, communicator=None, log=None, txt=None):
124 """Read ExcitedState from a directory"""
125 filename = str(Path(dirname) / 'exst')
126 atoms, calculator = restart(filename,
127 communicator=communicator, txt=txt)
128 if log is not None:
129 calculator.log = log
130 E0 = calculator.get_potential_energy()
131 lrtddft = LrTDDFT.read(filename + '.lr.dat.gz',
132 log=calculator.log)
133 lrtddft.calculator = calculator
135 with open(filename + '.exst') as f:
136 f.readline()
137 d = f.readline().replace('\n', '').split()[1]
138 indextype = f.readline().replace('\n', '').split()[1]
139 if indextype == 'UnconstraintIndex':
140 iex = int(f.readline().replace('\n', '').split()[1])
141 index = UnconstraintIndex(iex)
142 else:
143 direction = f.readline().replace('\n', '').split()[1]
144 if direction in [str(0), str(1), str(2)]:
145 direction = int(direction)
146 else:
147 direction = None
149 val = f.readline().replace('\n', '').split()
150 if indextype == 'MinimalOSIndex':
152 index = MinimalOSIndex(float(val[1]), direction)
153 else:
154 emin = float(val[2])
155 emax = float(val[3].replace(']', ''))
156 index = MaximalOSIndex([emin, emax], direction)
158 exst = cls(lrtddft, index, d, communicator=communicator,
159 txt=calculator.log.oldfd)
160 index = exst.index.apply(lrtddft)
161 exst.results['energy'] = E0 + lrtddft[index].energy * Hartree
163 return exst
165 def calculation_required(self, atoms, quantities):
166 if len(quantities) == 0:
167 return False
169 if self.atoms is None:
170 return True
172 elif (len(atoms) != len(self.atoms) or
173 (atoms.get_atomic_numbers() !=
174 self.atoms.get_atomic_numbers()).any() or
175 (atoms.get_initial_magnetic_moments() !=
176 self.atoms.get_initial_magnetic_moments()).any() or
177 (atoms.get_cell() != self.atoms.get_cell()).any() or
178 (atoms.get_pbc() != self.atoms.get_pbc()).any()):
179 return True
180 elif (atoms.get_positions() !=
181 self.atoms.get_positions()).any():
182 return True
184 for quantity in ['energy', 'forces']:
185 if quantity in quantities:
186 quantities.remove(quantity)
187 if quantity not in self.results:
188 return True
189 return len(quantities) > 0
191 def check_state(self, atoms, tol=1e-15):
192 system_changes = GPAW.check_state(self.calculator, atoms, tol)
193 return system_changes
195 def calculate(self, atoms, properties=['energy'],
196 system_changes=['cell']):
197 """Evaluate your energy if needed."""
198 self.set_positions(atoms)
200 self.calculator.calculate(atoms)
201 E0 = self.calculator.get_potential_energy()
202 atoms.calc = self
204 if hasattr(self, 'density'):
205 del self.density
206 self.lrtddft.forced_update()
207 self.lrtddft.diagonalize()
209 index = self.index.apply(self.lrtddft)
211 energy = E0 + self.lrtddft[index].energy * Hartree
213 self.log('--------------------------')
214 self.log('Excited state')
215 self.log(self.index)
216 self.log(f'Energy: {energy}')
217 self.log()
219 self.results['energy'] = energy
221 def split(self, nparts):
222 """Split world into parts and allow log in masters' part"""
223 # only split once
224 assert self.nparts == 1
226 if self.world.size == 1 or nparts == 1:
227 return
229 assert self.world.size % nparts == 0
230 self.nparts = nparts
231 allranks = np.array(range(self.world.size), dtype=int)
232 allranks = allranks.reshape(nparts, self.world.size // nparts)
234 # force hard reset
235 self.calculator.reset()
236 self.calculator.set(
237 external=self.calculator.parameters['external'])
239 for ranks in allranks:
240 if self.world.rank in ranks:
241 self.world = self.world.new_communicator(ranks)
242 self.calculator.world = self.world
243 if 0 not in ranks:
244 self.calculator.log.fd = None
245 self.lrtddft.log.fd = None
246 return
248 def get_forces(self, atoms=None, save=False):
249 """Get finite-difference forces
250 If save = True, restartfiles for every displacement are given
251 """
252 if atoms is None:
253 atoms = self.atoms
255 if self.calculation_required(atoms, ['forces']):
256 # do the ground state calculation to set all
257 # ranks to the same density to start with
258 p0 = atoms.get_positions().copy()
259 atoms.calc = self
261 finite = FiniteDifference(
262 atoms=atoms,
263 propertyfunction=atoms.get_potential_energy,
264 save=save,
265 name="excited_state", ending='.gpw',
266 d=self.d, log=self.log, parallel=self.nparts)
267 F_av = finite.run()
269 atoms.set_positions(p0)
270 self.calculate(atoms)
271 self.results['forces'] = F_av
273 self.log('Excited state forces in eV/Ang:')
274 symbols = self.atoms.get_chemical_symbols()
275 for a, symbol in enumerate(symbols):
276 self.log('%3d %-2s %10.5f %10.5f %10.5f' %
277 ((a, symbol) +
278 tuple(self.results['forces'][a])))
280 return self.results['forces']
282 def forces_indexn(self, index):
283 """ If restartfiles are created from the force calculation,
284 this function allows the calculation of forces for every
285 excited state index.
286 """
287 atoms = self.atoms
289 def reforce(self, name):
290 excalc = ExcitedState(index=index, restart=name)
291 return excalc.get_potential_energy()
293 fd = FiniteDifference(
294 atoms=atoms, save=True,
295 propertyfunction=self.atoms.get_potential_energy,
296 name="excited_state", ending='.gpw',
297 d=self.d, parallel=0)
298 atoms.calc = self
300 return fd.restart(reforce)
302 def get_stress(self, atoms):
303 """Return the stress for the current state of the Atoms."""
304 raise NotImplementedError
306 def initialize_density(self, method='dipole'):
307 if hasattr(self, 'density') and self.density.method == method:
308 return
310 gsdensity = self.calculator.density
311 lr = self.lrtddft
312 self.density = ExcitedStateDensity(
313 gsdensity.gd, gsdensity.finegd, lr.kss.npspins,
314 gsdensity.collinear,
315 gsdensity.charge,
316 method=method, redistributor=gsdensity.redistributor)
317 index = self.index.apply(self.lrtddft)
318 self.density.initialize(self.lrtddft, index)
319 self.density.update(self.calculator.wfs)
321 def get_pseudo_density(self, **kwargs):
322 """Return pseudo-density array."""
323 method = kwargs.pop('method', 'dipole')
324 self.initialize_density(method)
325 return GPAW.get_pseudo_density(self, **kwargs)
327 def get_all_electron_density(self, **kwargs):
328 """Return all electron density array."""
329 method = kwargs.pop('method', 'dipole')
330 self.initialize_density(method)
331 return GPAW.get_all_electron_density(self, **kwargs)
334class UnconstraintIndex:
336 def __init__(self, index):
337 self.index = index
339 def apply(self, *argv):
340 return self.index
342 def __str__(self):
343 return (self.__class__.__name__ + '(' + str(self.index) + ')')
345 def todict(self):
346 return {'class': self.__class__.__name__, 'index': self.index}
349class MinimalOSIndex:
351 """
352 Constraint on minimal oscillator strength.
354 Searches for the first excitation that has a larger
355 oscillator strength than the given minimum.
357 direction:
358 None: averaged (default)
359 0, 1, 2: x, y, z
360 """
362 def __init__(self, fmin=0.02, direction=None):
363 self.fmin = fmin
364 self.direction = direction
366 def apply(self, lrtddft):
367 i = 0
368 fmax = 0.
369 idir = 0
370 if self.direction is not None:
371 idir = 1 + self.direction
372 while i < len(lrtddft):
373 ex = lrtddft[i]
374 f = ex.get_oscillator_strength()[idir]
375 fmax = max(f, fmax)
376 if f > self.fmin:
377 return i
378 i += 1
379 error = 'The intensity constraint |f| > ' + str(self.fmin) + ' '
380 error += 'can not be satisfied (max(f) = ' + str(fmax) + ').'
381 raise RuntimeError(error)
384class MaximalOSIndex:
386 """
387 Select maximal oscillator strength.
389 Searches for the excitation with maximal oscillator strength
390 in a given energy range.
392 energy_range:
393 None: take all (default)
394 [Emin, Emax]: take only transition in this energy range
395 Emax: the same as [0, Emax]
396 direction:
397 None: averaged (default)
398 0, 1, 2: x, y, z
399 """
401 def __init__(self, energy_range=None, direction=None):
402 if energy_range is None:
403 energy_range = np.array([0.0, 1.e32])
404 elif isinstance(energy_range, (int, float)):
405 energy_range = np.array([0.0, energy_range]) / Hartree
406 self.energy_range = energy_range
408 self.direction = direction
410 def apply(self, lrtddft):
411 index = None
412 fmax = 0.
413 idir = 0
414 if self.direction is not None:
415 idir = 1 + self.direction
416 emin, emax = self.energy_range
417 for i, ex in enumerate(lrtddft):
418 f = ex.get_oscillator_strength()[idir]
419 e = ex.get_energy()
420 if e >= emin and e < emax and f > fmax:
421 fmax = f
422 index = i
423 if index is None:
424 raise RuntimeError('No transition in the energy range ' +
425 '[%g,%g]' % self.energy_range)
426 return index
429class ExcitedStateDensity(RealSpaceDensity):
431 """Approximate excited state density object."""
433 def __init__(self, *args, **kwargs):
434 self.method = kwargs.pop('method', 'dipole')
435 RealSpaceDensity.__init__(self, *args, **kwargs)
436 self.lrtddft = None
437 self.index = None
438 self.gsdensity = None
439 self.nbands = None
440 self.wocc_sn = None
441 self.wunocc_sn = None
443 def initialize(self, lrtddft, index):
444 self.lrtddft = lrtddft
445 self.index = index
447 calc = lrtddft.calculator
448 self.gsdensity = calc.density
449 self.gd = self.gsdensity.gd
450 self.nbands = calc.wfs.bd.nbands
452 # obtain weights
453 ex = lrtddft[index]
454 wocc_sn = np.zeros((self.nspins, self.nbands))
455 wunocc_sn = np.zeros((self.nspins, self.nbands))
456 for f, k in zip(ex.f, ex.kss):
457 # XXX why not k.fij * k.energy / energy ???
458 if self.method == 'dipole':
459 erat = k.energy / ex.energy
460 elif self.method == 'orthogonal':
461 erat = 1.
462 else:
463 raise NotImplementedError(
464 'method should be either "dipole" or "orthogonal"')
465 wocc_sn[k.pspin, k.i] += erat * f ** 2
466 wunocc_sn[k.pspin, k.j] += erat * f ** 2
467 self.wocc_sn = wocc_sn
468 self.wunocc_sn = wunocc_sn
470 RealSpaceDensity.initialize(
471 self, calc.wfs.setups, calc.timer, None, False)
473 self.set_positions(calc.spos_ac, calc.wfs.atom_partition)
475 D_asp = {}
476 for a, D_sp in self.gsdensity.D_asp.items():
477 repeats = self.nspins // self.gsdensity.nspins
478 # XXX does this work always?
479 D_asp[a] = (1. * D_sp).repeat(repeats, axis=0)
480 self.update_atomic_density_matrices(D_asp)
482 def update(self, wfs):
483 self.timer.start('Density')
484 self.timer.start('Pseudo density')
485 self.calculate_pseudo_density(wfs)
486 self.timer.stop('Pseudo density')
487 self.timer.start('Atomic density matrices')
488 f_un = []
489 for kpt in wfs.kpt_u:
490 f_n = kpt.f_n - self.wocc_sn[kpt.s] + self.wunocc_sn[kpt.s]
491 if self.nspins > self.gsdensity.nspins:
492 f_n = kpt.f_n - self.wocc_sn[1] + self.wunocc_sn[1]
493 f_un.append(f_n)
494 wfs.calculate_atomic_density_matrices_with_occupation(self.D_asp,
495 f_un)
496 self.timer.stop('Atomic density matrices')
497 self.timer.start('Multipole moments')
498 comp_charge, _Q_aL = self.calculate_multipole_moments()
499 self.timer.stop('Multipole moments')
501 if isinstance(wfs, LCAOWaveFunctions):
502 self.timer.start('Normalize')
503 self.normalize(comp_charge)
504 self.timer.stop('Normalize')
506 self.timer.stop('Density')
508 def calculate_pseudo_density(self, wfs):
509 """Calculate nt_sG from scratch.
511 nt_sG will be equal to nct_G plus the contribution from
512 wfs.add_to_density().
513 """
514 nvspins = wfs.kd.nspins
515 npspins = self.nspins
516 self.nt_xG = self.gd.zeros(self.ncomponents)
518 for s in range(npspins):
519 for kpt in wfs.kpt_u:
520 if s == kpt.s or npspins > nvspins:
521 f_n = kpt.f_n / (1. + int(npspins > nvspins))
522 for f, psit_G in zip((f_n - self.wocc_sn[s] +
523 self.wunocc_sn[s]),
524 kpt.psit_nG):
525 axpy(f, psit_G ** 2, self.nt_sG[s])
526 self.nt_sG[:] += self.nct_G