Coverage for gpaw/new/xc.py: 58%

284 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-14 00:18 +0000

1"""""" 

2from __future__ import annotations 

3 

4from typing import Callable 

5 

6import numpy as np 

7 

8from gpaw.core import UGArray, UGDesc 

9from gpaw.gpu import einsum 

10from gpaw.new import zips 

11from gpaw.new.c import (add_to_density, add_to_density_gpu, evaluate_lda_gpu, 

12 evaluate_pbe_gpu) 

13from gpaw.new.ibzwfs import IBZWaveFunctions 

14from gpaw.new.pwfd.wave_functions import PWFDWaveFunctions 

15from gpaw.typing import Array2D 

16from gpaw.xc import XC 

17from gpaw.xc.functional import XCFunctional as OldXCFunctional 

18from gpaw.xc.gga import add_gradient_correction 

19from gpaw.xc.libvdwxc import VDWXC 

20from gpaw.xc.mgga import MGGA 

21from gpaw.xc.vdw import VDWFunctionalBase 

22from gpaw.hybrids import HybridXC 

23 

24 

25def create_functional(xc: OldXCFunctional | str | dict, 

26 grid: UGDesc, 

27 xp=np) -> Functional: 

28 exx_fraction = 0.0 

29 exx_omega = 0.0 

30 exx_yukawa = False 

31 if isinstance(xc, (str, dict)): 

32 xc = XC(xc) 

33 

34 if xc.type == 'HYB': 

35 assert isinstance(xc, HybridXC) 

36 exx_fraction = xc.exx_fraction 

37 exx_omega = xc.omega 

38 exx_yukawa = xc.yukawa 

39 xc = xc.xc 

40 

41 if xc.type == 'LDA': 

42 functional = LDAFunctional(xc, grid, xp) 

43 elif xc.type == 'GGA': 

44 functional = GGAFunctional(xc, grid, xp) 

45 elif xc.type == 'MGGA': 

46 functional = MGGAFunctional(xc, grid) 

47 else: 

48 raise ValueError(f'{xc.type} not supported') 

49 

50 functional.exx_fraction = exx_fraction 

51 functional.exx_omega = exx_omega 

52 functional.exx_yukawa = exx_yukawa 

53 

54 return functional 

55 

56 

57class Functional: 

58 def __init__(self, 

59 xc: OldXCFunctional, 

60 grid: UGDesc, 

61 xp=np): 

62 self.xc = xc 

63 self.grid = grid 

64 self.xp = xp 

65 self.setup_name = self.xc.get_setup_name() 

66 self.name = self.xc.name 

67 self.type = self.xc.type 

68 self.xc.xp = xp 

69 self.xc.set_grid_descriptor(grid._gd) 

70 self.exx_fraction = 0.0 

71 self.exx_omega = 0.0 

72 self.exx_yukawa = False 

73 self.energies: dict[str, float] = {} 

74 

75 def __str__(self): 

76 return f'name: {self.xc.get_description()}' 

77 

78 def calculate(self, 

79 nt_sr: UGArray, 

80 taut_sr: UGArray | None = None) -> tuple[float, 

81 UGArray, 

82 UGArray | None]: 

83 raise NotImplementedError 

84 

85 def calculate_paw_correction(self, setup, d, h=None): 

86 return self.xc.calculate_paw_correction(setup, d, h) 

87 

88 def get_setup_name(self) -> str: 

89 return self.name 

90 

91 def stress_contribution(self, 

92 ibzwfs, density, 

93 interpolate: Callable[[UGArray], UGArray] 

94 ) -> Array2D: 

95 args, kwargs = self._args(ibzwfs, density, interpolate) 

96 if ibzwfs.kpt_band_comm.rank == 0: 

97 if self.xp is np: 

98 if isinstance(self.xc, VDWXC): 

99 xck = self.xc.semilocal_xc.kernel 

100 xck.calculate(*[array.data for array in args]) 

101 wrapper = self.xc.redist_wrapper 

