Coverage for gpaw/utilities/__init__.py: 64%

232 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-12 00:18 +0000

1# Copyright (C) 2003 CAMP 

2# Please see the accompanying LICENSE file for further information. 

3 

4"""Utility functions and classes.""" 

5 

6import io 

7import os 

8import re 

9import sys 

10import time 

11from contextlib import contextmanager 

12from math import sqrt 

13from pathlib import Path 

14from typing import Union 

15 

16import gpaw.cgpaw as cgpaw 

17import gpaw.mpi as mpi 

18import numpy as np 

19from ase import Atoms 

20from ase.data import covalent_radii 

21from ase.neighborlist import neighbor_list 

22from gpaw import GPAW_NO_C_EXTENSION, debug 

23from gpaw.typing import DTypeLike 

24 

25# Code will crash for setups without any projectors. Setups that have 

26# no projectors therefore receive a dummy projector as a hacky 

27# workaround. The projector is assigned a certain, small size. If 

28# the grid is so coarse that no point falls within the projector's range, 

29# there'll also be an error. So this limits allowed grid spacings. 

30min_locfun_radius = 0.85 # Bohr 

31smallest_safe_grid_spacing = 2 * min_locfun_radius / np.sqrt(3) # ~0.52 Ang 

32 

33 

34class AtomsTooClose(ValueError): 

35 pass 

36 

37 

38def check_atoms_too_close(atoms: Atoms) -> None: 

39 radii = covalent_radii[atoms.numbers] * 0.01 

40 dists = neighbor_list('d', atoms, radii) 

41 if len(dists): 

42 raise AtomsTooClose(f'Atoms are too close, e.g. {dists[0]} Å') 

43 

44 

45def check_atoms_too_close_to_boundary(atoms: Atoms, 

46 dist: float = 0.2) -> None: 

47 """Check if any atoms are too close to the boundary of the box. 

48 

49 >>> atoms = Atoms('H', cell=[1, 1, 1]) 

50 >>> check_atoms_too_close_to_boundary(atoms) 

51 Traceback (most recent call last): 

52 ... 

53 raise AtomsTooClose('Atoms too close to boundary') 

54 gpaw.utilities.AtomsTooClose: Atoms too close to boundary 

55 >>> atoms.center() 

56 >>> check_atoms_too_close_to_boundary(atoms) 

57 >>> atoms = Atoms('H', 

58 ... positions=[[0.5, 0.5, 0.0]], 

59 ... cell=[1, 1, 0], # no bounday in z-direction 

60 ... pbc=(1, 1, 0)) 

61 >>> check_atoms_too_close_to_boundary(atoms) 

62 """ 

63 for axis_v, recip_v, pbc in zip(atoms.cell, 

64 atoms.cell.reciprocal(), 

65 atoms.pbc): 

66 if pbc: 

67 continue 

68 L = np.linalg.norm(axis_v) 

69 if L < 1e-12: # L==0 means no boundary 

70 continue 

71 spos_a = atoms.positions @ recip_v 

72 eps = dist / L 

73 if (spos_a < eps).any() or (spos_a > 1 - eps).any(): 

74 raise AtomsTooClose('Atoms too close to boundary') 

75 

76 

77def unpack_atomic_matrices(M_sP, setups, new=False, density=False): 

78 M_asp = {} 

79 P1 = 0 

80 for a, setup in enumerate(setups): 

81 ni = setup.ni 

82 P2 = P1 + ni * (ni + 1) // 2 

83 M_sp = M_sP[:, P1:P2] 

84 if new: 

85 M2_sp = np.empty_like(M_sp) 

86 pnew = 0 

87 for c in range(ni): 

88 for r in range(c + 1): 

89 pold = c - r + r * (2 * ni - r + 1) // 2 

90 if density and r < c: 

91 M2_sp[:, pold] = 2 * M_sp[:, pnew] 

92 else: 

93 M2_sp[:, pold] = M_sp[:, pnew] 

94 pnew += 1 

95 M_asp[a] = M2_sp 

96 else: 

97 M_asp[a] = M_sp.copy() 

98 P1 = P2 

99 return M_asp 

100 

101 

102def pack_atomic_matrices(M_asp): 

103 M2_asp = M_asp.copy() 

104 M2_asp.redistribute(M2_asp.partition.as_serial()) 

105 return M2_asp.toarray(axis=1) 

106 

107 

108def h2gpts(h, cell_cv, idiv=4): 

