Coverage for gpaw/lrtddft/excited_state.py: 75%

320 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-08 00:17 +0000

1from pathlib import Path 

2import numpy as np 

3from typing import Dict, Any 

4 

5from ase.units import Hartree 

6from ase.utils.timing import Timer 

7from ase.calculators.calculator import Calculator 

8 

9import gpaw.mpi as mpi 

10from gpaw.calculator import GPAW 

11from gpaw import __version__, restart 

12from gpaw.density import RealSpaceDensity 

13from gpaw.lrtddft import LrTDDFT 

14from gpaw.lrtddft.finite_differences import FiniteDifference 

15from gpaw.lrtddft.excitation import ExcitationLogger 

16from gpaw.utilities.blas import axpy 

17from gpaw.wavefunctions.lcao import LCAOWaveFunctions 

18 

19 

20class ExcitedState(GPAW): 

21 nparts = 1 

22 implemented_properties = ['energy', 'forces'] 

23 default_parameters: Dict[str, Any] = {} 

24 

25 def __init__(self, lrtddft, index, d=0.001, log=None, txt='-', 

26 parallel=1, communicator=None): 

27 """ExcitedState object. 

28 lrtddft: 

29 LrTDDFT object 

30 index: 

31 Excited state index 

32 parallel: int 

33 Can be used to parallelize the numerical force calculation 

34 over images. Splits world into # parallel workers. 

35 E. g. if world.size is 20 and parallel is 10, then 2 cores 

36 are used per GPAW and LrTDDFT calculation. 

37 Defaults to 1 (i.e. use all cores). 

38 over images. 

39 """ 

40 self.timer = Timer() 

41 if isinstance(index, int): 

42 self.index = UnconstraintIndex(index) 

43 else: 

44 self.index = index 

45 

46 if communicator is None: 

47 try: 

48 communicator = lrtddft.calculator.wfs.world 

49 except AttributeError: 

50 communicator = mpi.world 

51 self.world = communicator 

52 

53 self.lrtddft = lrtddft 

54 self.calculator = self.lrtddft.calculator 

55 

56 Calculator.__init__(self) 

57 

58 self.log = self.calculator.log 

59 self.atoms = self.calculator.atoms 

60 

61 self.d = d 

62 

63 self.results = {} 

64 self.parameters = {'d': d, 'index': self.index} 

65 

66 # set output 

67 if log: 

68 self.log = log 

69 else: 

70 self.log = ExcitationLogger(mpi.world) 

71 self.log.fd = txt 

72 

73 self.log('#', self.__class__.__name__, __version__) 

74 self.log('#', self.index) 

75 self.log('# Force displacement:', self.d) 

76 self.log 

77 

78 self.split(parallel) 

79 

80 @property 

81 def name(self): 

82 return 'excitedstate' 

83 

84 def __del__(self): 

85 self.timer.write(self.log.fd) 

86 

87 def set(self, **kwargs): 

88 self.calculator.set(**kwargs) 

89 

90 def set_positions(self, atoms): 

91 """Update the positions of the atoms.""" 

92 self.atoms = atoms.copy() 

93 self.results = {} 

94 

95 def write(self, dirname, mode=''): 

96 """Write yourself to a directory 

97 

98 Paramaters 

99 ---------- 

100 dirname: string or path 

101 Write the files to the directory dirname. The directory 

102 is created in case it does not exist. 

103 mode: string 

104 Mode for writing the calculator (GPAW object). Default ''. 

105 """ 

106 directory = Path(dirname) 

107 directory.mkdir(parents=True, exist_ok=True) 

108 filename = str(directory / 'exst') 

109 

110 self.calculator.write(filename=filename, mode=mode) 

111 self.lrtddft.write(filename=filename + '.lr.dat.gz') 

112 

113 if self.world.rank == 0: 

114 with open(filename + '.exst', 'w') as f: 