102 stress_vv = wrapper.calculate(args[1].data, args[3].data, 

103 args[2].data, args[4].data, 

104 get_stress=True)[1] 

105 return stress_vv + self._stress(*args, **kwargs) 

106 else: 

107 self.xc.kernel.calculate(*[array.data for array in args]) 

108 # Special GPU cases: 

109 elif self.name == 'LDA': 

110 e_r, nt_sr, vt_sr = args 

111 evaluate_lda_gpu(nt_sr.data, vt_sr.data, e_r.data) 

112 elif self.name == 'PBE': 

113 e_r, nt_sr, vt_sr, sigma_xr, dedsigma_xr = args 

114 evaluate_pbe_gpu(nt_sr.data, vt_sr.data, e_r.data, 

115 sigma_xr.data, dedsigma_xr.data) 

116 else: 

117 1 / 0 

118 return self._stress(*args, **kwargs) 

119 return self.xp.zeros((3, 3)) 

120 

121 def _args(self, 

122 ibzwfs, 

123 density, 

124 interpolate: Callable[[UGArray], UGArray] 

125 ) -> tuple[tuple[UGArray, ...], dict]: 

126 raise NotImplementedError 

127 

128 def _stress(self, *args: UGArray, **kwargs) -> Array2D: 

129 raise NotImplementedError 

130 

131 

132class LDAFunctional(Functional): 

133 def calculate(self, 

134 nt_sr: UGArray, 

135 taut_sr: UGArray | None = None) -> tuple[float, 

136 UGArray, 

137 UGArray | None]: 

138 vxct_sr = nt_sr.new() 

139 vxct_sr.data[:] = 0.0 

140 e_r = nt_sr.desc.empty(xp=self.xp) 

141 if self.xp is np: 

142 self.xc.kernel.calculate(e_r.data, nt_sr.data, vxct_sr.data) 

143 else: 

144 if self.name != 'LDA': 

145 raise ValueError(f'{self.name} not supported on GPU') 

146 evaluate_lda_gpu(nt_sr.data, vxct_sr.data, e_r.data) 

147 exc = e_r.integrate() 

148 return exc, vxct_sr, None 

149 

150 def _args(self, ibzwfs, density, interpolate): 

151 if ibzwfs.kpt_band_comm.rank != 0: 

152 return (), {} 

153 nt_sR = density.nt_sR 

154 e_r = self.grid.empty(xp=self.xp) 

155 nt_sr = interpolate(nt_sR) 

156 vt_sr = nt_sr.new(zeroed=True) 

157 return (e_r, nt_sr, vt_sr), {} 

158 

159 def _stress(self, # type: ignore 

160 e_r: UGArray, 

161 nt_sr: UGArray, 

162 vt_sr: UGArray) -> Array2D: 

163 P = e_r.integrate(skip_sum=True) 

164 for vt_r, nt_r in zip(vt_sr, nt_sr): 

165 P -= vt_r.integrate(nt_r, skip_sum=True) 

166 return float(P) * self.xp.eye(3) 

167 

168 

169class GGAFunctional(LDAFunctional): 

170 def __init__(self, 

171 xc: OldXCFunctional, 

172 grid: UGDesc, 

173 xp=np): 

174 super().__init__(xc, grid, xp) 

175 # xc already has Gradient.apply bound methods!!! 

176 self.grad_v = [grad.__self__ for grad in xc.grad_v] # type: ignore 

177 

178 def _evaluate_xc_cpu(self, args): 

179 if 'vdW' not in self.name: 

180 self.xc.kernel.calculate(*args) 

181 elif isinstance(self.xc, VDWXC): 

182 libvdwxc = self.xc.libvdwxc 

183 nspins = args[1].shape[0] 

184 if libvdwxc is None or self.xc._nspins != nspins: 

185 self.xc._nspins = nspins 

186 self.xc.initialize_backend(self.xc.gd) 

187 self.xc.semilocal_xc.kernel.calculate(*args) 

188 self.xc.calculate_vdw(*args[:5]) 

189 else: 