109 """Convert grid spacing to number of grid points divisible by idiv. 

110 

111 Note that units of h and cell_cv must match! 

112 

113 h: float 

114 Desired grid spacing in. 

115 cell_cv: 3x3 ndarray 

116 Unit cell. 

117 """ 

118 

119 L_c = (np.linalg.inv(cell_cv)**2).sum(0)**-0.5 

120 return np.maximum(idiv, (L_c / h / idiv + 0.5).astype(int) * idiv) 

121 

122 

123def is_contiguous(array, dtype=None): 

124 """Check for contiguity and type.""" 

125 if dtype is None: 

126 return array.flags.c_contiguous 

127 else: 

128 return array.flags.c_contiguous and array.dtype == dtype 

129 

130 

131# Radial-grid Hartree solver: 

132# 

133# l 

134# __ __ r 

135# 1 \ 4|| < * ^ ^ 

136# ------ = ) ---- ---- Y (r)Y (r'), 

137# _ _ /__ 2l+1 l+1 lm lm 

138# |r-r'| lm r 

139# > 

140# where 

141# 

142# r = min(r, r') 

143# < 

144# 

145# and 

146# 

147# r = max(r, r') 

148# > 

149# 

150def hartree(l: int, nrdr: np.ndarray, r: np.ndarray, vr: np.ndarray) -> None: 

151 """Calculates radial Coulomb integral. 

152 

153 The following integral is calculated:: 

154 

155 ^ 

156 n (r')Y (r') 

157 ^ / _ l lm 

158 v (r)Y (r) = |dr' --------------, 

159 l lm / _ _ 

160 |r - r'| 

161 

162 where input and output arrays `nrdr` and `vr`:: 

163 

164 dr 

165 n (r) r -- and v (r) r. 

166 l dg l 

167 """ 

168 assert is_contiguous(nrdr, float) 

169 assert is_contiguous(r, float) 

170 assert is_contiguous(vr, float) 

171 assert nrdr.shape == vr.shape and len(vr.shape) == 1 

172 assert len(r.shape) == 1 

173 assert len(r) >= len(vr) 

174 return cgpaw.hartree(l, nrdr, r, vr) 

175 

176 

177def packed_index(i1, i2, ni): 

178 """Return a packed index""" 

179 if i1 > i2: 

