Coverage for gpaw/utilities/grid_redistribute.py: 71% (381 statements)

import itertools
import numpy as np
from gpaw.grid_descriptor import GridDescriptor


class AlignedGridRedistributor:
    """Perform redistributions between two grids.

    See the redistribute function."""
    def __init__(self, gd, distribute_dir, reduce_dir):
        self.gd = gd
        self.distribute_dir = distribute_dir
        self.reduce_dir = reduce_dir
        self.gd2 = get_compatible_grid_descriptor(gd, distribute_dir,
                                                  reduce_dir)

    def _redist(self, src, op):
        return redistribute(self.gd, self.gd2, src, self.distribute_dir,
                            self.reduce_dir, operation=op)

    def forth(self, src):
        return self._redist(src, 'forth')

    def back(self, src):
        return self._redist(src, 'back')
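

# A minimal usage sketch, not part of the original module.  It assumes a
# hypothetical MPI world of 4 ranks and a 1 x 2 x 2 domain decomposition:
#
#     gd = GridDescriptor(N_c=[8, 8, 8], cell_cv=0.2 * np.array([8, 8, 8]),
#                         pbc_c=(1, 1, 1), parsize_c=(1, 2, 2))
#     redist = AlignedGridRedistributor(gd, distribute_dir=1, reduce_dir=2)
#     work = redist.forth(gd.zeros())  # decomposition becomes 1 x 4 x 1
#     orig = redist.back(work)         # back to the 1 x 2 x 2 layout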


def redistribute(gd, gd2, src, distribute_dir, reduce_dir, operation='forth'):
    """Perform certain simple redistributions between two grid descriptors.

    Redistribute src from gd with decomposition X x Y x Z to gd2 with
    decomposition X x YZ x 1, or some variation of this.  We say that
    we "reduce" along Z while we "distribute" along Y.  The
    redistribution is one-to-one.

                ____________                       ____________
       i       /     /     /|        r            /  /  /  /  /|
       n      /_____/_____/ |        i           /  /  /  /  / |
       d     /     /     /| |        d          /  /  /  /  /  |
       e    /_____/_____/ | | j       forth    /__/__/__/__/  j|
       p    |     |     | |/|        e ----->  |  |  |  |  |  /|
       e    |     |     | / |        c <-----  |  |  |  |  | / |
       n    |_____|_____|/| | j      u back    |__|__|__|__|/ j|
       d    |     |     | |/         d         |  |  |  |  |  /
       e    |     |     | /          e         |  |  |  |  | /
       n    |_____|_____|/           r         |__|__|__|__|/
       t
                 d i s t r i b u t e   d i r

    Directions are specified as 0, 1, or 2.  gd2 must be serial along
    the axis of reduction and must parallelize enough over the
    distribution axis to match the size of gd.comm.

    Returns the redistributed array, which is compatible with gd2.

    Note: The communicator of gd2 must in general be a special
    permutation of that of gd in order for the redistribution axes to
    align with domain rank assignment.  Use the helper function
    get_compatible_grid_descriptor to obtain a grid descriptor which
    uses a compatible communicator."""

    assert reduce_dir != distribute_dir
    assert gd.comm.size == gd2.comm.size
    # Actually: the two communicators should be equal!
    for c in [reduce_dir, distribute_dir]:
        assert 0 <= c < 3

    # Determine the direction in which nothing happens.
    for c in range(3):
        if c != reduce_dir and c != distribute_dir:
            independent_dir = c
            break
    assert np.all(gd.N_c == gd2.N_c)
    assert np.all(gd.pbc_c == gd2.pbc_c)
    assert gd.n_c[independent_dir] == gd2.n_c[independent_dir]
    assert gd.parsize_c[independent_dir] == gd2.parsize_c[independent_dir]
    assert gd2.parsize_c[reduce_dir] == 1
    assert gd2.parsize_c[distribute_dir] == gd.parsize_c[reduce_dir] \
        * gd.parsize_c[distribute_dir]
    assert operation in ('forth', 'back')
    forward = (operation == 'forth')
    if forward:
        assert np.all(src.shape == gd.n_c)
    else:
        assert np.all(src.shape == gd2.n_c)
    assert gd.comm.compare(gd2.comm) != 'unequal'

    # We want this to work no matter which directions are chosen as the
    # distribute and reduce directions.  But that is tricky to code.
    # So we use a standard order of the three directions.
    #
    # Thus we have to always transpose the src/dst arrays consistently
    # when interacting with the contiguous MPI send/recv buffers.  An
    # alternative is to use np.take, but that sometimes produces
    # copies where slicing does not, and we want to write back into
    # slices.
    #
    # We only support some of the possible redistributions though...
    dirs = (independent_dir, distribute_dir, reduce_dir)
    src = src.transpose(*dirs)

    # Construct a communicator consisting of all those processes that
    # participate in domain decomposition along the reduction
    # direction.
    #
    # All necessary communication can be done within that
    # subcommunicator using MPI alltoallv.
    #
    # We also construct the "same" communicator from gd2.comm, but with
    # the sole purpose of testing that the ranks are consistent between
    # the two, i.e. that peer_comm encompasses the same physical
    # processes on the communicators of the two grid descriptors.  If
    # they are not, the two grid descriptors are incompatible.
    pos_c = gd.parpos_c.copy()
    pos2_c = gd2.parpos_c.copy()
    positions2_offset = pos_c[distribute_dir] * gd.parsize_c[reduce_dir]
    peer_ranks = []
    peer_ranks2 = []
    for i in range(gd.parsize_c[reduce_dir]):
        pos_c[reduce_dir] = i
        pos2_c[distribute_dir] = i + positions2_offset
        peer_ranks.append(gd.get_rank_from_processor_position(pos_c))
        peer_ranks2.append(gd2.get_rank_from_processor_position(pos2_c))
    peer_comm = gd.comm.new_communicator(peer_ranks)
    test_peer_comm2 = gd2.comm.new_communicator(peer_ranks2)
    if test_peer_comm2.compare(peer_comm) != 'congruent':
        raise ValueError('Grids are not compatible. '
                         'Use get_compatible_grid_descriptor to construct '
                         'a compatible grid.')

    members = peer_comm.get_members()

    mynpts1_rdir = gd.n_c[reduce_dir]
    mynpts2_ddir = gd2.n_c[distribute_dir]
    mynpts_idir = gd.n_c[independent_dir]
    assert mynpts_idir == gd2.n_c[independent_dir]

    offsets1_rdir_p = gd.n_cp[reduce_dir]
    offsets2_ddir_p = gd2.n_cp[distribute_dir]

    npts1_rdir_p = offsets1_rdir_p[1:] - offsets1_rdir_p[:-1]
    npts2_ddir_p = offsets2_ddir_p[1:] - offsets2_ddir_p[:-1]

    # We have the send buffer, and it is contiguous.  But the parts
    # that we are going to send to each CPU are not contiguous!  We
    # need to loop over all the little chunks that we want to send,
    # and put them into a contiguous buffer for MPI.  Moreover, the
    # received data will unsurprisingly arrive in that very same
    # order.  Therefore, we need to know how to unpack those data and
    # put them into the return array too.
    #
    # The following loop builds the send buffer, and manages the logic
    # for the receive buffer.  However, since we have not yet received
    # the data, we obviously cannot copy anything out of the receive
    # buffer.  Therefore we create a list of ChunkCopiers that contain
    # all the information they need to later copy things into the
    # appropriate places of the return array.

    if forward:
        dst = gd2.zeros(dtype=src.dtype)
    else:
        dst = gd.zeros(dtype=src.dtype)
    recvbuf = np.empty(dst.size, dtype=src.dtype)
    # Sentinel values to make it obvious during debugging if something
    # is not overwritten as expected:
    dst[:] = -2
    recvbuf[:] = -3

    sendchunks = []
    recvchunks = []
    recv_chunk_copiers = []

    class ChunkCopier:
        def __init__(self, src_chunk, dst_chunk):
            self.src_chunk = src_chunk
            self.dst_chunk = dst_chunk

        def copy(self):
            self.dst_chunk.flat[:] = self.src_chunk

    # Translate the ranks of gd.comm into the corresponding ranks on
    # gd2.comm:
    ranks1to2 = gd.comm.translate_ranks(gd2.comm, np.arange(gd.comm.size))
    assert (ranks1to2 != -1).all()

    recvchunk_start = 0
    for i in range(peer_comm.size):
        parent_rank = members[i]
        parent_rank2 = ranks1to2[parent_rank]

        parent_coord1 = \
            gd.get_processor_position_from_rank(parent_rank)[reduce_dir]
        parent_coord2 = \
            gd2.get_processor_position_from_rank(parent_rank2)[distribute_dir]

        # Warning: Many sendXXX and recvXXX variables are badly named,
        # because they change roles when operation='back'.
        sendstart_ddir = offsets2_ddir_p[parent_coord2] \
            - gd.beg_c[distribute_dir]
        sendstop_ddir = sendstart_ddir + npts2_ddir_p[parent_coord2]
        sendnpts_ddir = sendstop_ddir - sendstart_ddir

        # Compensate for the infinitely annoying convention that
        # enumeration of points starts at 1 in non-periodic directions.
        #
        # Also, if we want to handle more general redistributions, the
        # below buffers must have something subtracted to get a proper
        # local index.
        recvstart_rdir = offsets1_rdir_p[parent_coord1] \
            - 1 + gd.pbc_c[reduce_dir]
        recvstop_rdir = recvstart_rdir + npts1_rdir_p[parent_coord1]
        recvnpts_rdir = recvstop_rdir - recvstart_rdir

        # Grab the subarray that is going to be sent to process i.
        if forward:
            assert 0 <= sendstart_ddir
            assert sendstop_ddir <= src.shape[1]
            sendchunk = src[:, sendstart_ddir:sendstop_ddir, :]
            assert (sendchunk.size ==
                    mynpts1_rdir * sendnpts_ddir * mynpts_idir), (
                sendchunk.shape, (mynpts_idir, sendnpts_ddir, mynpts1_rdir))
        else:
            sendchunk = src[:, :, recvstart_rdir:recvstop_rdir]
            assert sendchunk.size == recvnpts_rdir * mynpts2_ddir * mynpts_idir
        sendchunks.append(sendchunk)

        if forward:
            recvchunksize = recvnpts_rdir * mynpts2_ddir * mynpts_idir
        else:
            recvchunksize = mynpts1_rdir * sendnpts_ddir * mynpts_idir
        recvchunk = recvbuf[recvchunk_start:recvchunk_start + recvchunksize]
        recvchunks.append(recvchunk)
        recvchunk_start += recvchunksize

        if forward:
            dstchunk = dst.transpose(*dirs)[:, :, recvstart_rdir:recvstop_rdir]
        else:
            dstchunk = dst.transpose(*dirs)[:, sendstart_ddir:sendstop_ddir, :]
        copier = ChunkCopier(recvchunk, dstchunk)
        recv_chunk_copiers.append(copier)

    sendcounts = np.array([chunk.size for chunk in sendchunks], dtype=int)
    recvcounts = np.array([chunk.size for chunk in recvchunks], dtype=int)

    assert sendcounts.sum() == src.size
    assert recvcounts.sum() == dst.size
    senddispls = np.array([0] + list(np.cumsum(sendcounts))[:-1], dtype=int)
    recvdispls = np.array([0] + list(np.cumsum(recvcounts))[:-1], dtype=int)

    sendbuf = np.concatenate([sendchunk.ravel() for sendchunk in sendchunks])

    peer_comm.alltoallv(sendbuf, sendcounts, senddispls,
                        recvbuf, recvcounts, recvdispls)

    # Copy contiguous blocks of the receive buffer back into the
    # precomputed slices:
    for chunk_copier in recv_chunk_copiers:
        chunk_copier.copy()
    return dst
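

# A short sketch of calling redistribute directly, not from the original
# module (hypothetical 4-rank world, decomposition 1 x 2 x 2, reducing
# along axis 2 and distributing along axis 1):
#
#     gd = GridDescriptor(N_c=[8, 8, 8], cell_cv=0.2 * np.array([8, 8, 8]),
#                         parsize_c=(1, 2, 2))
#     gd2 = get_compatible_grid_descriptor(gd, distribute_dir=1, reduce_dir=2)
#     b = redistribute(gd, gd2, gd.zeros(), distribute_dir=1, reduce_dir=2,
#                      operation='forth')  # b is decomposed 1 x 4 x 1 on gd2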


def get_compatible_grid_descriptor(gd, distribute_dir, reduce_dir):
    parsize2_c = list(gd.parsize_c)
    parsize2_c[reduce_dir] = 1
    parsize2_c[distribute_dir] = gd.parsize_c[reduce_dir] \
        * gd.parsize_c[distribute_dir]

    # Because of the way in which domains are assigned to ranks, some
    # redistributions cannot be represented on any grid descriptor
    # that uses the same communicator.  However, we can create a
    # different one which assigns ranks in a manner corresponding to
    # a permutation of the axes, and such a compatible communicator
    # always exists.

    # There are probably two: a left-handed and a right-handed one
    # (i.e., a positive or negative permutation of the axes).  It
    # would probably be logical to always choose a right-handed one.
    # Right now the numbers correspond to whatever was first made to
    # work, though!
    t = {(0, 1): (0, 1, 2),
         (0, 2): (0, 2, 1),
         (1, 0): (1, 0, 2),
         (1, 2): (0, 1, 2),
         (2, 1): (0, 2, 1),
         (2, 0): (1, 2, 0)}[(distribute_dir, reduce_dir)]

    ranks = np.arange(gd.comm.size).reshape(gd.parsize_c).transpose(*t).ravel()
    comm2 = gd.comm.new_communicator(ranks)
    gd2 = gd.new_descriptor(comm=comm2, parsize_c=parsize2_c)
    return gd2
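

# For illustration (hypothetical numbers, not from the original module):
# with a 2 x 2 x 2 decomposition and (distribute_dir, reduce_dir) = (2, 0),
# the permutation t = (1, 2, 0) reorders the eight ranks as
#
#     >>> np.arange(8).reshape(2, 2, 2).transpose(1, 2, 0).ravel()
#     array([0, 4, 1, 5, 2, 6, 3, 7])
#
# so ranks that were neighbors along the reduction axis become neighbors
# along the distribution axis of the new descriptor.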


class Domains:
    def __init__(self, domains_cp):
        self.domains_cp = domains_cp
        self.parsize_c = tuple(len(domains_cp[c]) - 1 for c in range(3))

    def get_global_shape(self):
        return tuple(self.domains_cp[c][-1]
                     - self.domains_cp[c][0] for c in range(3))

    def get_offset(self, parpos_c):
        offset_c = [self.domains_cp[c][parpos_c[c]] for c in range(3)]
        return np.array(offset_c)

    def get_box(self, parpos_c):
        parpos_c = np.array(parpos_c)
        offset_c = self.get_offset(parpos_c)
        nextoffset_c = self.get_offset(parpos_c + 1)
        return offset_c, nextoffset_c - offset_c

    def as_serial(self):
        return Domains([[self.domains_cp[c][0], self.domains_cp[c][-1]]
                        for c in range(3)])
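

# For illustration (hypothetical numbers): Domains([[0, 1], [0, 2, 4],
# [0, 3]]) describes a 1 x 2 x 1 decomposition of a 1 x 4 x 3 grid.
# Then get_box((0, 1, 0)) returns (array([0, 2, 0]), array([1, 2, 3])),
# i.e. the global offset and size of that domain, and as_serial() yields
# the single-domain decomposition Domains([[0, 1], [0, 4], [0, 3]]).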


def random_subcomm(comm, gen, size):
    allranks = np.arange(comm.size)
    wranks = gen.choice(allranks, size=size, replace=False)
    subcomm = comm.new_communicator(wranks)
    return wranks, subcomm


# For testing:
class RandomDistribution:
    def __init__(self, comm, domains, gen):
        self.world = comm
        self.domains = domains
        size = np.prod(domains.parsize_c)
        wranks, subcomm = random_subcomm(comm, gen, size)
        self.comm = subcomm

        # We really need to know this information globally, even where
        # subcomm is None.
        comm.broadcast(wranks, 0)
        self.wranks = wranks
        self.subranks = {}
        for i, wrank in enumerate(wranks):
            self.subranks[wrank] = i

        domain_order = np.arange(size)
        gen.shuffle(domain_order)
        self.ranks3d = domain_order.reshape(domains.parsize_c)

        coords_iter = itertools.product(*[range(domains.parsize_c[c])
                                          for c in range(3)])
        self.coords_by_rank = list(coords_iter)

    def parpos2rank(self, parpos_c):
        return self.wranks[self.ranks3d[parpos_c]]

    def rank2parpos(self, rank):
        subrank = self.subranks.get(rank, None)
        if subrank is None:
            return None
        return self.coords_by_rank[subrank]

    def get_test_array(self, dtype=int):
        if self.comm is None:
            return np.zeros((0, 0, 0), dtype=dtype)

        parpos_c = self.rank2parpos(self.world.rank)
        offset_c, size_c = self.domains.get_box(parpos_c)
        arr = np.empty(size_c, dtype=dtype)
        arr[:] = self.comm.rank
        return arr


def general_redistribute(comm, domains1, domains2, rank2parpos1, rank2parpos2,
                         src_xg, dst_xg, behavior='overwrite', xp=np):
    """Redistribute array arbitrarily.

    Generally, this function redistributes part of an array into part
    of another array.  It is thus not necessarily one-to-one, but can
    be used to perform a redistribution to or from a larger, padded
    array, for example.
    """
    assert src_xg.dtype == dst_xg.dtype
    # Reshaping as arr.reshape(-1, *shape) fails when some dimensions
    # are zero.  Also, make sure the datatype is correct even for
    # 0-length tuples:
    nx = np.prod(src_xg.shape[:-3], dtype=int)
    src_xg = src_xg.reshape(nx, *src_xg.shape[-3:])
    dst_xg = dst_xg.reshape(nx, *dst_xg.shape[-3:])
    assert src_xg.shape[0] == dst_xg.shape[0]
    assert behavior in ['overwrite', 'add']

    if not isinstance(domains1, Domains):
        domains1 = Domains(domains1)
    if not isinstance(domains2, Domains):
        domains2 = Domains(domains2)

    # Get global coords for the local slice:
    myparpos1_c = rank2parpos1(comm.rank)
    if myparpos1_c is not None:
        myoffset1_c, mysize1_c = domains1.get_box(myparpos1_c)
        assert np.all(mysize1_c == src_xg.shape[-3:]), \
            (mysize1_c, src_xg.shape[-3:])
    myparpos2_c = rank2parpos2(comm.rank)
    if myparpos2_c is not None:
        myoffset2_c, mysize2_c = domains2.get_box(myparpos2_c)
        assert np.all(mysize2_c == dst_xg.shape[-3:]), \
            (mysize2_c, dst_xg.shape[-3:])

    sendranks = []
    recvranks = []
    sendchunks = []
    recvchunks = []

    sendcounts, recvcounts, senddispls, recvdispls = np.zeros((4, comm.size),
                                                              dtype=int)

    # The plan is to loop over all ranks and figure out:
    #
    # 1) what we have to send to that rank, and
    # 2) what we are going to receive from that rank.
    #
    # Some ranks may not hold any data before, or after, or both.

    def _intersection(myoffset_c, mysize_c, offset_c, size_c, arr_g):
        # Get the intersection of two boxes, given as offset and size
        # in global coordinates.  Returns None if there is no
        # intersection, else the appropriate slice of the local array.
        start_c = np.max([myoffset_c, offset_c], axis=0)
        stop_c = np.min([myoffset_c + mysize_c, offset_c + size_c], axis=0)
        if (stop_c - start_c > 0).all():
            # Reduce to local array coordinates:
            start_c -= myoffset_c
            stop_c -= myoffset_c
            return arr_g[:, start_c[0]:stop_c[0], start_c[1]:stop_c[1],
                         start_c[2]:stop_c[2]]
        else:
            return None
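
    # For example (hypothetical numbers): a local box with offset
    # (0, 0, 0) and size (4, 4, 4) intersected with a remote box at
    # offset (2, 2, 2) with size (4, 4, 4) yields the local slice
    # arr_g[:, 2:4, 2:4, 2:4].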

    def get_sendchunk(offset_c, size_c):
        return _intersection(myoffset1_c, mysize1_c, offset_c, size_c, src_xg)

    def get_recvchunk(offset_c, size_c):
        return _intersection(myoffset2_c, mysize2_c, offset_c, size_c, dst_xg)

    nsendtotal = 0
    nrecvtotal = 0
    for rank in range(comm.size):
        # Proceed only if we have something to send:
        if myparpos1_c is not None:
            parpos2_c = rank2parpos2(rank)
            # Proceed only if the other rank is going to receive something:
            if parpos2_c is not None:
                offset2_c, size2_c = domains2.get_box(parpos2_c)
                sendchunk = get_sendchunk(offset2_c, size2_c)
                if sendchunk is not None:
                    sendcounts[rank] = sendchunk.size
                    senddispls[rank] = nsendtotal
                    nsendtotal += sendchunk.size
                    sendchunks.append(sendchunk)
                    sendranks.append(rank)

        # Proceed only if we are going to receive something:
        if myparpos2_c is not None:
            parpos1_c = rank2parpos1(rank)
            # Proceed only if the other rank has something to send:
            if parpos1_c is not None:
                offset1_c, size1_c = domains1.get_box(parpos1_c)
                recvchunk = get_recvchunk(offset1_c, size1_c)
                if recvchunk is not None:
                    recvcounts[rank] = recvchunk.size
                    recvdispls[rank] = nrecvtotal
                    nrecvtotal += recvchunk.size
                    recvchunks.append(recvchunk)
                    recvranks.append(rank)

    # MPI wants contiguous buffers; who are we to argue:
    sendbuf = xp.empty(nsendtotal, src_xg.dtype)
    recvbuf = xp.empty(nrecvtotal, src_xg.dtype)

    # Copy the non-contiguous slices into the contiguous send buffer:
    for sendrank, sendchunk in zip(sendranks, sendchunks):
        nstart = senddispls[sendrank]
        nstop = nstart + sendcounts[sendrank]
        sendbuf[nstart:nstop] = sendchunk.ravel()

    # Finally!
    comm.alltoallv(sendbuf, sendcounts, senddispls,
                   recvbuf, recvcounts, recvdispls)

    # Now copy from the receive buffer into the actual destination array:
    for recvrank, recvchunk in zip(recvranks, recvchunks):
        nstart = recvdispls[recvrank]
        nstop = nstart + recvcounts[recvrank]
        buf = recvbuf[nstart:nstop]
        if behavior == 'overwrite':
            recvchunk.flat[:] = buf
        elif behavior == 'add':
            recvchunk.flat[:] += buf
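

# A minimal sketch (not from the original module) of the padded-array use
# mentioned in the docstring, run on a single rank with gpaw.mpi.serial_comm
# (assuming that communicator supports alltoallv): copy a 2 x 2 x 2 block
# into the middle of a 4 x 4 x 4 array by describing each side as a
# one-domain decomposition:
#
#     from gpaw.mpi import serial_comm
#     src = np.arange(8.).reshape(1, 2, 2, 2)
#     dst = np.zeros((1, 4, 4, 4))
#     general_redistribute(serial_comm,
#                          [[1, 3], [1, 3], [1, 3]],  # source box, global
#                          [[0, 4], [0, 4], [0, 4]],  # destination box
#                          lambda rank: (0, 0, 0), lambda rank: (0, 0, 0),
#                          src, dst)
#     # dst[:, 1:3, 1:3, 1:3] now holds src.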


def test_general_redistribute():
    from gpaw.mpi import world

    domains1 = Domains([[0, 1],
                        [1, 3, 5, 6],
                        [0, 5, 9]])
    domains2 = Domains([[0, 1],
                        [0, 2, 4, 7, 10],
                        [2, 4, 6, 7]])

    serial = domains2.as_serial()

    gen = np.random.RandomState(42)

    dist1 = RandomDistribution(world, domains1, gen)
    dist2 = RandomDistribution(world, domains2, gen)

    arr1 = dist1.get_test_array()
    arr2 = dist2.get_test_array()
    print('shapes', arr1.shape, arr2.shape)
    arr2[:] = -1

    general_redistribute(world, domains1, domains2,
                         dist1.rank2parpos, dist2.rank2parpos,
                         arr1, arr2)

    dist_serial = RandomDistribution(world, serial, gen)
    arr3 = dist_serial.get_test_array()
    general_redistribute(world, domains2, serial, dist2.rank2parpos,
                         dist_serial.rank2parpos, arr2, arr3)
    print(arr3)


def playground():
    np.set_printoptions(linewidth=176)
    # N_c = [4, 7, 9]
    N_c = [4, 4, 2]

    pbc_c = (1, 1, 1)

    # 210
    distribute_dir = 1
    reduce_dir = 0

    parsize_c = (2, 2, 2)
    parsize2_c = list(parsize_c)
    parsize2_c[reduce_dir] = 1
    parsize2_c[distribute_dir] *= parsize_c[reduce_dir]
    assert np.prod(parsize2_c) == np.prod(parsize_c)

    gd = GridDescriptor(N_c=N_c, pbc_c=pbc_c, cell_cv=0.2 * np.array(N_c),
                        parsize_c=parsize_c)

    gd2 = get_compatible_grid_descriptor(gd, distribute_dir, reduce_dir)

    src = gd.zeros(dtype=complex)
    src[:] = gd.comm.rank

    src_global = gd.collect(src)
    if gd.comm.rank == 0:
        ind = np.indices(src_global.shape)
        src_global += 1j * (ind[0] / 10. + ind[1] / 100. + ind[2] / 1000.)
        # src_global[1] += 0.5j
        print('GLOBAL ARRAY', src_global.shape)
        print(src_global.squeeze())
    gd.distribute(src_global, src)
    goal = gd2.zeros(dtype=float)
    goal[:] = gd.comm.rank  # get_members()[gd2.comm.rank]
    goal_global = gd2.collect(goal)
    if gd.comm.rank == 0:
        print('GOAL GLOBAL')
        print(goal_global.squeeze())
    gd.comm.barrier()

    recvbuf = redistribute(gd, gd2, src, distribute_dir, reduce_dir,
                           operation='forth')
    recvbuf_master = gd2.collect(recvbuf)
    if gd2.comm.rank == 0:
        print('RECV')
        print(recvbuf_master)
        err = src_global - recvbuf_master
        print('MAXERR', np.abs(err).max())

    hopefully_orig = redistribute(gd, gd2, recvbuf, distribute_dir, reduce_dir,
                                  operation='back')
    tmp = gd.collect(hopefully_orig)
    if gd.comm.rank == 0:
        print('FINALLY')
        print(tmp)
        err2 = src_global - tmp
        print('MAXERR', np.abs(err2).max())


def test(N_c, gd, gd2, reduce_dir, distribute_dir, verbose=True):
    src = gd.zeros(dtype=complex)
    src[:] = gd.comm.rank

    # if gd.comm.rank == 0:
    #     print(gd)
    # print('hmmm', gd, gd2)

    src_global = gd.collect(src)
    if gd.comm.rank == 0:
        ind = np.indices(src_global.shape)
        src_global += 1j * (ind[0] / 10. + ind[1] / 100. + ind[2] / 1000.)
        # src_global[1] += 0.5j
        if verbose:
            print('GLOBAL ARRAY', src_global.shape)
            print(src_global)
    gd.distribute(src_global, src)
    goal = gd2.zeros(dtype=float)
    goal[:] = gd2.comm.rank
    goal_global = gd2.collect(goal)
    if gd.comm.rank == 0 and verbose:
        print('GOAL GLOBAL')
        print(goal_global)
    gd.comm.barrier()

    recvbuf = redistribute(gd, gd2, src, distribute_dir, reduce_dir,
                           operation='forth')
    recvbuf_master = gd2.collect(recvbuf)
    # Uncomment to inject an error and thus test the test itself:
    # if np.all(N_c == [10, 16, 24]):
    #     recvbuf_master[5, 8, 3] = 7
    maxerr = 0.0
    if gd2.comm.rank == 0:
        # if N_c[0] == 7:
        #     recvbuf_master[5, 0, 0] = 7
        # recvbuf_master[0, 0, 0] = 7
        err = src_global - recvbuf_master
        maxerr = np.abs(err).max()
        if verbose:
            print('RECV FORTH')
            print(recvbuf_master)
            print('MAXERR', maxerr)
    maxerr = gd.comm.sum_scalar(maxerr)
    assert maxerr == 0.0, 'bad values after distribute "forth"'

    recvbuf2 = redistribute(gd, gd2, recvbuf, distribute_dir, reduce_dir,
                            operation='back')

    final_err = gd.comm.sum_scalar(np.abs(src - recvbuf2).max())
    assert final_err == 0.0, 'bad values after distribute "back"'


def rigorous_testing():
    from itertools import product, permutations, cycle
    from gpaw.mpi import world
    gridpointcounts = [1, 2, 10, 21]
    cpucounts = np.arange(1, world.size + 1)
    pbc = cycle(product([0, 1], [0, 1], [0, 1]))

    # This yields all possible parallelizations!
    for parsize_c in product(cpucounts, cpucounts, cpucounts):
        if np.prod(parsize_c) != world.size:
            continue

        # All possible grid point counts:
        for N_c in product(gridpointcounts, gridpointcounts, gridpointcounts):

            # We simply can't be bothered to also do all possible
            # combinations of PBCs.  Trying every possible set of
            # boundary conditions at least once should be quite
            # enough.
            pbc_c = next(pbc)
            for dirs in permutations([0, 1, 2]):
                independent_dir, distribute_dir, reduce_dir = dirs

                parsize2_c = list(parsize_c)
                parsize2_c[reduce_dir] = 1
                parsize2_c[distribute_dir] *= parsize_c[reduce_dir]
                parsize2_c = tuple(parsize2_c)
                assert np.prod(parsize2_c) == np.prod(parsize_c)

                try:
                    gd = GridDescriptor(N_c=N_c, pbc_c=pbc_c,
                                        cell_cv=0.2 * np.array(N_c),
                                        parsize_c=parsize_c)
                    gd2 = get_compatible_grid_descriptor(gd, distribute_dir,
                                                         reduce_dir)

                    # gd2 = gd.new_descriptor(parsize_c=parsize2_c)
                except ValueError:  # Skip illegal distributions.
                    continue

                if gd.comm.rank == 1:
                    # print(gd, gd2)
                    print('N_c=%s[%s] redist %s -> %s [ind=%d dist=%d red=%d]'
                          % (N_c, pbc_c, parsize_c, parsize2_c,
                             independent_dir, distribute_dir, reduce_dir))
                gd.comm.barrier()
                test(N_c, gd, gd2, reduce_dir, distribute_dir,
                     verbose=False)


if __name__ == '__main__':
    test_general_redistribute()
    # playground()
    # rigorous_testing()