190 assert isinstance(self.xc, VDWFunctionalBase) 

191 self.xc.kernel.calculate(*args) 

192 self.xc.calculate_correlation(*args[:5]) 

193 

194 def calculate(self, 

195 nt_sr: UGArray, 

196 taut_sr: UGArray | None = None) -> tuple[float, 

197 UGArray, 

198 UGArray | None]: 

199 gradn_svr, sigma_xr = gradient_and_sigma(self.grad_v, nt_sr) 

200 

201 vxct_sr = nt_sr.new(zeroed=True) 

202 dedsigma_xr = sigma_xr.new() 

203 e_r = self.grid.empty(xp=self.xp) 

204 

205 if self.xp is np: 

206 args = [a.data 

207 for a in [e_r, nt_sr, vxct_sr, sigma_xr, dedsigma_xr]] 

208 self._evaluate_xc_cpu(args) 

209 else: 

210 if self.name != 'PBE': 

211 raise ValueError(f'{self.name} not supported on GPU') 

212 evaluate_pbe_gpu(nt_sr.data, vxct_sr.data, e_r.data, 

213 sigma_xr.data, dedsigma_xr.data) 

214 

215 add_gradient_correction([grad.apply for grad in self.grad_v], 

216 gradn_svr.data, sigma_xr.data, 

217 dedsigma_xr.data, vxct_sr.data) 

218 exc = e_r.integrate() 

219 return exc, vxct_sr, None 

220 

221 def _args(self, 

222 ibzwfs, 

223 density, 

224 interpolate: Callable[[UGArray], UGArray] 

225 ) -> tuple[tuple[UGArray, ...], dict]: 

226 args, kwargs = LDAFunctional._args(self, ibzwfs, density, interpolate) 

227 if args: 

228 e_r, nt_sr, vt_sr = args 

229 gradn_svr, sigma_xr = gradient_and_sigma(self.grad_v, nt_sr) 

230 dedsigma_xr = sigma_xr.new() 

231 args += (sigma_xr, dedsigma_xr) 

232 kwargs = {'gradn_svr': gradn_svr} 

233 return args, kwargs 

234 

235 def _stress(self, # type: ignore 

236 e_r, nt_sr, vt_sr, sigma_xr, dedsigma_xr, 

237 gradn_svr, 

238 ) -> Array2D: 

239 stress_vv = LDAFunctional._stress(self, e_r, nt_sr, vt_sr) 

240 P = 0.0 

241 for sigma_r, dedsigma_r in zip(sigma_xr, dedsigma_xr): 

242 P -= 2 * sigma_r.integrate(dedsigma_r, skip_sum=True) 

243 stress_vv += float(P) * self.xp.eye(3) 

244 

245 nspins = len(nt_sr) 

246 for v1 in range(3): 

247 for v2 in range(3): 

248 stress_vv[v1, v2] -= 2 * dedsigma_xr[0].integrate( 

249 gradn_svr[0, v1] * gradn_svr[0, v2], skip_sum=True) 

250 if nspins == 2: 

251 stress_vv[v1, v2] -= 2 * dedsigma_xr[1].integrate( 

252 gradn_svr[0, v1] * gradn_svr[1, v2], skip_sum=True) 

253 stress_vv[v1, v2] -= 2 * dedsigma_xr[2].integrate( 

254 gradn_svr[1, v1] * gradn_svr[1, v2], skip_sum=True) 

255 

256 return stress_vv 

257 

258 

259def gradient_and_sigma(grad_v, n_sr: UGArray) -> tuple[UGArray, UGArray]: 

260 """Calculate gradient of density and sigma. 

261 

262 Returns::: 

263 

264 _ _ 

265 ∇ n (r) 

266 s 

267 

268 and::: 

269 

270 _ _ 2 _ _ _ 2 

271 σ (r) = |∇n |, σ = ∇n . ∇n , σ = |∇n | 

272 0 0 1 0 1 2 1 

273 """ 

274 nspins = n_sr.dims[0] 

275 xp = n_sr.xp 