180 return (i2 * (2 * ni - 1 - i2) // 2) + i1 

181 else: 

182 return (i1 * (2 * ni - 1 - i1) // 2) + i2 

183 

184 

185def unpacked_indices(p, ni): 

186 """Return unpacked indices corresponding to upper triangle""" 

187 assert 0 <= p < ni * (ni + 1) // 2 

188 i1 = int(ni + .5 - sqrt((ni - .5)**2 - 2 * (p - ni))) 

189 return i1, p - i1 * (2 * ni - 1 - i1) // 2 

190 

191 

192packing_conventions = """\n 

193The convention is that density matrices are constructed using (un)pack_density 

194and anything that should be multiplied onto such, e.g. corrections to the 

195Hamiltonian, are constructed according to (un)pack_hermitian. 

196""" 

197 

198 

199def pack_hermitian(M2, tolerance=1e-10): 

200 r"""Pack Hermitian 

201 

202 This functions packs a Hermitian 2D array to a 

203 1D array, averaging off-diagonal terms with complex conjugation. 

204 

205 The matrix:: 

206 

207 / a00 a01 a02 \ 

208 A = | a10 a11 a12 | 

209 \ a20 a21 a22 / 

210 

211 is transformed to the vector:: 

212 

213 (a00, [a01 + a10*]/2, [a02 + a20*]/2, a11, [a12 + a21*]/2, a22) 

214 """ 

215 if M2.ndim == 3: 

216 return np.array([pack_hermitian(m2) for m2 in M2]) 

217 n = len(M2) 

218 M = np.zeros(n * (n + 1) // 2, M2.dtype) 

219 p = 0 

220 for r in range(n): 

221 M[p] = M2[r, r] 

222 p += 1 

223 for c in range(r + 1, n): 

224 M[p] = (M2[r, c] + np.conjugate(M2[c, r])) / 2. # note / 2. 

225 error = abs(M2[r, c] - np.conjugate(M2[c, r])) 

226 assert error < tolerance, 'Pack not symmetric by %s' % error + ' %' 

227 p += 1 

228 assert p == len(M) 

229 return M 

230 

231 

232def unpack_hermitian(M): 

233 """Unpack 1D array to Hermitian 2D array, 

234 assuming a packing as in ``pack_hermitian``.""" 

235 

236 if M.ndim == 2: 

237 return np.array([unpack_hermitian(m) for m in M]) 

238 assert is_contiguous(M) 

239 assert M.ndim == 1 

240 n = int(sqrt(0.25 + 2.0 * len(M))) 

241 M2 = np.zeros((n, n), M.dtype.char) 

242 if M.dtype == complex: 

243 cgpaw.unpack_complex(M, M2) 

244 else: 

245 cgpaw.unpack(M, M2) 

246 return M2 

247 

248 

249def pack_density(A: np.ndarray) -> np.ndarray: 

250 r"""Pack off-diagonal sum 

251 

252 This function packs a 2D Hermitian array to 1D, adding off-diagonal terms. 

253 

254 The matrix:: 

255 

256 / a00 a01 a02 \ 

257 A = | a10 a11 a12 | 

258 \ a20 a21 a22 / 

259 

260 is transformed to the vector:: 

261 

262 (a00, a01 + a10, a02 + a20, a11, a12 + a21, a22)""" 

263 

264 assert A.ndim == 2 

265 assert A.shape[0] == A.shape[1] 

266 assert A.dtype in [float, complex] 

267 return cgpaw.pack(A) 

268 

269 

270# We cannot recover the complex part of the off-diag elements from a 

271# pack_density array since they are summed to zero (we only pack Hermitian 

272# arrays). We should consider if "unpack_density" even makes sense to have. 

273 

274 

275def unpack_density(M): 

276 """Unpack 1D array to 2D Hermitian array, 

277 assuming a packing as in ``pack_density``.""" 

278 if M.ndim == 2: 

279 return np.array([unpack_density(m) for m in M]) 

280 M2 = unpack_hermitian(M) 

281 M2 *= 0.5 # divide all by 2 

282 M2.flat[0::len(M2) + 1] *= 2 # rescale diagonal to original size 

283 return M2 

284 

285 

286for method in (pack_hermitian, unpack_hermitian, pack_density, pack_density): 

287 method.__doc__ += packing_conventions # type: ignore 

288 

289 

290def element_from_packed(M, i, j): 

291 """Return a specific element from a packed array (by ``pack``).""" 

292 n = int(sqrt(2 * len(M) + .25)) 

293 assert i < n and j < n 

294 p = packed_index(i, j, n) 

295 if i == j: 

296 return M[p] 

297 elif i > j: 

298 return .5 * M[p] 

299 else: 

300 return .5 * np.conjugate(M[p]) 

301 

302 

303def logfile(name, rank=0): 

304 """Create file object from name. 

305 

306 Use None for /dev/null and '-' for sys.stdout. Ranks > 0 will 

307 get /dev/null.""" 

308 

309 if rank == 0: 

310 if name is None: 

311 fd = devnull 

312 elif name == '-': 

313 fd = sys.stdout 

314 elif isinstance(name, str): 

315 fd = open(name, 'w') 

316 else: 

317 fd = name 

318 else: 

319 fd = devnull 

320 return fd 

321 

322 

323def uncamelcase(name): 

324 """Convert a CamelCase name to a string of space-seperated words.""" 

325 words = re.split('([A-Z]{1}[a-z]+)', name) 

326 return ' '.join([word for word in words if word != '']) 

327 

328 

329def divrl(a_g, l, r_g): 

330 """Return array divided by r to the l'th power.""" 

331 b_g = a_g.copy() 

332 if l > 0: 

333 b_g[1:] /= r_g[1:]**l 

334 b1, b2 = b_g[1:3] 

335 r12, r22 = r_g[1:3]**2 

336 b_g[0] = (b1 * r22 - b2 * r12) / (r22 - r12) 

337 return b_g 

338 

339 

340def compiled_with_sl(): 

341 return hasattr(cgpaw, 'new_blacs_context') 

342 

343 

344def compiled_with_libvdwxc(): 

345 return hasattr(cgpaw, 'libvdwxc_create') 

346 

347 

348def load_balance(paw, atoms): 

349 try: 

350 paw.initialize(atoms) 

351 except SystemExit: 

352 pass 

353 atoms_r = np.zeros(paw.wfs.world.size) 

354 rnk_a = paw.wfs.gd.get_ranks_from_positions(paw.spos_ac) 

355 for rnk in rnk_a: 

356 atoms_r[rnk] += 1 

357 max_atoms = max(atoms_r) 

358 min_atoms = min(atoms_r) 

359 ave_atoms = atoms_r.sum() / paw.wfs.world.size 

360 stddev_atoms = sqrt((atoms_r**2).sum() / paw.wfs.world.size - ave_atoms**2) 

361 print("Information about load balancing") 

362 print("--------------------------------") 

363 print("Number of atoms:", len(paw.spos_ac)) 

364 print("Number of CPUs:", paw.wfs.world.size) 

365 print("Max. number of atoms/CPU: ", max_atoms) 

366 print("Min. number of atoms/CPU: ", min_atoms) 

367 print("Average number of atoms/CPU:", ave_atoms) 

368 print(" standard deviation: %5.1f" % stddev_atoms) 

369 

370 

371if not debug and not GPAW_NO_C_EXTENSION: 

372 hartree = cgpaw.hartree # noqa 

373 pack_density = cgpaw.pack 

374 

375 

376def unlink(path: Union[str, Path], world=None): 

377 """Safely unlink path (delete file or symbolic link).""" 

378 

379 if isinstance(path, str): 

380 path = Path(path) 

381 if world is None: 

382 world = mpi.world 

383 

384 # Remove file: 

385 if world.rank == 0: 

386 try: 

387 path.unlink() 

388 except FileNotFoundError: 

389 pass 

390 else: 

391 while path.is_file(): 

392 time.sleep(1.0) 

393 world.barrier() 

394 

395 

396@contextmanager 

397def file_barrier(path: Union[str, Path], world=None): 

398 """Context manager for writing a file. 

399 

400 After the with-block all cores will be able to read the file. 

401 

402 >>> with file_barrier('something.txt'): 

403 ... result = 2 + 2 

404 ... Path('something.txt').write_text(f'{result}') # doctest: +SKIP 

405 

406 This will remove the file, write the file and wait for the file. 

407 """ 

408 

409 if isinstance(path, str): 

410 path = Path(path) 

411 if world is None: 

412 world = mpi.world 

413 

414 # Remove file: 

415 unlink(path, world) 

416 

417 yield 

418 

419 # Wait for file: 

420 while not path.is_file(): 

421 time.sleep(1.0) 

422 world.barrier() 

423 

424 

425class _NullIO(io.BufferedIOBase): 

426 # Implement as few methods as possible in order to be the target of 

427 # TextIOWrapper. Python docs are not very specific. 

428 def writable(self): 

429 return True 

430 

431 def flush(self): 

432 pass 

433 

434 

435devnull = io.TextIOWrapper(_NullIO()) # type: ignore 

436 

437 

438def convert_string_to_fd(name, world=None): 

439 """Create a file-descriptor for text output. 

440 

441 Will open a file for writing with given name. Use None for no output and 

442 '-' for sys.stdout. 

443 """ 

444 if world is None: 

445 from ase.parallel import world 

446 if name is None or world.rank != 0: 

447 return open(os.devnull, 'w') 

448 if name == '-': 

449 return sys.stdout 

450 if isinstance(name, (str, Path)): 

451 return open(name, 'w') 

452 return name # we assume name is already a file-descriptor 

453 

454 

455_complex_float = { 

456 np.float32: np.complex64, 

457 np.float64: np.complex128, 

458 np.complex64: np.complex64, 

459 np.complex128: np.complex128, 

460 float: complex, 

461 complex: complex} 

462 

463_real_float = { 

464 np.complex64: np.float32, 

465 np.complex128: np.float64, 

466 np.float32: np.float32, 

467 np.float64: np.float64, 

468 complex: float, 

469 float: float} 

470 

471 

472def as_complex_dtype(dtype: DTypeLike) -> np.dtype: 

473 """Convert dtype to complex dtype. 

474 

475 >>> [as_complex_dtype(dt) for dt in 

476 ... [np.float32, np.float64, complex]] 

477 [dtype('complex64'), dtype('complex128'), dtype('complex128')] 

478 """ 

479 return np.dtype(_complex_float[np.dtype(dtype).type]) 

480 

481 

482def as_real_dtype(dtype: DTypeLike) -> np.dtype: 

483 """Convert dtype to real dtype. 

484 

485 >>> [as_real_dtype(dt) for dt in 

486 ... [np.float32, np.float64, complex]] 

487 [dtype('float32'), dtype('float64'), dtype('float64')] 

488 """ 

489 return np.dtype(_real_float[np.dtype(dtype).type])