Coverage for gpaw/new/gpw.py: 81%

238 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-09 00:21 +0000

1"""GPW file-format. 

2 

3Versions: 

4 

51) The beginning ... 

6 

72) Lost in history. 

8 

93) Legacy GPAW. 

10 

114) New GPAW: 

12 

13 * new packing convention for D^a_ij and delta-H^a_ij 

14 * contains also electrostatic potential 

15 

165) Bug-fix: wave_functions.kpts.rotations are now U_scc 

17 as in version 3 (instead of U_svv). 

18 

196) Write energy contributions to "energy_contributions". 

20 

21""" 

22from __future__ import annotations 

23 

24from dataclasses import dataclass 

25from pathlib import Path 

26from typing import IO, Any, Union, Callable 

27 

28import ase.io.ulm as ulm 

29import gpaw 

30import gpaw.mpi as mpi 

31import numpy as np 

32from ase import Atoms 

33from ase.io.trajectory import read_atoms, write_atoms 

34from ase.units import Bohr, Ha 

35from gpaw.core.atom_arrays import AtomArraysLayout 

36from gpaw.new.builder import DFTComponentsBuilder 

37from gpaw.new.calculation import DFTCalculation, units 

38from gpaw.new.density import Density 

39from gpaw.new.logger import Logger 

40from gpaw.new.potential import Potential 

41from gpaw.utilities import unpack_hermitian, unpack_density 

42from gpaw.new.energies import DFTEnergies 

43from gpaw.dft import Parameters 

44 

45 

46def as_single_precision(array): 

47 """Convert 64 bit floating point numbers to 32 bit. 

48 

49 >>> as_single_precision(np.ones(3)) 

50 array([1., 1., 1.], dtype=float32) 

51 """ 

52 assert array.dtype in [np.float64, np.complex128] 

53 dtype = np.float32 if array.dtype == np.float64 else np.complex64 

54 return np.array(array, dtype=dtype) 

55 

56 

57def as_double_precision(array): 

58 """Convert 32 bit floating point numbers to 64 bit. 

59 

60 >>> as_double_precision(np.ones(3, dtype=np.float32)) 

61 array([1., 1., 1.]) 

62 """ 

63 if array is None: 

64 return None 

65 assert array.dtype in [np.float32, np.complex64] 

66 if array.dtype == np.float32: 

67 dtype = np.float64 

68 else: 

69 dtype = complex 

70 return np.array(array, dtype=dtype) 

71 

72 

73@dataclass 

74class GPWFlags: 

75 include_wfs: bool 

76 include_projections: bool 

77 precision: str 

78 

79 def __post_init__(self) -> None: 

80 if self.precision not in ['single', 'double']: 

81 raise ValueError('precision must be either "single" or "double"') 

82 

83 def storage_dtype(self, dtype): 

84 dtype = np.dtype(dtype) 

85 if self.precision == 'double': 

86 return dtype 

87 

88 if dtype == float: 

89 return np.dtype(np.float32) 

90 

91 if dtype == complex: 

92 return np.dtype(np.complex64) 

93 

94 raise ValueError(f'Unexpected dtype: {dtype}') 

95 

96 def to_storage_dtype(self, array: np.ndarray) -> np.ndarray: 

97 if self.precision == 'double': 

98 return array 

99 return array.astype(self.storage_dtype(array.dtype)) 

100 

101 

102def write_gpw(filename: str | Path, 

103 dft: DFTCalculation, 

104 flags: GPWFlags) -> None: 

105 

106 comm = dft.comm 

107 

108 writer: ulm.Writer | ulm.DummyWriter 

109 if comm.rank == 0: 

110 writer = ulm.Writer(filename, tag='gpaw') 

111 else: 

112 writer = ulm.DummyWriter() 

113 

114 with writer: 

115 writer.write(version=6, 

116 gpaw_version=gpaw.__version__, 

117 ha=Ha, 

118 bohr=Bohr, 

119 precision=flags.precision) 

120 

121 write_atoms(writer.child('atoms'), dft.atoms) 

122 

123 results = {key: value * units[key] 

124 for key, value in dft.results.items()} 

