import itertools
import numpy as np
from gpaw.grid_descriptor import GridDescriptor


class AlignedGridRedistributor:
    """Perform redistributions between two grids.

    See the redistribute function."""
    def __init__(self, gd, distribute_dir, reduce_dir):
        self.gd = gd
        self.distribute_dir = distribute_dir
        self.reduce_dir = reduce_dir
        self.gd2 = get_compatible_grid_descriptor(gd, distribute_dir,
                                                  reduce_dir)

    def _redist(self, src, op):
        return redistribute(self.gd, self.gd2, src, self.distribute_dir,
                            self.reduce_dir, operation=op)

    def forth(self, src):
        return self._redist(src, 'forth')

    def back(self, src):
        return self._redist(src, 'back')
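

# Minimal usage sketch of AlignedGridRedistributor (illustrative only;
# assumes an MPI-parallel run and an existing GridDescriptor ``gd`` --
# the variable names here are hypothetical):
#
#     redist = AlignedGridRedistributor(gd, distribute_dir=1, reduce_dir=2)
#     work = redist.forth(src)   # to the gd2 layout, serial along axis 2
#     ...                        # operate on axis-2-contiguous data
#     src2 = redist.back(work)   # back to the original gd layout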


def redistribute(gd, gd2, src, distribute_dir, reduce_dir, operation='forth'):
    """Perform certain simple redistributions between two grid descriptors.

    Redistribute src from gd with decomposition X x Y x Z to gd2 with
    decomposition X x YZ x 1, or some variation of this.  We say that
    we "reduce" along Z while we "distribute" along Y.  The
    redistribution is one-to-one.

                 ____________                        ____________
            i   /     /     /|        r             / / / / / / /|
            n  /_____/_____/ |        e            / / / / / / / |
            d /     /     /| |        d           / / / / / / /  |
            e/_____/_____/ | |        u   forth  /_/_/_/_/_/_/   |
            p|     |     | |/|        c  ------> | | | | | | |  /|
            e|     |     | / |        e  <------ | | | | | | | / |
            n|_____|_____|/| |           back    |_|_|_|_|_|_|/  |
            d|     |     | |/         d          | | | | | | |  /
            e|     |     | /          i          | | | | | | | /
            n|_____|_____|/           r          |_|_|_|_|_|_|/
            t
                  d i s t r i b u t e   d i r

    Directions are specified as 0, 1, or 2.  gd2 must be serial along
    the axis of reduction and must parallelize enough over the
    distribution axis to match the size of gd.comm.

    Returns the redistributed array, which is compatible with gd2.

    Note: The communicator of gd2 must in general be a special
    permutation of that of gd in order for the redistribution axes to
    align with domain rank assignment.  Use the helper function
    get_compatible_grid_descriptor to obtain a grid descriptor which
    uses a compatible communicator."""
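    # Illustrative example of the constraints asserted below: with
    # gd.parsize_c == (2, 2, 2), reduce_dir=2 and distribute_dir=1, gd2
    # must have parsize_c == (2, 4, 1): serial along the reduced axis
    # and 2 * 2 = 4 domains along the distribution axis.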
    assert reduce_dir != distribute_dir
    assert gd.comm.size == gd2.comm.size
    # Actually, the two communicators should be equal!
    for c in [reduce_dir, distribute_dir]:
        assert 0 <= c < 3

    # Determine the direction in which nothing happens.
    for c in range(3):
        if c != reduce_dir and c != distribute_dir:
            independent_dir = c
            break
    assert np.all(gd.N_c == gd2.N_c)
    assert np.all(gd.pbc_c == gd2.pbc_c)
    assert gd.n_c[independent_dir] == gd2.n_c[independent_dir]
    assert gd.parsize_c[independent_dir] == gd2.parsize_c[independent_dir]
    assert gd2.parsize_c[reduce_dir] == 1
    assert gd2.parsize_c[distribute_dir] == gd.parsize_c[reduce_dir] \
        * gd.parsize_c[distribute_dir]
    assert operation in ('forth', 'back')
    forward = (operation == 'forth')
    if forward:
        assert np.all(src.shape == gd.n_c)
    else:
        assert np.all(src.shape == gd2.n_c)
    assert gd.comm.compare(gd2.comm) != 'unequal'

    # We want this to work no matter which directions we distribute and
    # reduce along.  But that is tricky to code, so we use a standard
    # order of the three directions.
    #
    # Thus we always have to transpose the src/dst arrays consistently
    # when interacting with the contiguous MPI send/recv buffers.  An
    # alternative is to use np.take, but that sometimes produces
    # copies where slicing does not, and we want to write back into
    # slices.
    #
    # We only support some of the orderings, though...
    dirs = (independent_dir, distribute_dir, reduce_dir)
    src = src.transpose(*dirs)
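    # For example (illustrative): with distribute_dir=1 and reduce_dir=0,
    # independent_dir is 2, so dirs == (2, 1, 0) and a local array of
    # shape (nx, ny, nz) is viewed with shape (nz, ny, nx) from here on.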

    # Construct a communicator consisting of all those processes that
    # participate in domain decomposition along the reduction
    # direction.
    #
    # All necessary communication can be done within that
    # subcommunicator using MPI alltoallv.
    #
    # We also construct the "same" communicator from gd2.comm, but with the
    # sole purpose of testing that the ranks are consistent between the two.
    # If they are not, the two grid descriptors are incompatible.
    pos_c = gd.parpos_c.copy()
    pos2_c = gd2.parpos_c.copy()
    positions2_offset = pos_c[distribute_dir] * gd.parsize_c[reduce_dir]
    peer_ranks = []
    peer_ranks2 = []
    for i in range(gd.parsize_c[reduce_dir]):
        pos_c[reduce_dir] = i
        pos2_c[distribute_dir] = i + positions2_offset
        peer_ranks.append(gd.get_rank_from_processor_position(pos_c))
        peer_ranks2.append(gd2.get_rank_from_processor_position(pos2_c))
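    # Illustrative example: with gd.parsize_c == (2, 2, 2) and
    # reduce_dir=2, each peer group contains the two ranks that share an
    # (x, y) domain position but differ along z; on gd2 the same group
    # occupies consecutive positions along the enlarged distribution axis.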
    peer_comm = gd.comm.new_communicator(peer_ranks)
    test_peer_comm2 = gd2.comm.new_communicator(peer_ranks2)
    if test_peer_comm2.compare(peer_comm) != 'congruent':
        raise ValueError('Grids are not compatible.  '
                         'Use get_compatible_grid_descriptor to construct '
                         'a compatible grid.')
    # Check that peer_comm encompasses the same physical processes
    # on the communicators of the two grid descriptors.
    assert peer_comm.compare(
        gd2.comm.new_communicator(peer_ranks2)) == 'congruent'

    members = peer_comm.get_members()

    mynpts1_rdir = gd.n_c[reduce_dir]
    mynpts2_ddir = gd2.n_c[distribute_dir]
    mynpts_idir = gd.n_c[independent_dir]
    assert mynpts_idir == gd2.n_c[independent_dir]

    offsets1_rdir_p = gd.n_cp[reduce_dir]
    offsets2_ddir_p = gd2.n_cp[distribute_dir]

    npts1_rdir_p = offsets1_rdir_p[1:] - offsets1_rdir_p[:-1]
    npts2_ddir_p = offsets2_ddir_p[1:] - offsets2_ddir_p[:-1]

    # We have the send buffer, and it is contiguous.  But the parts
    # that we are going to send to each CPU are not contiguous!  We
    # need to loop over all the little chunks that we want to send
    # and put them into a contiguous buffer for MPI.  Moreover, the
    # received data will unsurprisingly be in that very same order.
    # Therefore, we need to know how to unpack those data and put them
    # into the return array too.
    #
    # The following loop builds the send buffer and manages the logic
    # for the receive buffer.  However, since we have not received the
    # data yet, we obviously cannot copy anything out of the receive
    # buffer.  Therefore we create a list of ChunkCopiers that contain
    # all the information they need to later copy things into the
    # appropriate places of the return array.
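    #
    # In short (illustrative): if this rank has two peers, sendbuf will
    # be [chunk0.ravel(), chunk1.ravel()] and the received data lands in
    # recvbuf in the same peer order; the ChunkCopiers created below
    # then scatter it into the right slices of dst.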

    if forward:
        dst = gd2.zeros(dtype=src.dtype)
    else:
        dst = gd.zeros(dtype=src.dtype)
    recvbuf = np.empty(dst.size, dtype=src.dtype)

    # Sentinel values to make stray, uncopied elements easy to spot:
    dst[:] = -2
    recvbuf[:] = -3

    sendchunks = []
    recvchunks = []
    recv_chunk_copiers = []

    class ChunkCopier:
        def __init__(self, src_chunk, dst_chunk):
            self.src_chunk = src_chunk
            self.dst_chunk = dst_chunk

        def copy(self):
            self.dst_chunk.flat[:] = self.src_chunk

    # Map ranks of gd.comm to the corresponding ranks of gd2.comm:
    ranks1to2 = gd.comm.translate_ranks(gd2.comm, np.arange(gd.comm.size))
    assert (ranks1to2 != -1).all()

    recvchunk_start = 0
    for i in range(peer_comm.size):
        parent_rank = members[i]
        parent_rank2 = ranks1to2[parent_rank]

        parent_coord1 = \
            gd.get_processor_position_from_rank(parent_rank)[reduce_dir]
        parent_coord2 = \
            gd2.get_processor_position_from_rank(parent_rank2)[distribute_dir]

        # Warning: Many sendXXX and recvXXX variables are badly named,
        # because they change roles when operation='back'.
        sendstart_ddir = offsets2_ddir_p[parent_coord2] \
            - gd.beg_c[distribute_dir]
        sendstop_ddir = sendstart_ddir + npts2_ddir_p[parent_coord2]
        sendnpts_ddir = sendstop_ddir - sendstart_ddir

        # Compensate for the infinitely annoying convention that
        # enumeration of points starts at 1 in non-periodic directions.
        #
        # Also, if we want to handle more general redistributions, the
        # buffers below must have something subtracted to get a proper
        # local index.
        recvstart_rdir = offsets1_rdir_p[parent_coord1] \
            - 1 + gd.pbc_c[reduce_dir]
        recvstop_rdir = recvstart_rdir + npts1_rdir_p[parent_coord1]
        recvnpts_rdir = recvstop_rdir - recvstart_rdir

        # Grab the subarray that is going to be sent to process i.
        if forward:
            assert 0 <= sendstart_ddir
            assert sendstop_ddir <= src.shape[1]
            sendchunk = src[:, sendstart_ddir:sendstop_ddir, :]
            assert (sendchunk.size ==
                    mynpts1_rdir * sendnpts_ddir * mynpts_idir), (
                sendchunk.shape, (mynpts_idir, sendnpts_ddir, mynpts1_rdir))
        else:
            sendchunk = src[:, :, recvstart_rdir:recvstop_rdir]
            assert sendchunk.size == recvnpts_rdir * mynpts2_ddir * mynpts_idir
        sendchunks.append(sendchunk)

        if forward:
            recvchunksize = recvnpts_rdir * mynpts2_ddir * mynpts_idir
        else:
            recvchunksize = mynpts1_rdir * sendnpts_ddir * mynpts_idir
        recvchunk = recvbuf[recvchunk_start:recvchunk_start + recvchunksize]
        recvchunks.append(recvchunk)
        recvchunk_start += recvchunksize

        if forward:
            dstchunk = dst.transpose(*dirs)[:, :, recvstart_rdir:recvstop_rdir]
        else:
            dstchunk = dst.transpose(*dirs)[:, sendstart_ddir:sendstop_ddir, :]
        copier = ChunkCopier(recvchunk, dstchunk)
        recv_chunk_copiers.append(copier)

    sendcounts = np.array([chunk.size for chunk in sendchunks], dtype=int)
    recvcounts = np.array([chunk.size for chunk in recvchunks], dtype=int)

    assert sendcounts.sum() == src.size
    assert recvcounts.sum() == dst.size
    senddispls = np.array([0] + list(np.cumsum(sendcounts))[:-1], dtype=int)
    recvdispls = np.array([0] + list(np.cumsum(recvcounts))[:-1], dtype=int)

    sendbuf = np.concatenate([sendchunk.ravel() for sendchunk in sendchunks])

    peer_comm.alltoallv(sendbuf, sendcounts, senddispls,
                        recvbuf, recvcounts, recvdispls)

    # Copy contiguous blocks of the receive buffer back into the
    # precoded slices:
    for chunk_copier in recv_chunk_copiers:
        chunk_copier.copy()
    return dst


def get_compatible_grid_descriptor(gd, distribute_dir, reduce_dir):
    parsize2_c = list(gd.parsize_c)
    parsize2_c[reduce_dir] = 1
    parsize2_c[distribute_dir] = gd.parsize_c[reduce_dir] \
        * gd.parsize_c[distribute_dir]

    # Because of the way in which domains are assigned to ranks, some
    # redistributions cannot be represented on any grid descriptor
    # that uses the same communicator.  However, we can create a
    # different one which assigns ranks in a manner corresponding to
    # a permutation of the axes, and a compatible such communicator
    # always exists.
    #
    # Probably there are two: a left-handed and a right-handed one
    # (i.e., a positive or negative permutation of the axes).  It would
    # probably be logical to always choose a right-handed one.  Right
    # now the numbers correspond to whatever was first made to work,
    # though!
    t = {(0, 1): (0, 1, 2),
         (0, 2): (0, 2, 1),
         (1, 0): (1, 0, 2),
         (1, 2): (0, 1, 2),
         (2, 1): (0, 2, 1),
         (2, 0): (1, 2, 0)}[(distribute_dir, reduce_dir)]

    ranks = np.arange(gd.comm.size).reshape(gd.parsize_c).transpose(*t).ravel()
    comm2 = gd.comm.new_communicator(ranks)
    gd2 = gd.new_descriptor(comm=comm2, parsize_c=parsize2_c)
    return gd2
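

# Illustrative sanity check of the rank permutation above: for
# gd.parsize_c == (2, 2, 2) with (distribute_dir, reduce_dir) == (2, 0),
# t == (1, 2, 0), and
#
#     np.arange(8).reshape(2, 2, 2).transpose(1, 2, 0).ravel()
#
# gives [0, 4, 1, 5, 2, 6, 3, 7]: ranks that differ along the reduction
# axis of gd (0 and 4, 1 and 5, ...) become neighbours, so they can sit
# next to each other along the enlarged distribution axis of gd2.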


class Domains:
    def __init__(self, domains_cp):
        self.domains_cp = domains_cp
        self.parsize_c = tuple(len(domains_cp[c]) - 1 for c in range(3))

    def get_global_shape(self):
        return tuple(self.domains_cp[c][-1]
                     - self.domains_cp[c][0] for c in range(3))

    def get_offset(self, parpos_c):
        offset_c = [self.domains_cp[c][parpos_c[c]] for c in range(3)]
        return np.array(offset_c)

    def get_box(self, parpos_c):
        parpos_c = np.array(parpos_c)
        offset_c = self.get_offset(parpos_c)
        nextoffset_c = self.get_offset(parpos_c + 1)
        return offset_c, nextoffset_c - offset_c

    def as_serial(self):
        return Domains([[self.domains_cp[c][0], self.domains_cp[c][-1]]
                        for c in range(3)])
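

# Example (illustrative): three domains along y of a 1 x 6 x 9 grid.
# Each of the three lists holds the cumulative domain borders along
# one axis:
#
#     d = Domains([[0, 1], [0, 2, 4, 6], [0, 9]])
#     d.parsize_c            # (1, 3, 1)
#     d.get_global_shape()   # (1, 6, 9)
#     d.get_box((0, 1, 0))   # (array([0, 2, 0]), array([1, 2, 9]))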


def random_subcomm(comm, gen, size):
    allranks = np.arange(comm.size)
    wranks = gen.choice(allranks, size=size, replace=False)
    subcomm = comm.new_communicator(wranks)
    return wranks, subcomm


# For testing.
class RandomDistribution:
    def __init__(self, comm, domains, gen):
        self.world = comm
        self.domains = domains
        size = np.prod(domains.parsize_c)
        wranks, subcomm = random_subcomm(comm, gen, size)
        self.comm = subcomm

        # We really need to know this information globally, even where
        # subcomm is None.
        comm.broadcast(wranks, 0)
        self.wranks = wranks
        self.subranks = {}
        for i, wrank in enumerate(wranks):
            self.subranks[wrank] = i

        domain_order = np.arange(size)
        gen.shuffle(domain_order)
        self.ranks3d = domain_order.reshape(domains.parsize_c)

        coords_iter = itertools.product(*[range(domains.parsize_c[c])
                                          for c in range(3)])
        self.coords_by_rank = list(coords_iter)

    def parpos2rank(self, parpos_c):
        return self.wranks[self.ranks3d[parpos_c]]

    def rank2parpos(self, rank):
        subrank = self.subranks.get(rank)
        if subrank is None:
            return None
        return self.coords_by_rank[subrank]

    def get_test_array(self, dtype=int):
        if self.comm is None:
            return np.zeros((0, 0, 0), dtype=dtype)

        parpos_c = self.rank2parpos(self.world.rank)
        offset_c, size_c = self.domains.get_box(parpos_c)
        arr = np.empty(size_c, dtype=dtype)
        arr[:] = self.comm.rank
        return arr


def general_redistribute(comm, domains1, domains2, rank2parpos1, rank2parpos2,
                         src_xg, dst_xg, behavior='overwrite', xp=np):
    """Redistribute array arbitrarily.

    Generally, this function redistributes part of an array into part
    of another array.  It is thus not necessarily one-to-one, but can
    be used to perform a redistribution to or from a larger, padded
    array, for example."""
    assert src_xg.dtype == dst_xg.dtype
    # Reshaping as arr.reshape(-1, *shape) fails when some dimensions
    # are zero.  Also, make sure the datatype is correct even for
    # 0-length tuples:
    nx = np.prod(src_xg.shape[:-3], dtype=int)
    src_xg = src_xg.reshape(nx, *src_xg.shape[-3:])
    dst_xg = dst_xg.reshape(nx, *dst_xg.shape[-3:])
    assert src_xg.shape[0] == dst_xg.shape[0]
    assert behavior in ['overwrite', 'add']

    if not isinstance(domains1, Domains):
        domains1 = Domains(domains1)
    if not isinstance(domains2, Domains):
        domains2 = Domains(domains2)

    # Get global coordinates of the local slice:
    myparpos1_c = rank2parpos1(comm.rank)
    if myparpos1_c is not None:
        myoffset1_c, mysize1_c = domains1.get_box(myparpos1_c)
        assert np.all(mysize1_c == src_xg.shape[-3:]), \
            (mysize1_c, src_xg.shape[-3:])
    myparpos2_c = rank2parpos2(comm.rank)
    if myparpos2_c is not None:
        myoffset2_c, mysize2_c = domains2.get_box(myparpos2_c)
        assert np.all(mysize2_c == dst_xg.shape[-3:]), \
            (mysize2_c, dst_xg.shape[-3:])

    sendranks = []
    recvranks = []
    sendchunks = []
    recvchunks = []

    # One zero-initialized length-comm.size array for each of the four
    # count/displacement arguments to alltoallv:
    sendcounts, recvcounts, senddispls, recvdispls = np.zeros((4, comm.size),
                                                              dtype=int)

    # The plan is to loop over all ranks and figure out:
    #
    # 1) what we have to send to that rank, and
    # 2) what we are going to receive from that rank.
    #
    # Some ranks may not hold any data before, or after, or both.

    def _intersection(myoffset_c, mysize_c, offset_c, size_c, arr_g):
        # Get the intersection of two boxes, given as offset and size
        # in global coordinates.  Returns None if there is no
        # intersection, else the appropriate slice of the local array.
        start_c = np.max([myoffset_c, offset_c], axis=0)
        stop_c = np.min([myoffset_c + mysize_c, offset_c + size_c], axis=0)
        if (stop_c - start_c > 0).all():
            # Reduce to local array coordinates:
            start_c -= myoffset_c
            stop_c -= myoffset_c
            return arr_g[:, start_c[0]:stop_c[0], start_c[1]:stop_c[1],
                         start_c[2]:stop_c[2]]
        else:
            return None
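
    # Worked example of _intersection (illustrative): along one axis, a
    # local box [2, 5) and a remote box [4, 8) overlap in [4, 5); after
    # subtracting myoffset_c this selects the local slice [2:3].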

    def get_sendchunk(offset_c, size_c):
        return _intersection(myoffset1_c, mysize1_c, offset_c, size_c, src_xg)

    def get_recvchunk(offset_c, size_c):
        return _intersection(myoffset2_c, mysize2_c, offset_c, size_c, dst_xg)

    nsendtotal = 0
    nrecvtotal = 0
    for rank in range(comm.size):
        # Proceed only if we have something to send:
        if myparpos1_c is not None:
            parpos2_c = rank2parpos2(rank)
            # Proceed only if the other rank is going to receive something:
            if parpos2_c is not None:
                offset2_c, size2_c = domains2.get_box(parpos2_c)
                sendchunk = get_sendchunk(offset2_c, size2_c)
                if sendchunk is not None:
                    sendcounts[rank] = sendchunk.size
                    senddispls[rank] = nsendtotal
                    nsendtotal += sendchunk.size
                    sendchunks.append(sendchunk)
                    sendranks.append(rank)

        # Proceed only if we are going to receive something:
        if myparpos2_c is not None:
            parpos1_c = rank2parpos1(rank)
            # Proceed only if the other rank has something to send:
            if parpos1_c is not None:
                offset1_c, size1_c = domains1.get_box(parpos1_c)
                recvchunk = get_recvchunk(offset1_c, size1_c)
                if recvchunk is not None:
                    recvcounts[rank] = recvchunk.size
                    recvdispls[rank] = nrecvtotal
                    nrecvtotal += recvchunk.size
                    recvchunks.append(recvchunk)
                    recvranks.append(rank)

    # MPI wants contiguous buffers; who are we to argue:
    sendbuf = xp.empty(nsendtotal, src_xg.dtype)
    recvbuf = xp.empty(nrecvtotal, src_xg.dtype)

    # Copy non-contiguous slices into the contiguous send buffer:
    for sendrank, sendchunk in zip(sendranks, sendchunks):
        nstart = senddispls[sendrank]
        nstop = nstart + sendcounts[sendrank]
        sendbuf[nstart:nstop] = sendchunk.ravel()

    # Finally!
    comm.alltoallv(sendbuf, sendcounts, senddispls,
                   recvbuf, recvcounts, recvdispls)

    # Now copy from the receive buffer into the actual destination array:
    for recvrank, recvchunk in zip(recvranks, recvchunks):
        nstart = recvdispls[recvrank]
        nstop = nstart + recvcounts[recvrank]
        buf = recvbuf[nstart:nstop]
        if behavior == 'overwrite':
            recvchunk.flat[:] = buf
        elif behavior == 'add':
            recvchunk.flat[:] += buf
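

# Minimal sketch of a padded copy with general_redistribute (a sketch
# under the assumption of a serial run, and assuming gpaw.mpi.serial_comm
# provides alltoallv; everything lives on rank 0):
#
#     from gpaw.mpi import serial_comm
#     src = np.arange(8.0).reshape(1, 2, 2, 2)
#     dst = np.zeros((1, 4, 4, 4))
#     general_redistribute(serial_comm,
#                          [[0, 2], [0, 2], [0, 2]],  # source domains
#                          [[0, 4], [0, 4], [0, 4]],  # padded target
#                          lambda rank: (0, 0, 0),
#                          lambda rank: (0, 0, 0),
#                          src, dst)
#
# Afterwards dst[:, :2, :2, :2] holds src and the padding is untouched.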


def test_general_redistribute():
    from gpaw.mpi import world

    domains1 = Domains([[0, 1],
                        [1, 3, 5, 6],
                        [0, 5, 9]])
    domains2 = Domains([[0, 1],
                        [0, 2, 4, 7, 10],
                        [2, 4, 6, 7]])

    serial = domains2.as_serial()

    gen = np.random.RandomState(42)

    dist1 = RandomDistribution(world, domains1, gen)
    dist2 = RandomDistribution(world, domains2, gen)

    arr1 = dist1.get_test_array()
    arr2 = dist2.get_test_array()
    print('shapes', arr1.shape, arr2.shape)
    arr2[:] = -1

    general_redistribute(world, domains1, domains2,
                         dist1.rank2parpos, dist2.rank2parpos,
                         arr1, arr2)

    dist_serial = RandomDistribution(world, serial, gen)
    arr3 = dist_serial.get_test_array()
    general_redistribute(world, domains2, serial, dist2.rank2parpos,
                         dist_serial.rank2parpos, arr2, arr3)
    print(arr3)


def playground():
    np.set_printoptions(linewidth=176)
    # N_c = [4, 7, 9]
    N_c = [4, 4, 2]

    pbc_c = (1, 1, 1)

    # 210
    distribute_dir = 1
    reduce_dir = 0

    parsize_c = (2, 2, 2)
    parsize2_c = list(parsize_c)
    parsize2_c[reduce_dir] = 1
    parsize2_c[distribute_dir] *= parsize_c[reduce_dir]
    assert np.prod(parsize2_c) == np.prod(parsize_c)

    gd = GridDescriptor(N_c=N_c, pbc_c=pbc_c, cell_cv=0.2 * np.array(N_c),
                        parsize_c=parsize_c)

    gd2 = get_compatible_grid_descriptor(gd, distribute_dir, reduce_dir)

    src = gd.zeros(dtype=complex)
    src[:] = gd.comm.rank

    src_global = gd.collect(src)
    if gd.comm.rank == 0:
        ind = np.indices(src_global.shape)
        src_global += 1j * (ind[0] / 10. + ind[1] / 100. + ind[2] / 1000.)
        # src_global[1] += 0.5j
        print('GLOBAL ARRAY', src_global.shape)
        print(src_global.squeeze())
    gd.distribute(src_global, src)
    goal = gd2.zeros(dtype=float)
    goal[:] = gd.comm.rank  # get_members()[gd2.comm.rank]
    goal_global = gd2.collect(goal)
    if gd.comm.rank == 0:
        print('GOAL GLOBAL')
        print(goal_global.squeeze())
    gd.comm.barrier()

    recvbuf = redistribute(gd, gd2, src, distribute_dir, reduce_dir,
                           operation='forth')
    recvbuf_master = gd2.collect(recvbuf)
    if gd2.comm.rank == 0:
        print('RECV')
        print(recvbuf_master)
        err = src_global - recvbuf_master
        print('MAXERR', np.abs(err).max())

    hopefully_orig = redistribute(gd, gd2, recvbuf, distribute_dir, reduce_dir,
                                  operation='back')
    tmp = gd.collect(hopefully_orig)
    if gd.comm.rank == 0:
        print('FINALLY')
        print(tmp)
        err2 = src_global - tmp
        print('MAXERR', np.abs(err2).max())


def test(N_c, gd, gd2, reduce_dir, distribute_dir, verbose=True):
    src = gd.zeros(dtype=complex)
    src[:] = gd.comm.rank

    src_global = gd.collect(src)
    if gd.comm.rank == 0:
        ind = np.indices(src_global.shape)
        src_global += 1j * (ind[0] / 10. + ind[1] / 100. + ind[2] / 1000.)
        if verbose:
            print('GLOBAL ARRAY', src_global.shape)
            print(src_global)
    gd.distribute(src_global, src)
    goal = gd2.zeros(dtype=float)
    goal[:] = gd2.comm.rank
    goal_global = gd2.collect(goal)
    if gd.comm.rank == 0 and verbose:
        print('GOAL GLOBAL')
        print(goal_global)
    gd.comm.barrier()

    recvbuf = redistribute(gd, gd2, src, distribute_dir, reduce_dir,
                           operation='forth')
    recvbuf_master = gd2.collect(recvbuf)
    # To check that this test actually catches errors, corrupt a value
    # here on the master, e.g.: recvbuf_master[0, 0, 0] = 7
    maxerr = 0.0
    if gd2.comm.rank == 0:
        err = src_global - recvbuf_master
        maxerr = np.abs(err).max()
        if verbose:
            print('RECV FORTH')
            print(recvbuf_master)
            print('MAXERR', maxerr)
    maxerr = gd.comm.sum_scalar(maxerr)
    assert maxerr == 0.0, 'bad values after distribute "forth"'

    recvbuf2 = redistribute(gd, gd2, recvbuf, distribute_dir, reduce_dir,
                            operation='back')

    final_err = gd.comm.sum_scalar(np.abs(src - recvbuf2).max())
    assert final_err == 0.0, 'bad values after distribute "back"'


def rigorous_testing():
    from itertools import product, permutations, cycle
    from gpaw.mpi import world
    gridpointcounts = [1, 2, 10, 21]
    cpucounts = np.arange(1, world.size + 1)
    pbc = cycle(product([0, 1], [0, 1], [0, 1]))

    # This yields all possible parallelizations!
    for parsize_c in product(cpucounts, cpucounts, cpucounts):
        if np.prod(parsize_c) != world.size:
            continue

        # All possible grid point counts:
        for N_c in product(gridpointcounts, gridpointcounts, gridpointcounts):

            # We simply can't be bothered to also do all possible
            # combinations with PBCs.  Trying every possible set of
            # boundary conditions at least once should be quite
            # enough.
            pbc_c = next(pbc)
            for dirs in permutations([0, 1, 2]):
                independent_dir, distribute_dir, reduce_dir = dirs

                parsize2_c = list(parsize_c)
                parsize2_c[reduce_dir] = 1
                parsize2_c[distribute_dir] *= parsize_c[reduce_dir]
                parsize2_c = tuple(parsize2_c)
                assert np.prod(parsize2_c) == np.prod(parsize_c)

                try:
                    gd = GridDescriptor(N_c=N_c, pbc_c=pbc_c,
                                        cell_cv=0.2 * np.array(N_c),
                                        parsize_c=parsize_c)
                    gd2 = get_compatible_grid_descriptor(gd, distribute_dir,
                                                         reduce_dir)
                except ValueError:  # Skip illegal distributions.
                    continue

                if gd.comm.rank == 1:
                    print('N_c=%s[%s] redist %s -> %s [ind=%d dist=%d red=%d]'
                          % (N_c, pbc_c, parsize_c, parsize2_c,
                             independent_dir, distribute_dir, reduce_dir))
                gd.comm.barrier()
                test(N_c, gd, gd2, reduce_dir, distribute_dir,
                     verbose=False)


if __name__ == '__main__':
    test_general_redistribute()
    # playground()
    # rigorous_testing()