115 f.write('# ' + self.__class__.__name__ + __version__ + '\n') 

116 f.write(f'Displacement: {self.d}' + '\n') 

117 f.write('Index: ' + self.index.__class__.__name__ + '\n') 

118 for k, v in self.index.__dict__.items(): 

119 f.write(f'{k}, {v}' + '\n') 

120 self.world.barrier() 

121 

122 @classmethod 

123 def read(cls, dirname, communicator=None, log=None, txt=None): 

124 """Read ExcitedState from a directory""" 

125 filename = str(Path(dirname) / 'exst') 

126 atoms, calculator = restart(filename, 

127 communicator=communicator, txt=txt) 

128 if log is not None: 

129 calculator.log = log 

130 E0 = calculator.get_potential_energy() 

131 lrtddft = LrTDDFT.read(filename + '.lr.dat.gz', 

132 log=calculator.log) 

133 lrtddft.calculator = calculator 

134 

135 with open(filename + '.exst') as f: 

136 f.readline() 

137 d = f.readline().replace('\n', '').split()[1] 

138 indextype = f.readline().replace('\n', '').split()[1] 

139 if indextype == 'UnconstraintIndex': 

140 iex = int(f.readline().replace('\n', '').split()[1]) 

141 index = UnconstraintIndex(iex) 

142 else: 

143 direction = f.readline().replace('\n', '').split()[1] 

144 if direction in [str(0), str(1), str(2)]: 

145 direction = int(direction) 

146 else: 

147 direction = None 

148 

149 val = f.readline().replace('\n', '').split() 

150 if indextype == 'MinimalOSIndex': 

151 

152 index = MinimalOSIndex(float(val[1]), direction) 

153 else: 

154 emin = float(val[2]) 

155 emax = float(val[3].replace(']', '')) 

156 index = MaximalOSIndex([emin, emax], direction) 

157 

158 exst = cls(lrtddft, index, d, communicator=communicator, 

159 txt=calculator.log.oldfd) 

160 index = exst.index.apply(lrtddft) 

161 exst.results['energy'] = E0 + lrtddft[index].energy * Hartree 

162 

163 return exst 

164 

165 def calculation_required(self, atoms, quantities): 

166 if len(quantities) == 0: 

167 return False 

168 

169 if self.atoms is None: 

170 return True 

171 

172 elif (len(atoms) != len(self.atoms) or 

173 (atoms.get_atomic_numbers() != 

174 self.atoms.get_atomic_numbers()).any() or 

175 (atoms.get_initial_magnetic_moments() != 

176 self.atoms.get_initial_magnetic_moments()).any() or 

177 (atoms.get_cell() != self.atoms.get_cell()).any() or 

178 (atoms.get_pbc() != self.atoms.get_pbc()).any()): 

179 return True 

180 elif (atoms.get_positions() != 

181 self.atoms.get_positions()).any(): 

182 return True 

183 

184 for quantity in ['energy', 'forces']: 

185 if quantity in quantities: 

186 quantities.remove(quantity) 

187 if quantity not in self.results: 

188 return True 

189 return len(quantities) > 0 

190 

191 def check_state(self, atoms, tol=1e-15): 

192 system_changes = GPAW.check_state(self.calculator, atoms, tol) 

193 return system_changes 

194 

195 def calculate(self, atoms, properties=['energy'], 

196 system_changes=['cell']): 

197 """Evaluate your energy if needed.""" 

198 self.set_positions(atoms) 

199 

200 self.calculator.calculate(atoms) 

201 E0 = self.calculator.get_potential_energy() 

202 atoms.calc = self 

203 

204 if hasattr(self, 'density'): 

205 del self.density 

206 self.lrtddft.forced_update() 

207 self.lrtddft.diagonalize() 

208 

209 index = self.index.apply(self.lrtddft) 

210 

211 energy = E0 + self.lrtddft[index].energy * Hartree 

212 