125 writer.child('results').write(**results) 

126 

127 p = dft.params.todict() 

128 p.pop('parallel', None) 

129 # ULM does not know about numpy dtypes: 

130 if 'dtype' in p: 

131 p['dtype'] = np.dtype(p['dtype']).name 

132 writer.child('parameters').write(**p) 

133 

134 dft.density.write_to_gpw(writer.child('density'), flags) 

135 dft.potential.write_to_gpw(writer.child('hamiltonian'), flags) 

136 writer.write(e_stress=dft.potential.e_stress * Ha) 

137 dft.energies.write_to_gpw(writer.child('energy_contributions')) 

138 wf_writer = writer.child('wave_functions') 

139 dft.ibzwfs.write(wf_writer, flags=flags) 

140 

141 if flags.include_wfs and dft.params.mode.name == 'pw': 

142 write_wave_function_indices(wf_writer, 

143 dft.ibzwfs, 

144 dft.density.nt_sR.desc) 

145 

146 comm.barrier() 

147 

148 

149def write_wave_function_indices(writer, ibzwfs, grid): 

150 if ibzwfs.band_comm.rank != 0: 

151 return 

152 if ibzwfs.domain_comm.rank != 0: 

153 return 

154 

155 kpt_comm = ibzwfs.kpt_comm 

156 ibz = ibzwfs.ibz 

157 nG = ibzwfs.get_max_shape(global_shape=True)[-1] 

158 

159 writer.add_array('indices', (len(ibz), nG), np.int32) 

160 

161 index_G = np.zeros(nG, np.int32) 

162 size = tuple(grid.size) 

163 if ibzwfs.dtype == float: 