276 

277 gradn_svr = n_sr.desc.empty((nspins, 3), xp=xp) 

278 for v, grad in enumerate(grad_v): 

279 for s in range(nspins): 

280 grad(n_sr[s], gradn_svr[s, v]) 

281 

282 sigma_xr = n_sr.desc.empty(nspins * 2 - 1, xp=xp) 

283 dot_product(gradn_svr[0], None, sigma_xr[0]) 

284 if nspins == 2: 

285 dot_product(gradn_svr[0], gradn_svr[1], sigma_xr[1]) 

286 dot_product(gradn_svr[1], None, sigma_xr[2]) 

287 

288 return gradn_svr, sigma_xr 

289 

290 

291def dot_product(a_vr, b_vr, out_r): 

292 xp = a_vr.xp 

293 if b_vr is None: 

294 out_r.data[:] = 0.0 

295 if xp is np: 

296 for a_r in a_vr.data: 

297 add_to_density(1.0, a_r, out_r.data) 

298 else: 

299 add_to_density_gpu(xp.ones(3), a_vr.data, out_r.data) 

300 else: 

301 einsum('vabc, vabc -> abc', a_vr.data, b_vr.data, out=out_r.data) 

302 

303 

304class MGGAFunctional(GGAFunctional): 

305 def get_setup_name(self): 

306 return 'PBE' 

307 

308 def calculate(self, 

309 nt_sr: UGArray, 

310 taut_sr: UGArray | None = None) -> tuple[float, 

311 UGArray, 

312 UGArray | None]: 

313 gradn_svr, sigma_xr = gradient_and_sigma(self.grad_v, nt_sr) 

314 if isinstance(self.xc, VDWXC): 

315 assert isinstance(self.xc.semilocal_xc, MGGA), self.xc.semilocal_xc 

316 else: 

317 assert isinstance(self.xc, MGGA), self.xc 

318 e_r = self.grid.empty() 

319 if taut_sr is None: 

320 taut_sr = nt_sr.new(zeroed=True) 

321 dedtaut_sr = taut_sr.new() 

322 vxct_sr = taut_sr.new() 

323 vxct_sr.data[:] = 0.0 

324 dedsigma_xr = sigma_xr.new() 

325 args = [e_r, nt_sr, vxct_sr, sigma_xr, dedsigma_xr, 

326 taut_sr, dedtaut_sr] 

327 args = [array.data for array in args] 

328 self._evaluate_xc_cpu(args) 

329 add_gradient_correction([grad.apply for grad in self.grad_v], 

330 gradn_svr.data, sigma_xr.data, 

331 dedsigma_xr.data, vxct_sr.data) 

332 return e_r.integrate(), vxct_sr, dedtaut_sr 

333 

334 def _args(self, 

335 ibzwfs, 

336 density, 

337 interpolate: Callable[[UGArray], UGArray]): 

338 args, kwargs = GGAFunctional._args(self, ibzwfs, density, interpolate) 

339 taut_swR = _taut(ibzwfs, density.nt_sR.desc) 

340 

341 if not args: 

342 return (), {} 

343 

344 e_r, nt_sr, vt_sr, sigma_xr, dedsigma_xr = args 

345 taut_sR = density.taut_sR 

346 assert taut_sR is not None 

347 taut_sr = interpolate(taut_sR) 

348 dedtaut_sr = taut_sr.new() 

349 args += (taut_sr, dedtaut_sr) 

350 kwargs['taut_swR'] = taut_swR 

351 kwargs['interpolate'] = interpolate 

352 

353 return args, kwargs 

354 

355 def _stress(self, # type: ignore 

356 e_r, 

357 nt_sr, vt_sr, 

358 sigma_xr, dedsigma_xr, 

359 taut_sr, dedtaut_sr, 

360 gradn_svr, 

361 taut_swR, 

362 interpolate 

363 ) -> Array2D: # type: ignore 

364 stress_vv = GGAFunctional._stress( 

365 self, e_r, nt_sr, vt_sr, sigma_xr, dedsigma_xr, 

366 gradn_svr=gradn_svr) 

