ESyS-Particle  4.0.1
comm_coll.hpp
//                                                          //
// Copyright (c) 2003-2011 by The University of Queensland  //
// Earth Systems Science Computational Centre (ESSCC)       //
// http://www.uq.edu.au/esscc                               //
//                                                          //
// Primary Business: Brisbane, Queensland, Australia        //
// Licensed under the Open Software License version 3.0     //
// http://www.opensource.org/licenses/osl-3.0.php           //
//                                                          //


// --- collective communication primitives for TML_Comm ---
#include "tml/message/packed_multi_message.h"

template <typename T>
void TML_Comm::broadcast(T data)
{
  MPI_Bcast(&data,1,GetType(data),rank(),m_comm);
}

template <typename T>
void TML_Comm::broadcast_array(T* data,int ndata)
{
  MPI_Bcast(data,ndata,GetType(*data),rank(),m_comm);
}

template <typename T>
void TML_Comm::broadcast_cont(const T& data)
{
  int data_size=data.size();

  // setup buffer
  typename T::value_type *buffer=new typename T::value_type[data_size];
  // put data into buffer
  int count=0;
  for(typename T::const_iterator iter=data.begin();
      iter!=data.end();
      iter++){
    void* buf=reinterpret_cast<void*>(&(buffer[count])); // get memory address of the buffer element
    new(buf)(typename T::value_type)(*iter); // initialize object at this address
    // The placement new (see Stroustrup, p.255ff) is necessary because the
    // assignment buffer[count]=(*iter) doesn't work if T::value_type
    // contains constant member data, and initialization by calling the
    // copy constructor directly doesn't work for builtin types.
    count++;
  }

  // broadcast size
  broadcast(data_size);

  // broadcast buffer
  broadcast_array(buffer,data_size);

  // clean up
  delete [] buffer;
}

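// Example (usage sketch, not part of the original header): broadcast_cont()
// on the root pairs with recv_broadcast_cont() on every other rank; the root
// sends the container size first, then the raw elements. The TML_Comm
// instance `comm`, the function name, and the choice of rank 0 as root are
// illustrative assumptions.
#include <vector>

inline void example_broadcast_cont(TML_Comm& comm)
{
  std::vector<double> v;
  if(comm.rank()==0){
    v.push_back(1.0);
    v.push_back(2.0);
    comm.broadcast_cont(v);        // root: broadcast size, then data
  } else {
    comm.recv_broadcast_cont(v,0); // non-root: receive from root rank 0
  }
}
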
template <typename T>
void TML_Comm::broadcast_cont_packed(const T& data)
{
  TML_Packed_Message* msg=new TML_Packed_Message(m_comm);
  int nb_data=data.size();

  msg->pack(nb_data); // pack number of items first
  // pack data
  for(typename T::const_iterator iter=data.begin();
      iter!=data.end();
      iter++){
    msg->pack(*iter);
  }
  // broadcast size
  broadcast(msg->size());

  // broadcast data
  broadcast_array(msg->buffer(),msg->size());

  delete msg;
}

template <typename T>
void TML_Comm::recv_broadcast(T& data,int root)
{
  MPI_Bcast(&data,1,GetType(data),root,m_comm);
}

template <typename T>
void TML_Comm::recv_broadcast_array(T* data,int ndata,int root)
{
  MPI_Bcast(data,ndata,GetType(*data),root,m_comm);
}

template <typename T>
void TML_Comm::recv_broadcast_cont(T& data,int root)
{
  int data_size;

  // get size
  recv_broadcast(data_size,root);
  // setup recv buffer
  typename T::value_type *buffer=new typename T::value_type[data_size];

  // get data
  recv_broadcast_array(buffer,data_size,root);
  // insert into container
  for(int i=0;i<data_size;i++){
    data.insert(data.end(),buffer[i]);
  }

  // clean up
  delete [] buffer;
}

template <typename T>
void TML_Comm::recv_broadcast_cont_packed(T& data,int root)
{
  int msg_size; // total size of the message
  int nb_data;  // number of data items (i.e. the expected data.size())

  // get size
  recv_broadcast(msg_size,root);

  // setup message
  TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size);

  // get data
  recv_broadcast_array(msg->buffer(),msg_size,root);
  // extract number of items
  nb_data=msg->pop_int();

  // unpack data
  for(int i=0;i<nb_data;i++){
    typename T::value_type tv;
    msg->unpack(tv);
    data.insert(data.end(),tv);
  }

  // clean up
  delete msg;
}

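// Example (usage sketch, not part of the original header): the packed
// broadcast pair serialises through TML_Packed_Message, so it also suits
// element types supported by pack()/unpack() rather than a plain MPI
// datatype. `comm` and root rank 0 are illustrative assumptions.
#include <list>

inline void example_broadcast_cont_packed(TML_Comm& comm)
{
  std::list<double> values;
  if(comm.rank()==0){
    values.push_back(3.14);
    comm.broadcast_cont_packed(values);        // root: pack count + items, broadcast
  } else {
    comm.recv_broadcast_cont_packed(values,0); // non-root: unpack from root 0
  }
}
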
template <typename T>
void TML_Comm::scatter(const multimap<int,T> data)
{
  // put data into buffer
  int total_size=data.size();
  T* buffer=new T[total_size];
  int comm_size=size();
  int *size_buffer=new int[comm_size];
  int *offs_buffer=new int[comm_size];
  int count=0;
  int count_p=0;
  for(int i=0;i<comm_size;i++){
    typename multimap<int,T>::const_iterator begin=data.find(i);
    typename multimap<int,T>::const_iterator end=data.upper_bound(i);
    if(begin!=data.end()){
      for(typename multimap<int,T>::const_iterator iter=begin;
          iter!=end;
          iter++){
        buffer[count]=iter->second;
        count++;
      }
    }
    size_buffer[i]=count-count_p;
    count_p=count;
  }
  // send size info
  int dummy;
  MPI_Scatter(size_buffer,1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm);
  // construct offsets from sizes
  offs_buffer[0]=0;
  for(int i=1;i<comm_size;i++){
    offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
  }
  // send data
  T dummy2;
  MPI_Scatterv(buffer,size_buffer,offs_buffer,GetType(*buffer),&dummy2,0,GetType(*buffer),rank(),m_comm);
  // clean up
  delete [] size_buffer;
  delete [] offs_buffer;
  delete [] buffer;
}

template <typename T>
void TML_Comm::recv_scatter(T& data,int root)
{
  // receive size
  int size;
  MPI_Scatter(NULL,0,MPI_INT,&size,1,MPI_INT,root,m_comm);
  // allocate receive buffer
  typename T::value_type *buffer=new typename T::value_type[size];
  // receive data
  MPI_Scatterv(NULL,NULL,NULL,MPI_INT,buffer,size,GetType(*buffer),root,m_comm);
  // put data in container
  for(int i=0;i<size;i++){
    data.insert(data.end(),buffer[i]);
  }
  // clean up
  delete [] buffer;
}

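// Example (usage sketch, not part of the original header): the root builds a
// multimap keyed by destination rank and calls scatter(); each other rank
// receives its share with recv_scatter(). `comm` and root rank 0 are
// illustrative assumptions.
#include <map>
#include <vector>

inline void example_scatter(TML_Comm& comm)
{
  if(comm.rank()==0){
    std::multimap<int,double> out;
    for(int r=1;r<comm.size();r++){
      out.insert(std::pair<int,double>(r,1.0*r)); // one value per worker rank
    }
    comm.scatter(out);
  } else {
    std::vector<double> in;
    comm.recv_scatter(in,0); // in now holds the values keyed to this rank
  }
}
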
template <typename T>
void TML_Comm::gather(multimap<int,T>& data)
{
  int dummy=0;
  int comm_size=size();
  int *size_buffer=new int[comm_size];
  int *offs_buffer=new int[comm_size];
  // receive sizes
  MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
  int totalsize=0;
  for(int i=0;i<comm_size;i++){
    totalsize+=size_buffer[i];
  }
  // setup receive buffer
  T *buffer=new T[totalsize];
  // construct offsets from sizes
  offs_buffer[0]=0;
  for(int i=1;i<comm_size;i++){
    offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
  }
  // receive data
  T dummy2;
  MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm);
  // put data into multimap
  for(int i=0;i<comm_size;i++){
    for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){
      data.insert(pair<int,T>(i,buffer[j]));
    }
  }
  // clean up
  delete [] size_buffer;
  delete [] offs_buffer;
  delete [] buffer;
}

template <typename T>
void TML_Comm::gather_debug(multimap<int,T>& data)
{
  int dummy=0;
  int comm_size=size();
  int *size_buffer=new int[comm_size];
  int *offs_buffer=new int[comm_size];
  // receive sizes
  MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
  int totalsize=0;
  for(int i=0;i<comm_size;i++){
    console.Debug() << "buffer size " << i << " - " << size_buffer[i] << "\n";
    totalsize+=size_buffer[i];
  }
  // setup receive buffer
  T *buffer=new T[totalsize];
  // construct offsets from sizes
  offs_buffer[0]=0;
  for(int i=1;i<comm_size;i++){
    offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
  }
  // receive data
  T dummy2;
  MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm);
  // put data into multimap
  for(int i=0;i<comm_size;i++){
    for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){
      data.insert(pair<int,T>(i,buffer[j]));
    }
  }
  // clean up
  delete [] size_buffer;
  delete [] offs_buffer;
  delete [] buffer;
}

template <typename T>
void TML_Comm::send_gather(T& data,int root)
{
  // send size
  int size=data.size();
  MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);
  // setup send buffer
  typename T::value_type *buffer=new typename T::value_type[size];
  // put data into send buffer
  int count=0;
  for(typename T::const_iterator iter=data.begin();
      iter!=data.end();
      iter++){
    buffer[count]=*iter;
    count++;
  }
  // send data
  MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm);
  // clean up
  delete [] buffer;
}

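// Example (usage sketch, not part of the original header): gather() on the
// root collects one container from every other rank into a multimap whose
// keys are the source ranks, paired with send_gather() on the senders.
// `comm` and root rank 0 are illustrative assumptions.
#include <map>
#include <vector>

inline void example_gather(TML_Comm& comm)
{
  if(comm.rank()==0){
    std::multimap<int,double> in;
    comm.gather(in); // keys of `in` are the source ranks
  } else {
    std::vector<double> out(3,double(comm.rank()));
    comm.send_gather(out,0);
  }
}
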
template <typename T>
void TML_Comm::send_gather_debug(T& data,int root)
{
  // send size
  int size=data.size();
  console.Debug() << "send size :" << size << "\n";
  MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);
  // setup send buffer
  typename T::value_type *buffer=new typename T::value_type[size];
  // put data into send buffer
  int count=0;
  for(typename T::const_iterator iter=data.begin();
      iter!=data.end();
      iter++){
    buffer[count]=*iter;
    count++;
  }
  console.Debug() << "send count :" << count << "\n";
  // send data
  MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm);
  // clean up
  delete [] buffer;
}

template <typename T>
void TML_Comm::scatter_packed(const multimap<int,T> data)
{
  int comm_size=size();
  // construct new packed multimessage
  TML_PackedMultiMessage* msg=new TML_PackedMultiMessage(m_comm);
  // pack data
  for(int i=1;i<comm_size;i++){
    typename multimap<int,T>::const_iterator begin=data.find(i);
    typename multimap<int,T>::const_iterator end=data.upper_bound(i);
    (*msg)[i].pack(int(data.count(i)));
    if(begin!=data.end()){
      for(typename multimap<int,T>::const_iterator iter=begin;
          iter!=end;
          iter++){
        (*msg)[i].pack(iter->second);
      }
    }
  }
  // send size
  int dummy;
  MPI_Scatter(msg->sizes(),1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm);
  // send data
  T dummy2;
  MPI_Datatype data_type=GetType(*(msg->buffer()));
  MPI_Scatterv(msg->buffer(),msg->sizes(),msg->offsets(),data_type,
               &dummy2,0,data_type,rank(),m_comm);
  // clean up
  delete msg;
}

template <typename T>
void TML_Comm::recv_scatter_packed(T& data,int root)
{
  // receive size
  int msg_size;
  MPI_Scatter(NULL,0,MPI_INT,&msg_size,1,MPI_INT,root,m_comm);
  // construct packed message of sufficient size
  TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size);

  // receive data
  MPI_Datatype data_type=GetType(*(msg->buffer()));
  MPI_Scatterv(NULL,NULL,NULL,MPI_INT,msg->buffer(),msg_size,data_type,root,m_comm);
  // extract number of items
  msg->begin_unpack();
  int nitems=msg->pop_int();
  // unpack data
  typename T::value_type item;
  for(int i=0;i<nitems;i++){
    msg->unpack(item);
    data.insert(data.end(),item);
  }
  // clean up
  delete msg;
}

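// Example (usage sketch, not part of the original header): the packed
// scatter pair. Note that scatter_packed() packs messages for ranks
// 1..size()-1 only, so rank 0 is assumed to be the root; `comm` is an
// illustrative assumption.
#include <list>
#include <map>

inline void example_scatter_packed(TML_Comm& comm)
{
  if(comm.rank()==0){
    std::multimap<int,double> out;
    for(int r=1;r<comm.size();r++){
      out.insert(std::pair<int,double>(r,2.0*r));
    }
    comm.scatter_packed(out);
  } else {
    std::list<double> in;
    comm.recv_scatter_packed(in,0);
  }
}
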
template <typename T>
void TML_Comm::gather_packed(multimap<int,T> &data)
{
  console.Debug() << "TML_Comm::gather_packed: enter\n";
  int dummy=0;
  int comm_size=size();
  int *size_buffer=new int[comm_size];
  int *offs_buffer=new int[comm_size];
  // receive sizes
  console.Debug() << "TML_Comm::gather_packed: gathering sizes\n";
  MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
  int totalsize=0;
  for(int i=0;i<comm_size;i++){
    //console.Debug()
    //  << "TML_Comm::gather_packed:"
    //  << " size from rank " << i << " = " << size_buffer[i] << "\n";
    totalsize+=size_buffer[i];
  }
  console.Debug()
    << "TML_Comm::gather_packed: total msg size = " << totalsize << "\n";
  // setup receive message
  TML_Packed_Message* msg=new TML_Packed_Message(m_comm,totalsize);

  // construct offsets from sizes
  offs_buffer[0]=0;
  for(int i=1;i<comm_size;i++){
    offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
  }
  // receive data
  T dummy2;
  console.Debug() << "TML_Comm::gather_packed: gathering data\n";
  MPI_Gatherv(
    &dummy2,0,GetType(dummy),msg->buffer(),size_buffer,offs_buffer,
    GetType(*(msg->buffer())),rank(),m_comm
  );
  // put data into multimap
  console.Debug() << "TML_Comm::gather_packed: unpacking into multi-map\n";
  for(int i=0;i<comm_size;i++){
    if (size_buffer[i] > 0)
    {
      const int numElems = msg->pop_int();
      for(int j=0; j < numElems; j++){
        //console.Debug()
        //  << "TML_Comm::gather_packed:"
        //  << " unpacking object (" << i << "," << j << ")\n";
        T t;
        msg->unpack(t);
        data.insert(pair<int,T>(i, t));
      }
    }
  }
  console.Debug() << "TML_Comm::gather_packed: done unpacking into multi-map\n";
  // clean up
  delete [] size_buffer;
  delete [] offs_buffer;
  delete msg;
  console.Debug() << "TML_Comm::gather_packed: exit\n";
}

template <typename T>
void TML_Comm::send_gather_packed(const T &data, int root)
{
  console.Debug() << "TML_Comm::send_gather_packed: enter\n";
  // setup send buffer
  TML_Packed_Message* msg =
    new TML_Packed_Message(
      m_comm,
      std::max(static_cast<size_t>(64), data.size()*sizeof(typename T::value_type))
    );
  // put data into send buffer
  const int numElems = data.size();
  msg->pack(numElems);
  for(typename T::const_iterator iter=data.begin();
      iter!=data.end();
      iter++){
    msg->pack(*iter);
  }

  // send size
  int size = msg->size();
  console.Debug() << "TML_Comm::send_gather_packed: sending data size...\n";
  MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);

  // send data
  console.Debug() << "TML_Comm::send_gather_packed: sending data...\n";
  MPI_Gatherv(
    msg->buffer(), msg->size(),
    GetType(*(msg->buffer())),
    NULL, NULL, NULL, MPI_INT, root, m_comm
  );
  // clean up
  delete msg;
  console.Debug() << "TML_Comm::send_gather_packed: exit\n";
}

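// Example (usage sketch, not part of the original header): the packed gather
// pair. send_gather_packed() prefixes each per-rank message with its item
// count, which gather_packed() pops before unpacking. `comm` and root rank 0
// are illustrative assumptions.
#include <list>
#include <map>

inline void example_gather_packed(TML_Comm& comm)
{
  if(comm.rank()==0){
    std::multimap<int,double> in;
    comm.gather_packed(in); // keys of `in` are the source ranks
  } else {
    std::list<double> out;
    out.push_back(double(comm.rank()));
    comm.send_gather_packed(out,0);
  }
}
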
template <typename T>
T TML_Comm::sum_all(const T& data)
{
  T res;

  MPI_Datatype data_type=GetType(data);
  MPI_Allreduce((void*)(&data),(void*)(&res),1,data_type,MPI_SUM,m_comm);

  return res;
}
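
// Example (usage sketch, not part of the original header): sum_all() wraps
// MPI_Allreduce with MPI_SUM, so every rank passes its local value and every
// rank receives the global sum. `comm` is an illustrative assumption.
inline double example_sum_all(TML_Comm& comm)
{
  double local=1.0;
  return comm.sum_all(local); // equals comm.size() on every rank
}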