164 size = (size[0], size[1], size[2] // 2 + 1) 

165 

166 for k, rank in enumerate(ibzwfs.rank_k): 

167 if rank == kpt_comm.rank: 

168 wfs = ibzwfs.wfs_qs[ibzwfs.q_k[k]][0] 

169 i_G = wfs.psit_nX.desc.indices(size) 

170 index_G[:len(i_G)] = i_G 

171 index_G[len(i_G):] = -1 

172 if rank == 0: 

173 writer.fill(index_G) 

174 else: 

175 kpt_comm.send(index_G, 0) 

176 elif kpt_comm.rank == 0: 

177 kpt_comm.receive(index_G, rank) 

178 writer.fill(index_G) 

179 

180 

181def read_gpw(filename: Union[str, Path, IO[str]], 

182 *, 

183 log: Union[Logger, str, Path, IO[str]] = None, 

184 comm=None, 

185 parallel: dict[str, Any] = None, 

186 dtype=None, 

187 object_hooks: dict[str, Callable[[dict], Any]] | None = None 

188 ) -> tuple[Atoms, 

189 DFTCalculation, 

190 Parameters, 

191 DFTComponentsBuilder]: 

192 """ 

193 Read gpw file 

194 

195 Returns 

196 ------- 

197 atoms, calculation, params, builder 

198 """ 

199 parallel = parallel or {} 

200 

201 if not isinstance(log, Logger): 

202 log = Logger(log, comm or mpi.world) 

203 

204 comm = log.comm 

205 

206 log(f'Reading from {filename}') 

207 

208 reader = ulm.Reader(filename) 

209 bohr = reader.bohr 

210 ha = reader.ha 

211 singlep = reader.get('precision', 'double') == 'single' 

212 

213 atoms = read_atoms(reader.atoms) 

214 kwargs = reader.parameters.asdict() 

215 kwargs['parallel'] = parallel 

216 

217 if 'dtype' in kwargs: 

218 kwargs['dtype'] = np.dtype(kwargs['dtype']) 

219 

220 if dtype is not None: 

221 kwargs['dtype'] = dtype 

222 

223 # kwargs['nbands'] = reader.wave_functions.eigenvalues.shape[-1] 

224 

225 for old_keyword in ['fixdensity', 'txt']: 

226 kwargs.pop(old_keyword, None) 

227 

228 if object_hooks: 

229 for key, hook in object_hooks.items(): 

230 if key in kwargs: 

231 kwargs[key] = hook(kwargs[key]) 

232 

233 params = Parameters(**kwargs) 

234 builder = params.dft_component_builder(atoms, log=log) 

235 

236 if comm.rank == 0: 

237 nt_sR_array = reader.density.density * bohr**3 

238 vt_sR_array = reader.hamiltonian.potential / ha 

239 if singlep: 

240 nt_sR_array = as_double_precision(nt_sR_array) 

241 vt_sR_array = as_double_precision(vt_sR_array) 

242 if builder.xc.type == 'MGGA': 

243 taut_sR_array = reader.density.ked * (bohr**3 / ha) 

244 dedtaut_sR_array = reader.hamiltonian.mgga_potential * bohr**-3 

245 if singlep: 

246 taut_sR_array = as_double_precision(taut_sR_array) 

247 dedtaut_sR_array = as_double_precision(dedtaut_sR_array) 

248 D_sap_array = reader.density.atomic_density_matrices 

249 dH_sap_array = reader.hamiltonian.atomic_hamiltonian_matrices / ha 

250 shape = nt_sR_array.shape[1:] 

251 else: 

252 nt_sR_array = None 

253 vt_sR_array = None 

254 taut_sR_array = None 

255 dedtaut_sR_array = None 

256 D_sap_array = None 

257 dH_sap_array = None 

258 shape = None 

259 

260 if builder.grid.global_shape() != mpi.broadcast(shape, comm=comm): 

261 # old gpw-file: 

262 kwargs.pop('h', None) 

263 kwargs['gpts'] = nt_sR_array.shape[1:] 

264 params = Parameters(**kwargs) 

265 builder = params.dft_component_builder(atoms, log=log) 

266 

267 kpts = reader.wave_functions.kpts 

268 rotation_scc = kpts.rotations 

269 if len(rotation_scc) != len(builder.ibz.symmetries): 

270 # Use symmetries from gpw-file 

271 if reader.version == 4: 

272 # gpw-files with version=4 wrote the wrong rotations 

273 cell_cv = atoms.cell 

274 rotation_scc = (cell_cv @ 

275 rotation_scc @ 

276 np.linalg.inv(cell_cv)).round() 

277 kwargs['symmetry'] = {'rotations': rotation_scc, 

278 'translations': kpts.translations, 

279 'atommaps': kpts.atommap} 

280 params = Parameters(**kwargs) 

281 builder = params.dft_component_builder(atoms, log=log) 

282 

283 (kpt_comm, band_comm, domain_comm, kpt_band_comm) = ( 

284 builder.communicators[x] for x in 'kbdD') 

285 

286 nt_sR = builder.grid.empty(builder.ncomponents) 

287 vt_sR = builder.grid.empty(builder.ncomponents) 

288 

289 if builder.xc.type == 'MGGA': 

290 taut_sR = builder.grid.empty(builder.ncomponents) 

291 dedtaut_sR = builder.grid.empty(builder.ncomponents) 

292 else: 

293 taut_sR = None 

294 dedtaut_sR = None 

295 

296 dtype = float if builder.ncomponents < 4 else complex 

297 atom_array_layout = AtomArraysLayout( 

298 [(setup.ni * (setup.ni + 1) // 2) for setup in builder.setups], 

299 atomdist=builder.atomdist, dtype=dtype) 

300 D_asp = atom_array_layout.empty(builder.ncomponents) 

301 dH_asp = atom_array_layout.empty(builder.ncomponents) 

302 

303 if kpt_band_comm.rank == 0: 

304 nt_sR.scatter_from(nt_sR_array) 

305 vt_sR.scatter_from(vt_sR_array) 

306 if builder.xc.type == 'MGGA': 

307 taut_sR.scatter_from(taut_sR_array) 

308 dedtaut_sR.scatter_from(dedtaut_sR_array) 

309 D_asp.scatter_from(D_sap_array) 

310 dH_asp.scatter_from(dH_sap_array) 

311 if reader.version < 4: 

312 convert_to_new_packing_convention(D_asp, density=True) 

313 convert_to_new_packing_convention(dH_asp) 

314 

315 kpt_band_comm.broadcast(nt_sR.data, 0) 

316 kpt_band_comm.broadcast(vt_sR.data, 0) 

317 if builder.xc.type == 'MGGA': 

318 kpt_band_comm.broadcast(taut_sR.data, 0) 

319 kpt_band_comm.broadcast(dedtaut_sR.data, 0) 

320 kpt_band_comm.broadcast(D_asp.data, 0) 

321 kpt_band_comm.broadcast(dH_asp.data, 0) 

322 

323 # if reader.version >= 4: 

324 if 'electrostatic_potential' in reader.hamiltonian: 

325 if comm.rank == 0: 

326 vHt_x_array = reader.hamiltonian.electrostatic_potential / ha 

327 if singlep: 

328 vHt_x_array = as_double_precision(vHt_x_array) 

329 else: 

330 vHt_x_array = None 

331 vHt_x = builder.electrostatic_potential_desc.empty() 

332 if kpt_band_comm.rank == 0: 

333 vHt_x.scatter_from(vHt_x_array) 

334 kpt_band_comm.broadcast(vHt_x.data, 0) 

335 else: 

336 vHt_x = None 

337 

338 density = Density.from_data_and_setups( 

339 nt_sR, taut_sR, D_asp.to_full(), 

340 builder.params.charge, 

341 builder.setups, 

342 builder.get_pseudo_core_densities(), 

343 builder.get_pseudo_core_ked()) 

344 

345 if reader.version >= 6: 

346 ec = {name: e / ha 

347 for name, e in reader.energy_contributions.asdict().items()} 

348 e_stress = reader.e_stress / ha 

349 else: 

350 NAMES = ['kinetic', 'coulomb', 'zero', 'external', 

351 'xc', 'entropy', 

352 'total_free', 'total_extrapolated', 

353 'band', 'stress', 'spinorbit'] 

354 ec = {name: reader.hamiltonian.get(f'e_{name}', np.nan) / ha 

355 for name in NAMES} 

356 ec['kinetic_correction'] = ec['kinetic'] - ec['band'] 

357 ec['extrapolation'] = (ec.pop('total_extrapolated') - 

358 ec.pop('total_free')) 

359 e_stress = ec.pop('stress', np.nan) / ha 

360 

361 energies = DFTEnergies(**ec) 

362 

363 potential = Potential(vt_sR, dH_asp.to_full(), dedtaut_sR, vHt_x, e_stress) 

364 

365 ibzwfs = builder.read_ibz_wave_functions(reader) 

366 

367 dft = DFTCalculation( 

368 atoms, ibzwfs, density, potential, 

369 builder.setups, 

370 builder.create_scf_loop(), 

371 pot_calc=builder.create_potential_calculator(), 

372 params=params, 

373 energies=energies, 

374 log=log) 

375 

376 results = {key: value / units[key] 

377 for key, value in reader.results.asdict().items()} 

378 

379 if results: 

380 log(f'Read {", ".join(sorted(results))}') 

381 

382 if reader.version < 4 and 'magmoms' in results: 

383 magmom_a = results['magmoms'] 

384 magmom_av = np.pad(magmom_a[:, np.newaxis], [(0, 0), (2, 0)]) 

385 results['non_collinear_magmoms'] = magmom_av 

386 

387 dft.results = results 

388 

389 if builder.mode in ['pw', 'fd']: # fd = finite-difference 

390 data = ibzwfs.wfs_qs[0][0].psit_nX.data 

391 if not hasattr(data, 'fd'): # fd = file-descriptor 

392 reader.close() 

393 else: 

394 reader.close() 

395 

396 return atoms, dft, params, builder 

397 

398 

399def convert_to_new_packing_convention(a_asp, density=False): 

400 """Convert from old to new convention. 

401 

402 :: 

403 

404 1 2 3 1 2 4 

405 . 4 5 -> . 3 5 

406 . . 6 . . 6 

407 """ 

408 for a_sp in a_asp.values(): 

409 if density: 

410 a_sii = unpack_density(a_sp) 

411 else: 

412 a_sii = unpack_hermitian(a_sp) 

413 L = np.tril_indices(a_sii.shape[1]) 

414 a_sp[:] = a_sii[(...,) + L]