213 self.log('--------------------------') 

214 self.log('Excited state') 

215 self.log(self.index) 

216 self.log(f'Energy: {energy}') 

217 self.log() 

218 

219 self.results['energy'] = energy 

220 

221 def split(self, nparts): 

222 """Split world into parts and allow log in masters' part""" 

223 # only split once 

224 assert self.nparts == 1 

225 

226 if self.world.size == 1 or nparts == 1: 

227 return 

228 

229 assert self.world.size % nparts == 0 

230 self.nparts = nparts 

231 allranks = np.array(range(self.world.size), dtype=int) 

232 allranks = allranks.reshape(nparts, self.world.size // nparts) 

233 

234 # force hard reset 

235 self.calculator.reset() 

236 self.calculator.set( 

237 external=self.calculator.parameters['external']) 

238 

239 for ranks in allranks: 

240 if self.world.rank in ranks: 

241 self.world = self.world.new_communicator(ranks) 

242 self.calculator.world = self.world 

243 if 0 not in ranks: 

244 self.calculator.log.fd = None 

245 self.lrtddft.log.fd = None 

246 return 

247 

248 def get_forces(self, atoms=None, save=False): 

249 """Get finite-difference forces 

250 If save = True, restartfiles for every displacement are given 

251 """ 

252 if atoms is None: 

253 atoms = self.atoms 

254 

255 if self.calculation_required(atoms, ['forces']): 

256 # do the ground state calculation to set all 

257 # ranks to the same density to start with 

258 p0 = atoms.get_positions().copy() 

259 atoms.calc = self 

260 

261 finite = FiniteDifference( 

262 atoms=atoms, 

263 propertyfunction=atoms.get_potential_energy, 

264 save=save, 

265 name="excited_state", ending='.gpw', 

266 d=self.d, log=self.log, parallel=self.nparts) 

267 F_av = finite.run() 

268 

269 atoms.set_positions(p0) 

270 self.calculate(atoms) 

271 self.results['forces'] = F_av 

272 

273 self.log('Excited state forces in eV/Ang:') 

274 symbols = self.atoms.get_chemical_symbols() 

275 for a, symbol in enumerate(symbols): 

276 self.log('%3d %-2s %10.5f %10.5f %10.5f' % 

277 ((a, symbol) + 

278 tuple(self.results['forces'][a]))) 

279 

280 return self.results['forces'] 

281 

282 def forces_indexn(self, index): 

283 """ If restartfiles are created from the force calculation, 

284 this function allows the calculation of forces for every 

285 excited state index. 

286 """ 

287 atoms = self.atoms 

288 

289 def reforce(self, name): 

290 excalc = ExcitedState(index=index, restart=name) 

291 return excalc.get_potential_energy() 

292 

293 fd = FiniteDifference( 

294 atoms=atoms, save=True, 

295 propertyfunction=self.atoms.get_potential_energy, 

296 name="excited_state", ending='.gpw', 

297 d=self.d, parallel=0) 

298 atoms.calc = self 

299 

300 return fd.restart(reforce) 

301 

302 def get_stress(self, atoms): 

303 """Return the stress for the current state of the Atoms.""" 

304 raise NotImplementedError 

305 

306 def initialize_density(self, method='dipole'): 

307 if hasattr(self, 'density') and self.density.method == method: 

308 return 

309 

310 gsdensity = self.calculator.density 

311 lr = self.lrtddft 

312 self.density = ExcitedStateDensity( 

313 gsdensity.gd, gsdensity.finegd, lr.kss.npspins, 

314 gsdensity.collinear, 

315 gsdensity.charge, 

316 method=method, redistributor=gsdensity.redistributor) 

317 index = self.index.apply(self.lrtddft) 

318 self.density.initialize(self.lrtddft, index) 

319 self.density.update(self.calculator.wfs) 

320 

321 def get_pseudo_density(self, **kwargs): 

322 """Return pseudo-density array.""" 

323 method = kwargs.pop('method', 'dipole') 

324 self.initialize_density(method) 

325 return GPAW.get_pseudo_density(self, **kwargs) 

326 

327 def get_all_electron_density(self, **kwargs): 

328 """Return all electron density array.""" 

329 method = kwargs.pop('method', 'dipole') 

330 self.initialize_density(method) 

331 return GPAW.get_all_electron_density(self, **kwargs) 

332 

333 

334class UnconstraintIndex: 

335 

336 def __init__(self, index): 

337 self.index = index 

338 

339 def apply(self, *argv): 

340 return self.index 

341 

342 def __str__(self): 

343 return (self.__class__.__name__ + '(' + str(self.index) + ')') 

344 

345 def todict(self): 

346 return {'class': self.__class__.__name__, 'index': self.index} 

347 

348 

349class MinimalOSIndex: 

350 

351 """ 

352 Constraint on minimal oscillator strength. 

353 

354 Searches for the first excitation that has a larger 

355 oscillator strength than the given minimum. 

356 

357 direction: 

358 None: averaged (default) 

359 0, 1, 2: x, y, z 

360 """ 

361 

362 def __init__(self, fmin=0.02, direction=None): 

363 self.fmin = fmin 

364 self.direction = direction 

365 

366 def apply(self, lrtddft): 

367 i = 0 

368 fmax = 0. 

369 idir = 0 

370 if self.direction is not None: 

371 idir = 1 + self.direction 

372 while i < len(lrtddft): 

373 ex = lrtddft[i] 

374 f = ex.get_oscillator_strength()[idir] 

375 fmax = max(f, fmax) 

376 if f > self.fmin: 

377 return i 

378 i += 1 

379 error = 'The intensity constraint |f| > ' + str(self.fmin) + ' ' 

380 error += 'can not be satisfied (max(f) = ' + str(fmax) + ').' 

381 raise RuntimeError(error) 

382 

383 

384class MaximalOSIndex: 

385 

386 """ 

387 Select maximal oscillator strength. 

388 

389 Searches for the excitation with maximal oscillator strength 

390 in a given energy range. 

391 

392 energy_range: 

393 None: take all (default) 

394 [Emin, Emax]: take only transition in this energy range 

395 Emax: the same as [0, Emax] 

396 direction: 

397 None: averaged (default) 

398 0, 1, 2: x, y, z 

399 """ 

400 

401 def __init__(self, energy_range=None, direction=None): 

402 if energy_range is None: 

403 energy_range = np.array([0.0, 1.e32]) 

404 elif isinstance(energy_range, (int, float)): 

405 energy_range = np.array([0.0, energy_range]) / Hartree 

406 self.energy_range = energy_range 

407 

408 self.direction = direction 

409 

410 def apply(self, lrtddft): 

411 index = None 

412 fmax = 0. 

413 idir = 0 

414 if self.direction is not None: 

415 idir = 1 + self.direction 

416 emin, emax = self.energy_range 

417 for i, ex in enumerate(lrtddft): 

418 f = ex.get_oscillator_strength()[idir] 

419 e = ex.get_energy() 

420 if e >= emin and e < emax and f > fmax: 

421 fmax = f 

422 index = i 

423 if index is None: 

424 raise RuntimeError('No transition in the energy range ' + 

425 '[%g,%g]' % self.energy_range) 

426 return index 

427 

428 

429class ExcitedStateDensity(RealSpaceDensity): 

430 

431 """Approximate excited state density object.""" 

432 

433 def __init__(self, *args, **kwargs): 

434 self.method = kwargs.pop('method', 'dipole') 

435 RealSpaceDensity.__init__(self, *args, **kwargs) 

436 self.lrtddft = None 

437 self.index = None 

438 self.gsdensity = None 

439 self.nbands = None 

440 self.wocc_sn = None 

441 self.wunocc_sn = None 

442 

443 def initialize(self, lrtddft, index): 

444 self.lrtddft = lrtddft 

445 self.index = index 

446 

447 calc = lrtddft.calculator 

448 self.gsdensity = calc.density 

449 self.gd = self.gsdensity.gd 

450 self.nbands = calc.wfs.bd.nbands 

451 

452 # obtain weights 

453 ex = lrtddft[index] 

454 wocc_sn = np.zeros((self.nspins, self.nbands)) 

455 wunocc_sn = np.zeros((self.nspins, self.nbands)) 

456 for f, k in zip(ex.f, ex.kss): 

457 # XXX why not k.fij * k.energy / energy ??? 

458 if self.method == 'dipole': 

459 erat = k.energy / ex.energy 

460 elif self.method == 'orthogonal': 

461 erat = 1. 

462 else: 

463 raise NotImplementedError( 

464 'method should be either "dipole" or "orthogonal"') 

465 wocc_sn[k.pspin, k.i] += erat * f ** 2 

466 wunocc_sn[k.pspin, k.j] += erat * f ** 2 

467 self.wocc_sn = wocc_sn 

468 self.wunocc_sn = wunocc_sn 

469 

470 RealSpaceDensity.initialize( 

471 self, calc.wfs.setups, calc.timer, None, False) 

472 

473 self.set_positions(calc.spos_ac, calc.wfs.atom_partition) 

474 

475 D_asp = {} 

476 for a, D_sp in self.gsdensity.D_asp.items(): 

477 repeats = self.nspins // self.gsdensity.nspins 

478 # XXX does this work always? 

479 D_asp[a] = (1. * D_sp).repeat(repeats, axis=0) 

480 self.update_atomic_density_matrices(D_asp) 

481 

482 def update(self, wfs): 

483 self.timer.start('Density') 

484 self.timer.start('Pseudo density') 

485 self.calculate_pseudo_density(wfs) 

486 self.timer.stop('Pseudo density') 

487 self.timer.start('Atomic density matrices') 

488 f_un = [] 

489 for kpt in wfs.kpt_u: 

490 f_n = kpt.f_n - self.wocc_sn[kpt.s] + self.wunocc_sn[kpt.s] 

491 if self.nspins > self.gsdensity.nspins: 

492 f_n = kpt.f_n - self.wocc_sn[1] + self.wunocc_sn[1] 

493 f_un.append(f_n) 

494 wfs.calculate_atomic_density_matrices_with_occupation(self.D_asp, 

495 f_un) 

496 self.timer.stop('Atomic density matrices') 

497 self.timer.start('Multipole moments') 

498 comp_charge, _Q_aL = self.calculate_multipole_moments() 

499 self.timer.stop('Multipole moments') 

500 

501 if isinstance(wfs, LCAOWaveFunctions): 

502 self.timer.start('Normalize') 

503 self.normalize(comp_charge) 

504 self.timer.stop('Normalize') 

505 

506 self.timer.stop('Density') 

507 

508 def calculate_pseudo_density(self, wfs): 

509 """Calculate nt_sG from scratch. 

510 

511 nt_sG will be equal to nct_G plus the contribution from 

512 wfs.add_to_density(). 

513 """ 

514 nvspins = wfs.kd.nspins 

515 npspins = self.nspins 

516 self.nt_xG = self.gd.zeros(self.ncomponents) 

517 

518 for s in range(npspins): 

519 for kpt in wfs.kpt_u: 

520 if s == kpt.s or npspins > nvspins: 

521 f_n = kpt.f_n / (1. + int(npspins > nvspins)) 

522 for f, psit_G in zip((f_n - self.wocc_sn[s] + 

523 self.wunocc_sn[s]), 

524 kpt.psit_nG): 

525 axpy(f, psit_G ** 2, self.nt_sG[s]) 

526 self.nt_sG[:] += self.nct_G