Coverage for gpaw/utilities/__init__.py: 64%
232 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-12 00:18 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-07-12 00:18 +0000
1# Copyright (C) 2003 CAMP
2# Please see the accompanying LICENSE file for further information.
4"""Utility functions and classes."""
6import io
7import os
8import re
9import sys
10import time
11from contextlib import contextmanager
12from math import sqrt
13from pathlib import Path
14from typing import Union
16import gpaw.cgpaw as cgpaw
17import gpaw.mpi as mpi
18import numpy as np
19from ase import Atoms
20from ase.data import covalent_radii
21from ase.neighborlist import neighbor_list
22from gpaw import GPAW_NO_C_EXTENSION, debug
23from gpaw.typing import DTypeLike
25# Code will crash for setups without any projectors. Setups that have
26# no projectors therefore receive a dummy projector as a hacky
27# workaround. The projector is assigned a certain, small size. If
28# the grid is so coarse that no point falls within the projector's range,
29# there'll also be an error. So this limits allowed grid spacings.
30min_locfun_radius = 0.85 # Bohr
31smallest_safe_grid_spacing = 2 * min_locfun_radius / np.sqrt(3) # ~0.52 Ang
34class AtomsTooClose(ValueError):
35 pass
38def check_atoms_too_close(atoms: Atoms) -> None:
39 radii = covalent_radii[atoms.numbers] * 0.01
40 dists = neighbor_list('d', atoms, radii)
41 if len(dists):
42 raise AtomsTooClose(f'Atoms are too close, e.g. {dists[0]} Å')
45def check_atoms_too_close_to_boundary(atoms: Atoms,
46 dist: float = 0.2) -> None:
47 """Check if any atoms are too close to the boundary of the box.
49 >>> atoms = Atoms('H', cell=[1, 1, 1])
50 >>> check_atoms_too_close_to_boundary(atoms)
51 Traceback (most recent call last):
52 ...
53 raise AtomsTooClose('Atoms too close to boundary')
54 gpaw.utilities.AtomsTooClose: Atoms too close to boundary
55 >>> atoms.center()
56 >>> check_atoms_too_close_to_boundary(atoms)
57 >>> atoms = Atoms('H',
58 ... positions=[[0.5, 0.5, 0.0]],
59 ... cell=[1, 1, 0], # no bounday in z-direction
60 ... pbc=(1, 1, 0))
61 >>> check_atoms_too_close_to_boundary(atoms)
62 """
63 for axis_v, recip_v, pbc in zip(atoms.cell,
64 atoms.cell.reciprocal(),
65 atoms.pbc):
66 if pbc:
67 continue
68 L = np.linalg.norm(axis_v)
69 if L < 1e-12: # L==0 means no boundary
70 continue
71 spos_a = atoms.positions @ recip_v
72 eps = dist / L
73 if (spos_a < eps).any() or (spos_a > 1 - eps).any():
74 raise AtomsTooClose('Atoms too close to boundary')
77def unpack_atomic_matrices(M_sP, setups, new=False, density=False):
78 M_asp = {}
79 P1 = 0
80 for a, setup in enumerate(setups):
81 ni = setup.ni
82 P2 = P1 + ni * (ni + 1) // 2
83 M_sp = M_sP[:, P1:P2]
84 if new:
85 M2_sp = np.empty_like(M_sp)
86 pnew = 0
87 for c in range(ni):
88 for r in range(c + 1):
89 pold = c - r + r * (2 * ni - r + 1) // 2
90 if density and r < c:
91 M2_sp[:, pold] = 2 * M_sp[:, pnew]
92 else:
93 M2_sp[:, pold] = M_sp[:, pnew]
94 pnew += 1
95 M_asp[a] = M2_sp
96 else:
97 M_asp[a] = M_sp.copy()
98 P1 = P2
99 return M_asp
102def pack_atomic_matrices(M_asp):
103 M2_asp = M_asp.copy()
104 M2_asp.redistribute(M2_asp.partition.as_serial())
105 return M2_asp.toarray(axis=1)
108def h2gpts(h, cell_cv, idiv=4):
109 """Convert grid spacing to number of grid points divisible by idiv.
111 Note that units of h and cell_cv must match!
113 h: float
114 Desired grid spacing in.
115 cell_cv: 3x3 ndarray
116 Unit cell.
117 """
119 L_c = (np.linalg.inv(cell_cv)**2).sum(0)**-0.5
120 return np.maximum(idiv, (L_c / h / idiv + 0.5).astype(int) * idiv)
123def is_contiguous(array, dtype=None):
124 """Check for contiguity and type."""
125 if dtype is None:
126 return array.flags.c_contiguous
127 else:
128 return array.flags.c_contiguous and array.dtype == dtype
131# Radial-grid Hartree solver:
132#
133# l
134# __ __ r
135# 1 \ 4|| < * ^ ^
136# ------ = ) ---- ---- Y (r)Y (r'),
137# _ _ /__ 2l+1 l+1 lm lm
138# |r-r'| lm r
139# >
140# where
141#
142# r = min(r, r')
143# <
144#
145# and
146#
147# r = max(r, r')
148# >
149#
150def hartree(l: int, nrdr: np.ndarray, r: np.ndarray, vr: np.ndarray) -> None:
151 """Calculates radial Coulomb integral.
153 The following integral is calculated::
155 ^
156 n (r')Y (r')
157 ^ / _ l lm
158 v (r)Y (r) = |dr' --------------,
159 l lm / _ _
160 |r - r'|
162 where input and output arrays `nrdr` and `vr`::
164 dr
165 n (r) r -- and v (r) r.
166 l dg l
167 """
168 assert is_contiguous(nrdr, float)
169 assert is_contiguous(r, float)
170 assert is_contiguous(vr, float)
171 assert nrdr.shape == vr.shape and len(vr.shape) == 1
172 assert len(r.shape) == 1
173 assert len(r) >= len(vr)
174 return cgpaw.hartree(l, nrdr, r, vr)
177def packed_index(i1, i2, ni):
178 """Return a packed index"""
179 if i1 > i2:
180 return (i2 * (2 * ni - 1 - i2) // 2) + i1
181 else:
182 return (i1 * (2 * ni - 1 - i1) // 2) + i2
185def unpacked_indices(p, ni):
186 """Return unpacked indices corresponding to upper triangle"""
187 assert 0 <= p < ni * (ni + 1) // 2
188 i1 = int(ni + .5 - sqrt((ni - .5)**2 - 2 * (p - ni)))
189 return i1, p - i1 * (2 * ni - 1 - i1) // 2
192packing_conventions = """\n
193The convention is that density matrices are constructed using (un)pack_density
194and anything that should be multiplied onto such, e.g. corrections to the
195Hamiltonian, are constructed according to (un)pack_hermitian.
196"""
199def pack_hermitian(M2, tolerance=1e-10):
200 r"""Pack Hermitian
202 This functions packs a Hermitian 2D array to a
203 1D array, averaging off-diagonal terms with complex conjugation.
205 The matrix::
207 / a00 a01 a02 \
208 A = | a10 a11 a12 |
209 \ a20 a21 a22 /
211 is transformed to the vector::
213 (a00, [a01 + a10*]/2, [a02 + a20*]/2, a11, [a12 + a21*]/2, a22)
214 """
215 if M2.ndim == 3:
216 return np.array([pack_hermitian(m2) for m2 in M2])
217 n = len(M2)
218 M = np.zeros(n * (n + 1) // 2, M2.dtype)
219 p = 0
220 for r in range(n):
221 M[p] = M2[r, r]
222 p += 1
223 for c in range(r + 1, n):
224 M[p] = (M2[r, c] + np.conjugate(M2[c, r])) / 2. # note / 2.
225 error = abs(M2[r, c] - np.conjugate(M2[c, r]))
226 assert error < tolerance, 'Pack not symmetric by %s' % error + ' %'
227 p += 1
228 assert p == len(M)
229 return M
232def unpack_hermitian(M):
233 """Unpack 1D array to Hermitian 2D array,
234 assuming a packing as in ``pack_hermitian``."""
236 if M.ndim == 2:
237 return np.array([unpack_hermitian(m) for m in M])
238 assert is_contiguous(M)
239 assert M.ndim == 1
240 n = int(sqrt(0.25 + 2.0 * len(M)))
241 M2 = np.zeros((n, n), M.dtype.char)
242 if M.dtype == complex:
243 cgpaw.unpack_complex(M, M2)
244 else:
245 cgpaw.unpack(M, M2)
246 return M2
249def pack_density(A: np.ndarray) -> np.ndarray:
250 r"""Pack off-diagonal sum
252 This function packs a 2D Hermitian array to 1D, adding off-diagonal terms.
254 The matrix::
256 / a00 a01 a02 \
257 A = | a10 a11 a12 |
258 \ a20 a21 a22 /
260 is transformed to the vector::
262 (a00, a01 + a10, a02 + a20, a11, a12 + a21, a22)"""
264 assert A.ndim == 2
265 assert A.shape[0] == A.shape[1]
266 assert A.dtype in [float, complex]
267 return cgpaw.pack(A)
270# We cannot recover the complex part of the off-diag elements from a
271# pack_density array since they are summed to zero (we only pack Hermitian
272# arrays). We should consider if "unpack_density" even makes sense to have.
275def unpack_density(M):
276 """Unpack 1D array to 2D Hermitian array,
277 assuming a packing as in ``pack_density``."""
278 if M.ndim == 2:
279 return np.array([unpack_density(m) for m in M])
280 M2 = unpack_hermitian(M)
281 M2 *= 0.5 # divide all by 2
282 M2.flat[0::len(M2) + 1] *= 2 # rescale diagonal to original size
283 return M2
286for method in (pack_hermitian, unpack_hermitian, pack_density, pack_density):
287 method.__doc__ += packing_conventions # type: ignore
290def element_from_packed(M, i, j):
291 """Return a specific element from a packed array (by ``pack``)."""
292 n = int(sqrt(2 * len(M) + .25))
293 assert i < n and j < n
294 p = packed_index(i, j, n)
295 if i == j:
296 return M[p]
297 elif i > j:
298 return .5 * M[p]
299 else:
300 return .5 * np.conjugate(M[p])
303def logfile(name, rank=0):
304 """Create file object from name.
306 Use None for /dev/null and '-' for sys.stdout. Ranks > 0 will
307 get /dev/null."""
309 if rank == 0:
310 if name is None:
311 fd = devnull
312 elif name == '-':
313 fd = sys.stdout
314 elif isinstance(name, str):
315 fd = open(name, 'w')
316 else:
317 fd = name
318 else:
319 fd = devnull
320 return fd
323def uncamelcase(name):
324 """Convert a CamelCase name to a string of space-seperated words."""
325 words = re.split('([A-Z]{1}[a-z]+)', name)
326 return ' '.join([word for word in words if word != ''])
329def divrl(a_g, l, r_g):
330 """Return array divided by r to the l'th power."""
331 b_g = a_g.copy()
332 if l > 0:
333 b_g[1:] /= r_g[1:]**l
334 b1, b2 = b_g[1:3]
335 r12, r22 = r_g[1:3]**2
336 b_g[0] = (b1 * r22 - b2 * r12) / (r22 - r12)
337 return b_g
340def compiled_with_sl():
341 return hasattr(cgpaw, 'new_blacs_context')
344def compiled_with_libvdwxc():
345 return hasattr(cgpaw, 'libvdwxc_create')
348def load_balance(paw, atoms):
349 try:
350 paw.initialize(atoms)
351 except SystemExit:
352 pass
353 atoms_r = np.zeros(paw.wfs.world.size)
354 rnk_a = paw.wfs.gd.get_ranks_from_positions(paw.spos_ac)
355 for rnk in rnk_a:
356 atoms_r[rnk] += 1
357 max_atoms = max(atoms_r)
358 min_atoms = min(atoms_r)
359 ave_atoms = atoms_r.sum() / paw.wfs.world.size
360 stddev_atoms = sqrt((atoms_r**2).sum() / paw.wfs.world.size - ave_atoms**2)
361 print("Information about load balancing")
362 print("--------------------------------")
363 print("Number of atoms:", len(paw.spos_ac))
364 print("Number of CPUs:", paw.wfs.world.size)
365 print("Max. number of atoms/CPU: ", max_atoms)
366 print("Min. number of atoms/CPU: ", min_atoms)
367 print("Average number of atoms/CPU:", ave_atoms)
368 print(" standard deviation: %5.1f" % stddev_atoms)
371if not debug and not GPAW_NO_C_EXTENSION:
372 hartree = cgpaw.hartree # noqa
373 pack_density = cgpaw.pack
376def unlink(path: Union[str, Path], world=None):
377 """Safely unlink path (delete file or symbolic link)."""
379 if isinstance(path, str):
380 path = Path(path)
381 if world is None:
382 world = mpi.world
384 # Remove file:
385 if world.rank == 0:
386 try:
387 path.unlink()
388 except FileNotFoundError:
389 pass
390 else:
391 while path.is_file():
392 time.sleep(1.0)
393 world.barrier()
396@contextmanager
397def file_barrier(path: Union[str, Path], world=None):
398 """Context manager for writing a file.
400 After the with-block all cores will be able to read the file.
402 >>> with file_barrier('something.txt'):
403 ... result = 2 + 2
404 ... Path('something.txt').write_text(f'{result}') # doctest: +SKIP
406 This will remove the file, write the file and wait for the file.
407 """
409 if isinstance(path, str):
410 path = Path(path)
411 if world is None:
412 world = mpi.world
414 # Remove file:
415 unlink(path, world)
417 yield
419 # Wait for file:
420 while not path.is_file():
421 time.sleep(1.0)
422 world.barrier()
425class _NullIO(io.BufferedIOBase):
426 # Implement as few methods as possible in order to be the target of
427 # TextIOWrapper. Python docs are not very specific.
428 def writable(self):
429 return True
431 def flush(self):
432 pass
435devnull = io.TextIOWrapper(_NullIO()) # type: ignore
438def convert_string_to_fd(name, world=None):
439 """Create a file-descriptor for text output.
441 Will open a file for writing with given name. Use None for no output and
442 '-' for sys.stdout.
443 """
444 if world is None:
445 from ase.parallel import world
446 if name is None or world.rank != 0:
447 return open(os.devnull, 'w')
448 if name == '-':
449 return sys.stdout
450 if isinstance(name, (str, Path)):
451 return open(name, 'w')
452 return name # we assume name is already a file-descriptor
455_complex_float = {
456 np.float32: np.complex64,
457 np.float64: np.complex128,
458 np.complex64: np.complex64,
459 np.complex128: np.complex128,
460 float: complex,
461 complex: complex}
463_real_float = {
464 np.complex64: np.float32,
465 np.complex128: np.float64,
466 np.float32: np.float32,
467 np.float64: np.float64,
468 complex: float,
469 float: float}
472def as_complex_dtype(dtype: DTypeLike) -> np.dtype:
473 """Convert dtype to complex dtype.
475 >>> [as_complex_dtype(dt) for dt in
476 ... [np.float32, np.float64, complex]]
477 [dtype('complex64'), dtype('complex128'), dtype('complex128')]
478 """
479 return np.dtype(_complex_float[np.dtype(dtype).type])
482def as_real_dtype(dtype: DTypeLike) -> np.dtype:
483 """Convert dtype to real dtype.
485 >>> [as_real_dtype(dt) for dt in
486 ... [np.float32, np.float64, complex]]
487 [dtype('float32'), dtype('float64'), dtype('float64')]
488 """
489 return np.dtype(_real_float[np.dtype(dtype).type])