367 for taut_wR, dedtaut_r in zips(taut_swR, dedtaut_sr): 

368 w = 0 

369 for v1 in range(3): 

370 for v2 in range(v1, 3): 

371 taut_r = interpolate(taut_wR[w]) 

372 s = taut_r.integrate(dedtaut_r, skip_sum=True) 

373 stress_vv[v1, v2] -= s 

374 if v2 != v1: 

375 stress_vv[v2, v1] -= s 

376 w += 1 

377 

378 P = 0.0 

379 for taut_r, dedtaut_r in zip(taut_sr, dedtaut_sr): 

380 P -= taut_r.integrate(dedtaut_r, skip_sum=True) 

381 stress_vv += P * np.eye(3) 

382 

383 return stress_vv 

384 

385 

386def _taut(ibzwfs: IBZWaveFunctions, grid: UGDesc) -> UGArray | None: 

387 """Calculate upper half of symmetric ked tensor. 

388 

389 Returns ``taut_swR=taut_svvR``. Mapping from ``w`` to ``vv``:: 

390 

391 0 1 2 

392 . 3 4 

393 . . 5 

394 

395 Only cores with ``kpt_comm.rank==0`` and ``band_comm.rank==0`` 

396 return the uniform grids - other cores return None. 

397 """ 

398 # "1" refers to undistributed arrays 

399 dpsit1_vR = grid.new(comm=None, dtype=ibzwfs.dtype).empty(3) 

400 taut1_swR = grid.new(comm=None).zeros((ibzwfs.nspins, 6)) 

401 assert isinstance(taut1_swR, UGArray) # Argggghhh! 

402 domain_comm = grid.comm 

403 

404 for wfs in ibzwfs: 

405 assert isinstance(wfs, PWFDWaveFunctions) 

406 psit_nG = wfs.psit_nX 

407 pw = psit_nG.desc 

408 

409 pw1 = pw.new(comm=None) 

410 psit1_G = pw1.empty() 

411 iGpsit1_G = pw1.empty() 

412 Gplusk1_Gv = pw1.reciprocal_vectors() 

413 

414 occ_n = wfs.weight * wfs.spin_degeneracy * wfs.myocc_n 

415 

416 (N,) = psit_nG.mydims 

417 for n1 in range(0, N, domain_comm.size): 

418 n2 = min(n1 + domain_comm.size, N) 

419 psit_nG[n1:n2].gather_all(psit1_G) 

420 n = n1 + domain_comm.rank 

421 if n >= N: 

422 continue 

423 f = occ_n[n] 

424 if f == 0.0: 

425 continue 

426 for Gplusk1_G, dpsit1_R in zips(Gplusk1_Gv.T, dpsit1_vR): 

427 iGpsit1_G.data[:] = psit1_G.data 

428 iGpsit1_G.data *= 1j * Gplusk1_G 

429 iGpsit1_G.ifft(out=dpsit1_R) 

430 w = 0 

431 for v1 in range(3): 

432 for v2 in range(v1, 3): 

433 taut1_swR[wfs.spin, w].data += ( 

434 f * (dpsit1_vR[v1].data.conj() * 

435 dpsit1_vR[v2].data).real) 

436 w += 1 

437 

438 ibzwfs.kpt_comm.sum(taut1_swR.data, 0) 

439 if ibzwfs.kpt_comm.rank == 0: 

440 ibzwfs.band_comm.sum(taut1_swR.data, 0) 

441 if ibzwfs.band_comm.rank == 0: 

442 domain_comm.sum(taut1_swR.data, 0) 

443 if domain_comm.rank == 0: 

444 symmetries = ibzwfs.ibz.symmetries 

445 taut1_swR.symmetrize(symmetries.rotation_scc, 

446 symmetries.translation_sc) 

447 taut_swR = grid.empty((ibzwfs.nspins, 6)) 

448 taut_swR.scatter_from(taut1_swR) 

449 return taut_swR 

450 return None