ESyS-Particle  4.0.1
comm_coll.hpp
00001 
00002 //                                                         //
00003 // Copyright (c) 2003-2011 by The University of Queensland //
00004 // Earth Systems Science Computational Centre (ESSCC)      //
00005 // http://www.uq.edu.au/esscc                              //
00006 //                                                         //
00007 // Primary Business: Brisbane, Queensland, Australia       //
00008 // Licensed under the Open Software License version 3.0    //
00009 // http://www.opensource.org/licenses/osl-3.0.php          //
00010 //                                                         //
00012 
00013 
00014 // --- collective communication primitives for TML_Comm ---
00015 #include "tml/message/packed_multi_message.h"
00016 
00022 template <typename T> 
00023 void TML_Comm::broadcast(T data)
00024 {
00025   MPI_Bcast(&data,1,GetType(data),rank(),m_comm);
00026 }
00027 
00034 template <typename T> 
00035 void TML_Comm::broadcast_array(T* data,int ndata)
00036 {
00037   MPI_Bcast(data,ndata,GetType(*data),rank(),m_comm);
00038 }
00045 template <typename T> 
00046 void TML_Comm::broadcast_cont(const T& data)
00047 {
00048   int data_size=data.size();
00049 
00050   // setup buffer
00051   typename T::value_type *buffer=new typename T::value_type[data_size];
00052 // put data into buffer
00053   int count=0;
00054   
00055   for(typename T::const_iterator iter=data.begin();
00056       iter!=data.end();
00057       iter++){
00058     void* buf=reinterpret_cast<void*>(&(buffer[count])); // get memory adress for buffer element
00059     new(buf)(typename T::value_type)(*iter); // initialize object at this adress
00060     // the placement new stuff (see Stroustrup, p.255ff) is necessary
00061     // because assignment buffer[count]=(*iter) doesn't work if the 
00062     // T::value_type contains constant member data. Initialization by 
00063     // calling the copy constructor directly doesn't work for builtin
00064     // types
00065     count++;
00066   }
00067 
00068   // broadcast size 
00069   broadcast(data_size);
00070 
00071   // broadcast buffer
00072   broadcast_array(buffer,data_size);
00073 
00074   // clean up
00075   delete [] buffer;
00076 }
00077 
00084 template <typename T> 
00085 void TML_Comm::broadcast_cont_packed(const T &data)
00086 {
00087   TML_Packed_Message* msg=new TML_Packed_Message(m_comm);
00088   int nb_data=data.size();
00089   
00090   msg->pack(nb_data); // pack number of items first
00091   // pack data
00092   for(typename T::const_iterator iter=data.begin();
00093       iter!=data.end();
00094       iter++){
00095     msg->pack(*iter);
00096   }
00097   // broadcast size
00098   broadcast(msg->size());
00099 
00100   // broadcast data
00101   broadcast_array(msg->buffer(),msg->size());
00102 
00103   delete msg;
00104 }
00105 
00112 template <typename T> 
00113 void TML_Comm::recv_broadcast(T& data,int root)
00114 {
00115   MPI_Bcast(&data,1,GetType(data),root,m_comm);
00116 }
00117 
00125 template <typename T> 
00126 void TML_Comm::recv_broadcast_array(T* data,int ndata,int root)
00127 {
00128   MPI_Bcast(data,ndata,GetType(*data),root,m_comm);
00129 }
00130 
00138 template <typename T> 
00139 void TML_Comm::recv_broadcast_cont(T& data,int root)
00140 {
00141   int data_size;
00142 
00143   //get size
00144   recv_broadcast(data_size,root);
00145   // setup recv buffer
00146   typename T::value_type *buffer=new typename T::value_type[data_size];
00147 
00148   //get data
00149   recv_broadcast_array(buffer,data_size,root);
00150   // insert into container
00151   for(int i=0;i<data_size;i++){
00152     data.insert(data.end(),buffer[i]);
00153   }
00154 
00155   //clean up
00156   delete [] buffer;
00157 }
00165 template <typename T> 
00166 void TML_Comm::recv_broadcast_cont_packed(T& data,int root)
00167 {
00168   int msg_size; // total size of the message
00169   int nb_data;  // number of data (i.e. the expected data.size())
00170 
00171   //get size
00172   recv_broadcast(msg_size,root);
00173 
00174   //setup message
00175   TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size);
00176   
00177   //get data
00178   recv_broadcast_array(msg->buffer(),msg_size,root);
00179    // extract nuber of items
00180   nb_data=msg->pop_int();
00181 
00182   // unpack data
00183   for(int i=0;i<nb_data;i++){
00184     typename T::value_type tv;
00185     msg->unpack(tv);
00186     data.insert(data.end(),tv);
00187   }
00188 
00189   // clean up
00190   delete msg;
00191 }
00192 
00200 template <typename T> 
00201 void TML_Comm::scatter(const multimap<int,T> data)
00202 {
00203   // put data into buffer
00204   int total_size=data.size();
00205   T* buffer=new T[total_size];
00206   int comm_size=size();
00207   int *size_buffer=new int[comm_size];
00208   int *offs_buffer=new int[comm_size];
00209   int count=0;
00210   int count_p=0;
00211   for(int i=0;i<comm_size;i++){
00212     typename multimap<int,T>::const_iterator begin=data.find(i);
00213     typename multimap<int,T>::const_iterator end=data.upper_bound(i);
00214     if(begin!=data.end()){
00215       for(typename multimap<int,T>::const_iterator iter=begin;
00216           iter!=end;
00217           iter++){
00218         buffer[count]=iter->second;
00219         count++;
00220       }
00221     }
00222     size_buffer[i]=count-count_p;
00223     count_p=count;
00224   }
00225   // send size info
00226   int dummy;
00227   MPI_Scatter(size_buffer,1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm);
00228   // construct offsets from sizes
00229   offs_buffer[0]=0;
00230   for(int i=1;i<comm_size;i++){
00231     offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
00232   }
00233   // send data
00234   T dummy2;
00235   MPI_Scatterv(buffer,size_buffer,offs_buffer,GetType(*buffer),&dummy2,0,GetType(*buffer),rank(),m_comm);
00236   // clean up
00237   delete [] size_buffer;
00238   delete [] offs_buffer;
00239   delete [] buffer;
00240 }
00241 
00248 template <typename T> 
00249 void TML_Comm::recv_scatter(T& data,int root)
00250 {
00251   // receive size
00252   int size;
00253   MPI_Scatter(NULL,0,MPI_INT,&size,1,MPI_INT,root,m_comm);
00254   // allocate buffer buffer
00255   typename T::value_type *buffer=new typename T::value_type[size];
00256   // receive data
00257   MPI_Scatterv(NULL,NULL,NULL,MPI_INT,buffer,size,GetType(*buffer),root,m_comm);
00258   // put data in container
00259   for(int i=0;i<size;i++){
00260     data.insert(data.end(),buffer[i]);
00261   }
00262   // clean up
00263   delete [] buffer;
00264 }
00265 
00272 template <typename T> 
00273 void TML_Comm::gather(multimap<int,T>& data)
00274 {
00275   int dummy=0;
00276   int comm_size=size();
00277   int *size_buffer=new int[comm_size];
00278   int *offs_buffer=new int[comm_size];
00279   // receive sizes
00280   MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
00281   int totalsize=0;
00282   for(int i=0;i<comm_size;i++){
00283     totalsize+=size_buffer[i];
00284   }
00285   // setup receive buffer
00286   T *buffer=new T[totalsize];
00287   // construct offsets from sizes
00288   offs_buffer[0]=0;
00289   for(int i=1;i<comm_size;i++){
00290     offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
00291   }
00292   // receive data
00293   T dummy2;
00294   MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm);
00295   // put data into multimap
00296   for(int i=0;i<comm_size;i++){
00297     for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){
00298       data.insert(pair<int,T>(i,buffer[j]));
00299     }
00300   }
00301   // clean up
00302   delete [] size_buffer;        
00303   delete [] offs_buffer;
00304   delete [] buffer;
00305 }
00306 
00314 template <typename T> 
00315 void TML_Comm::gather_debug(multimap<int,T>& data)
00316 {
00317   int dummy=0;
00318   int comm_size=size();
00319   int *size_buffer=new int[comm_size];
00320   int *offs_buffer=new int[comm_size];
00321   // receive sizes
00322   MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
00323   int totalsize=0;
00324   for(int i=0;i<comm_size;i++){
00325     console.Debug() << "buffer size " << i << " - " << size_buffer[i] << "\n";
00326     totalsize+=size_buffer[i];
00327   }
00328   // setup receive buffer
00329   T *buffer=new T[totalsize];
00330   // construct offsets from sizes
00331   offs_buffer[0]=0;
00332   for(int i=1;i<comm_size;i++){
00333     offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
00334   }
00335   // receive data
00336   T dummy2;
00337   MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm);
00338   // put data into multimap
00339   for(int i=0;i<comm_size;i++){
00340     for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){
00341       data.insert(pair<int,T>(i,buffer[j]));
00342     }
00343   }
00344   // clean up
00345   delete [] size_buffer;        
00346   delete [] offs_buffer;
00347   delete [] buffer;
00348 }
00349 
00356 template <typename T> 
00357 void TML_Comm::send_gather(T& data,int root)
00358 {
00359   // send size
00360   int size=data.size();
00361   MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);
00362   // setup send buffer
00363   typename T::value_type *buffer=new typename T::value_type[size];
00364   // put data into send buffer
00365   int count=0;
00366   for(typename T::const_iterator iter=data.begin();
00367       iter!=data.end();
00368       iter++){
00369     buffer[count]=*iter;
00370     count++;
00371   }
00372   // send data
00373   MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm);
00374   // clean up
00375   delete [] buffer;
00376 }
00377 
00378  
00385 template <typename T> 
00386 void TML_Comm::send_gather_debug(T& data,int root)
00387 {
00388   // send size
00389   int size=data.size();
00390   console.Debug() << "send size :" << size << "\n";
00391   MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);
00392   // setup send buffer
00393   typename T::value_type *buffer=new typename T::value_type[size];
00394   // put data into send buffer
00395   int count=0;
00396   for(typename T::const_iterator iter=data.begin();
00397       iter!=data.end();
00398       iter++){
00399     buffer[count]=*iter;
00400     count++;
00401   }
00402   console.Debug() << "send count :" << count << "\n";
00403   // send data
00404   MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm);
00405   // clean up
00406   delete [] buffer;
00407 }
00408 
00409  
00417 template <typename T> 
00418 void TML_Comm::scatter_packed(const multimap<int,T> data)
00419 {
00420   int comm_size=size();
00421   // construct new packed multimessage
00422   TML_PackedMultiMessage *msg=new TML_PackedMultiMessage(m_comm);
00423   // pack data
00424   for(int i=1;i<comm_size;i++){
00425     typename multimap<int,T>::const_iterator begin=data.find(i);
00426     typename multimap<int,T>::const_iterator end=data.upper_bound(i);
00427     (*msg)[i].pack(int(data.count(i)));
00428     if(begin!=data.end()){
00429       for(typename multimap<int,T>::const_iterator iter=begin;
00430           iter!=end;
00431           iter++){
00432         (*msg)[i].pack(iter->second);
00433       }
00434     }
00435   }
00436   // send size
00437   int dummy;
00438   MPI_Scatter(msg->sizes(),1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm);  
00439   // send data
00440   T dummy2;
00441   MPI_Datatype data_type=GetType(*(msg->buffer()));
00442   MPI_Scatterv(msg->buffer(),msg->sizes(),msg->offsets(),data_type,
00443                &dummy2,0,data_type,rank(),m_comm);
00444   // clean up
00445   delete msg;
00446 }
00447 
00454 template <typename T> 
00455 void TML_Comm::recv_scatter_packed(T& data,int root)
00456 {
00457   // receive size
00458   int msg_size;
00459   MPI_Scatter(NULL,0,MPI_INT,&msg_size,1,MPI_INT,root,m_comm);
00460   // construct packed message of sufficient size
00461   TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size);
00462 
00463   // recceive data
00464   MPI_Datatype data_type=GetType(*(msg->buffer()));
00465   MPI_Scatterv(NULL,NULL,NULL,MPI_INT,msg->buffer(),msg_size,data_type,root,m_comm);
00466   // extract number of items
00467   msg->begin_unpack();
00468   int nitems=msg->pop_int();
00469   // unpack data
00470   typename T::value_type item;
00471   for(int i=0;i<nitems;i++){
00472     msg->unpack(item);
00473     data.insert(data.end(),item);
00474   }
00475   // clean up
00476   delete msg;
00477 }
00478 
00479 template <typename T> 
00480 void TML_Comm::gather_packed(multimap<int,T> &data)
00481 {
00482   console.Debug() << "TML_Comm::gather_packed: enter\n";
00483   int dummy=0;
00484   int comm_size=size();
00485   int *size_buffer=new int[comm_size];
00486   int *offs_buffer=new int[comm_size];
00487   // receive sizes
00488   console.Debug()
00489      << "TML_Comm::gather_packed: gathering sizes\n";
00490   MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm);
00491   int totalsize=0;
00492   for(int i=0;i<comm_size;i++){
00493     //console.Debug()
00494     //  << "TML_Comm::gather_packed:"
00495     //  << " size from rank " << i << " = " << size_buffer[i] << "\n";
00496     totalsize+=size_buffer[i];
00497   }
00498   console.Debug()
00499      << "TML_Comm::gather_packed: total msg size = " << totalsize << "\n";
00500   // setup receive buffer
00501     //setup message
00502   TML_Packed_Message* msg=new TML_Packed_Message(m_comm,totalsize);
00503   
00504   // construct offsets from sizes
00505   offs_buffer[0]=0;
00506   for(int i=1;i<comm_size;i++){
00507     offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1];
00508   }
00509   // receive data
00510   T dummy2;
00511   console.Debug()
00512      << "TML_Comm::gather_packed: gathering data\n";
00513   MPI_Gatherv(
00514     &dummy2,0, GetType(dummy), msg->buffer(), size_buffer, offs_buffer,
00515     GetType(*(msg->buffer())),rank(),m_comm
00516   );
00517   // put data into multimap
00518   console.Debug()
00519      << "TML_Comm::gather_packed: unpacking into multi-map\n";
00520   for(int i=0;i<comm_size;i++){
00521     if (size_buffer[i] > 0)
00522     {
00523       const int numElems = msg->pop_int();
00524       for(int j=0; j < numElems; j++){
00525         //console.Debug()
00526         //  << "TML_Comm::gather_packed:"
00527         //  << " unpacking object (" << i << "," << j << ")\n";
00528         T t;
00529         msg->unpack(t);
00530         data.insert(pair<int,T>(i, t));
00531       }
00532     }
00533   }
00534   console.Debug() << "TML_Comm::gather_packed: done unpacking into multi-map\n";
00535   // clean up
00536   delete [] size_buffer;
00537   delete [] offs_buffer;
00538   delete msg;
00539   console.Debug() << "TML_Comm::gather_packed: exit\n";
00540 }
00541 
00542 template <typename T> 
00543 void TML_Comm::send_gather_packed(const T &data, int root)
00544 {
00545   console.Debug() << "TML_Comm::send_gather_packed: enter\n";
00546   // setup send buffer
00547   TML_Packed_Message* msg =
00548     new TML_Packed_Message(
00549       m_comm,
00550       std::max(static_cast<size_t>(64), data.size()*sizeof(typename T::value_type))
00551   );
00552   // put data into send buffer
00553   const int numElems = data.size();
00554   msg->pack(numElems);
00555   for(typename T::const_iterator iter=data.begin();
00556       iter!=data.end();
00557       iter++){
00558     msg->pack(*iter);
00559   }
00560 
00561   // send size
00562   int size = msg->size();
00563   console.Debug() << "TML_Comm::send_gather_packed: sending data size...\n";
00564   MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm);
00565   
00566   // send data
00567   console.Debug() << "TML_Comm::send_gather_packed: sending data...\n";
00568   MPI_Gatherv(
00569     msg->buffer(), msg->size(),
00570     GetType(*(msg->buffer())),
00571     NULL, NULL, NULL, MPI_INT, root, m_comm
00572   );
00573   // clean up
00574   delete msg;
00575   console.Debug() << "TML_Comm::send_gather_packed: exit\n";
00576 }
00577 
00578 template <typename T> 
00579 T TML_Comm::sum_all(const T& data)
00580 {
00581   T res;
00582 
00583   MPI_Datatype data_type=GetType(data);
00584   MPI_Allreduce((void*)(&data),(void*)(&res),1,data_type,MPI_SUM,m_comm);
00585 
00586   return res;
00587 }