ESyS-Particle
4.0.1
|
00001 00002 // // 00003 // Copyright (c) 2003-2011 by The University of Queensland // 00004 // Earth Systems Science Computational Centre (ESSCC) // 00005 // http://www.uq.edu.au/esscc // 00006 // // 00007 // Primary Business: Brisbane, Queensland, Australia // 00008 // Licensed under the Open Software License version 3.0 // 00009 // http://www.opensource.org/licenses/osl-3.0.php // 00010 // // 00012 00013 00014 // --- collective communication primitives for TML_Comm --- 00015 #include "tml/message/packed_multi_message.h" 00016 00022 template <typename T> 00023 void TML_Comm::broadcast(T data) 00024 { 00025 MPI_Bcast(&data,1,GetType(data),rank(),m_comm); 00026 } 00027 00034 template <typename T> 00035 void TML_Comm::broadcast_array(T* data,int ndata) 00036 { 00037 MPI_Bcast(data,ndata,GetType(*data),rank(),m_comm); 00038 } 00045 template <typename T> 00046 void TML_Comm::broadcast_cont(const T& data) 00047 { 00048 int data_size=data.size(); 00049 00050 // setup buffer 00051 typename T::value_type *buffer=new typename T::value_type[data_size]; 00052 // put data into buffer 00053 int count=0; 00054 00055 for(typename T::const_iterator iter=data.begin(); 00056 iter!=data.end(); 00057 iter++){ 00058 void* buf=reinterpret_cast<void*>(&(buffer[count])); // get memory adress for buffer element 00059 new(buf)(typename T::value_type)(*iter); // initialize object at this adress 00060 // the placement new stuff (see Stroustrup, p.255ff) is necessary 00061 // because assignment buffer[count]=(*iter) doesn't work if the 00062 // T::value_type contains constant member data. Initialization by 00063 // calling the copy constructor directly doesn't work for builtin 00064 // types 00065 count++; 00066 } 00067 00068 // broadcast size 00069 broadcast(data_size); 00070 00071 // broadcast buffer 00072 broadcast_array(buffer,data_size); 00073 00074 // clean up 00075 delete [] buffer; 00076 } 00077 00084 template <typename T> 00085 void TML_Comm::broadcast_cont_packed(const T &data) 00086 { 00087 TML_Packed_Message* msg=new TML_Packed_Message(m_comm); 00088 int nb_data=data.size(); 00089 00090 msg->pack(nb_data); // pack number of items first 00091 // pack data 00092 for(typename T::const_iterator iter=data.begin(); 00093 iter!=data.end(); 00094 iter++){ 00095 msg->pack(*iter); 00096 } 00097 // broadcast size 00098 broadcast(msg->size()); 00099 00100 // broadcast data 00101 broadcast_array(msg->buffer(),msg->size()); 00102 00103 delete msg; 00104 } 00105 00112 template <typename T> 00113 void TML_Comm::recv_broadcast(T& data,int root) 00114 { 00115 MPI_Bcast(&data,1,GetType(data),root,m_comm); 00116 } 00117 00125 template <typename T> 00126 void TML_Comm::recv_broadcast_array(T* data,int ndata,int root) 00127 { 00128 MPI_Bcast(data,ndata,GetType(*data),root,m_comm); 00129 } 00130 00138 template <typename T> 00139 void TML_Comm::recv_broadcast_cont(T& data,int root) 00140 { 00141 int data_size; 00142 00143 //get size 00144 recv_broadcast(data_size,root); 00145 // setup recv buffer 00146 typename T::value_type *buffer=new typename T::value_type[data_size]; 00147 00148 //get data 00149 recv_broadcast_array(buffer,data_size,root); 00150 // insert into container 00151 for(int i=0;i<data_size;i++){ 00152 data.insert(data.end(),buffer[i]); 00153 } 00154 00155 //clean up 00156 delete [] buffer; 00157 } 00165 template <typename T> 00166 void TML_Comm::recv_broadcast_cont_packed(T& data,int root) 00167 { 00168 int msg_size; // total size of the message 00169 int nb_data; // number of data (i.e. the expected data.size()) 00170 00171 //get size 00172 recv_broadcast(msg_size,root); 00173 00174 //setup message 00175 TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size); 00176 00177 //get data 00178 recv_broadcast_array(msg->buffer(),msg_size,root); 00179 // extract nuber of items 00180 nb_data=msg->pop_int(); 00181 00182 // unpack data 00183 for(int i=0;i<nb_data;i++){ 00184 typename T::value_type tv; 00185 msg->unpack(tv); 00186 data.insert(data.end(),tv); 00187 } 00188 00189 // clean up 00190 delete msg; 00191 } 00192 00200 template <typename T> 00201 void TML_Comm::scatter(const multimap<int,T> data) 00202 { 00203 // put data into buffer 00204 int total_size=data.size(); 00205 T* buffer=new T[total_size]; 00206 int comm_size=size(); 00207 int *size_buffer=new int[comm_size]; 00208 int *offs_buffer=new int[comm_size]; 00209 int count=0; 00210 int count_p=0; 00211 for(int i=0;i<comm_size;i++){ 00212 typename multimap<int,T>::const_iterator begin=data.find(i); 00213 typename multimap<int,T>::const_iterator end=data.upper_bound(i); 00214 if(begin!=data.end()){ 00215 for(typename multimap<int,T>::const_iterator iter=begin; 00216 iter!=end; 00217 iter++){ 00218 buffer[count]=iter->second; 00219 count++; 00220 } 00221 } 00222 size_buffer[i]=count-count_p; 00223 count_p=count; 00224 } 00225 // send size info 00226 int dummy; 00227 MPI_Scatter(size_buffer,1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm); 00228 // construct offsets from sizes 00229 offs_buffer[0]=0; 00230 for(int i=1;i<comm_size;i++){ 00231 offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1]; 00232 } 00233 // send data 00234 T dummy2; 00235 MPI_Scatterv(buffer,size_buffer,offs_buffer,GetType(*buffer),&dummy2,0,GetType(*buffer),rank(),m_comm); 00236 // clean up 00237 delete [] size_buffer; 00238 delete [] offs_buffer; 00239 delete [] buffer; 00240 } 00241 00248 template <typename T> 00249 void TML_Comm::recv_scatter(T& data,int root) 00250 { 00251 // receive size 00252 int size; 00253 MPI_Scatter(NULL,0,MPI_INT,&size,1,MPI_INT,root,m_comm); 00254 // allocate buffer buffer 00255 typename T::value_type *buffer=new typename T::value_type[size]; 00256 // receive data 00257 MPI_Scatterv(NULL,NULL,NULL,MPI_INT,buffer,size,GetType(*buffer),root,m_comm); 00258 // put data in container 00259 for(int i=0;i<size;i++){ 00260 data.insert(data.end(),buffer[i]); 00261 } 00262 // clean up 00263 delete [] buffer; 00264 } 00265 00272 template <typename T> 00273 void TML_Comm::gather(multimap<int,T>& data) 00274 { 00275 int dummy=0; 00276 int comm_size=size(); 00277 int *size_buffer=new int[comm_size]; 00278 int *offs_buffer=new int[comm_size]; 00279 // receive sizes 00280 MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm); 00281 int totalsize=0; 00282 for(int i=0;i<comm_size;i++){ 00283 totalsize+=size_buffer[i]; 00284 } 00285 // setup receive buffer 00286 T *buffer=new T[totalsize]; 00287 // construct offsets from sizes 00288 offs_buffer[0]=0; 00289 for(int i=1;i<comm_size;i++){ 00290 offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1]; 00291 } 00292 // receive data 00293 T dummy2; 00294 MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm); 00295 // put data into multimap 00296 for(int i=0;i<comm_size;i++){ 00297 for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){ 00298 data.insert(pair<int,T>(i,buffer[j])); 00299 } 00300 } 00301 // clean up 00302 delete [] size_buffer; 00303 delete [] offs_buffer; 00304 delete [] buffer; 00305 } 00306 00314 template <typename T> 00315 void TML_Comm::gather_debug(multimap<int,T>& data) 00316 { 00317 int dummy=0; 00318 int comm_size=size(); 00319 int *size_buffer=new int[comm_size]; 00320 int *offs_buffer=new int[comm_size]; 00321 // receive sizes 00322 MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm); 00323 int totalsize=0; 00324 for(int i=0;i<comm_size;i++){ 00325 console.Debug() << "buffer size " << i << " - " << size_buffer[i] << "\n"; 00326 totalsize+=size_buffer[i]; 00327 } 00328 // setup receive buffer 00329 T *buffer=new T[totalsize]; 00330 // construct offsets from sizes 00331 offs_buffer[0]=0; 00332 for(int i=1;i<comm_size;i++){ 00333 offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1]; 00334 } 00335 // receive data 00336 T dummy2; 00337 MPI_Gatherv(&dummy2,0,GetType(dummy),buffer,size_buffer,offs_buffer,GetType(*buffer),rank(),m_comm); 00338 // put data into multimap 00339 for(int i=0;i<comm_size;i++){ 00340 for(int j=offs_buffer[i];j<offs_buffer[i]+size_buffer[i];j++){ 00341 data.insert(pair<int,T>(i,buffer[j])); 00342 } 00343 } 00344 // clean up 00345 delete [] size_buffer; 00346 delete [] offs_buffer; 00347 delete [] buffer; 00348 } 00349 00356 template <typename T> 00357 void TML_Comm::send_gather(T& data,int root) 00358 { 00359 // send size 00360 int size=data.size(); 00361 MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm); 00362 // setup send buffer 00363 typename T::value_type *buffer=new typename T::value_type[size]; 00364 // put data into send buffer 00365 int count=0; 00366 for(typename T::const_iterator iter=data.begin(); 00367 iter!=data.end(); 00368 iter++){ 00369 buffer[count]=*iter; 00370 count++; 00371 } 00372 // send data 00373 MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm); 00374 // clean up 00375 delete [] buffer; 00376 } 00377 00378 00385 template <typename T> 00386 void TML_Comm::send_gather_debug(T& data,int root) 00387 { 00388 // send size 00389 int size=data.size(); 00390 console.Debug() << "send size :" << size << "\n"; 00391 MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm); 00392 // setup send buffer 00393 typename T::value_type *buffer=new typename T::value_type[size]; 00394 // put data into send buffer 00395 int count=0; 00396 for(typename T::const_iterator iter=data.begin(); 00397 iter!=data.end(); 00398 iter++){ 00399 buffer[count]=*iter; 00400 count++; 00401 } 00402 console.Debug() << "send count :" << count << "\n"; 00403 // send data 00404 MPI_Gatherv(buffer,size,GetType(*buffer),NULL,NULL,NULL,MPI_INT,root,m_comm); 00405 // clean up 00406 delete [] buffer; 00407 } 00408 00409 00417 template <typename T> 00418 void TML_Comm::scatter_packed(const multimap<int,T> data) 00419 { 00420 int comm_size=size(); 00421 // construct new packed multimessage 00422 TML_PackedMultiMessage *msg=new TML_PackedMultiMessage(m_comm); 00423 // pack data 00424 for(int i=1;i<comm_size;i++){ 00425 typename multimap<int,T>::const_iterator begin=data.find(i); 00426 typename multimap<int,T>::const_iterator end=data.upper_bound(i); 00427 (*msg)[i].pack(int(data.count(i))); 00428 if(begin!=data.end()){ 00429 for(typename multimap<int,T>::const_iterator iter=begin; 00430 iter!=end; 00431 iter++){ 00432 (*msg)[i].pack(iter->second); 00433 } 00434 } 00435 } 00436 // send size 00437 int dummy; 00438 MPI_Scatter(msg->sizes(),1,MPI_INT,&dummy,1,MPI_INT,rank(),m_comm); 00439 // send data 00440 T dummy2; 00441 MPI_Datatype data_type=GetType(*(msg->buffer())); 00442 MPI_Scatterv(msg->buffer(),msg->sizes(),msg->offsets(),data_type, 00443 &dummy2,0,data_type,rank(),m_comm); 00444 // clean up 00445 delete msg; 00446 } 00447 00454 template <typename T> 00455 void TML_Comm::recv_scatter_packed(T& data,int root) 00456 { 00457 // receive size 00458 int msg_size; 00459 MPI_Scatter(NULL,0,MPI_INT,&msg_size,1,MPI_INT,root,m_comm); 00460 // construct packed message of sufficient size 00461 TML_Packed_Message* msg=new TML_Packed_Message(m_comm,msg_size); 00462 00463 // recceive data 00464 MPI_Datatype data_type=GetType(*(msg->buffer())); 00465 MPI_Scatterv(NULL,NULL,NULL,MPI_INT,msg->buffer(),msg_size,data_type,root,m_comm); 00466 // extract number of items 00467 msg->begin_unpack(); 00468 int nitems=msg->pop_int(); 00469 // unpack data 00470 typename T::value_type item; 00471 for(int i=0;i<nitems;i++){ 00472 msg->unpack(item); 00473 data.insert(data.end(),item); 00474 } 00475 // clean up 00476 delete msg; 00477 } 00478 00479 template <typename T> 00480 void TML_Comm::gather_packed(multimap<int,T> &data) 00481 { 00482 console.Debug() << "TML_Comm::gather_packed: enter\n"; 00483 int dummy=0; 00484 int comm_size=size(); 00485 int *size_buffer=new int[comm_size]; 00486 int *offs_buffer=new int[comm_size]; 00487 // receive sizes 00488 console.Debug() 00489 << "TML_Comm::gather_packed: gathering sizes\n"; 00490 MPI_Gather(&dummy,1,MPI_INT,size_buffer,1,MPI_INT,rank(),m_comm); 00491 int totalsize=0; 00492 for(int i=0;i<comm_size;i++){ 00493 //console.Debug() 00494 // << "TML_Comm::gather_packed:" 00495 // << " size from rank " << i << " = " << size_buffer[i] << "\n"; 00496 totalsize+=size_buffer[i]; 00497 } 00498 console.Debug() 00499 << "TML_Comm::gather_packed: total msg size = " << totalsize << "\n"; 00500 // setup receive buffer 00501 //setup message 00502 TML_Packed_Message* msg=new TML_Packed_Message(m_comm,totalsize); 00503 00504 // construct offsets from sizes 00505 offs_buffer[0]=0; 00506 for(int i=1;i<comm_size;i++){ 00507 offs_buffer[i]=offs_buffer[i-1]+size_buffer[i-1]; 00508 } 00509 // receive data 00510 T dummy2; 00511 console.Debug() 00512 << "TML_Comm::gather_packed: gathering data\n"; 00513 MPI_Gatherv( 00514 &dummy2,0, GetType(dummy), msg->buffer(), size_buffer, offs_buffer, 00515 GetType(*(msg->buffer())),rank(),m_comm 00516 ); 00517 // put data into multimap 00518 console.Debug() 00519 << "TML_Comm::gather_packed: unpacking into multi-map\n"; 00520 for(int i=0;i<comm_size;i++){ 00521 if (size_buffer[i] > 0) 00522 { 00523 const int numElems = msg->pop_int(); 00524 for(int j=0; j < numElems; j++){ 00525 //console.Debug() 00526 // << "TML_Comm::gather_packed:" 00527 // << " unpacking object (" << i << "," << j << ")\n"; 00528 T t; 00529 msg->unpack(t); 00530 data.insert(pair<int,T>(i, t)); 00531 } 00532 } 00533 } 00534 console.Debug() << "TML_Comm::gather_packed: done unpacking into multi-map\n"; 00535 // clean up 00536 delete [] size_buffer; 00537 delete [] offs_buffer; 00538 delete msg; 00539 console.Debug() << "TML_Comm::gather_packed: exit\n"; 00540 } 00541 00542 template <typename T> 00543 void TML_Comm::send_gather_packed(const T &data, int root) 00544 { 00545 console.Debug() << "TML_Comm::send_gather_packed: enter\n"; 00546 // setup send buffer 00547 TML_Packed_Message* msg = 00548 new TML_Packed_Message( 00549 m_comm, 00550 std::max(static_cast<size_t>(64), data.size()*sizeof(typename T::value_type)) 00551 ); 00552 // put data into send buffer 00553 const int numElems = data.size(); 00554 msg->pack(numElems); 00555 for(typename T::const_iterator iter=data.begin(); 00556 iter!=data.end(); 00557 iter++){ 00558 msg->pack(*iter); 00559 } 00560 00561 // send size 00562 int size = msg->size(); 00563 console.Debug() << "TML_Comm::send_gather_packed: sending data size...\n"; 00564 MPI_Gather(&size,1,MPI_INT,NULL,0,MPI_INT,root,m_comm); 00565 00566 // send data 00567 console.Debug() << "TML_Comm::send_gather_packed: sending data...\n"; 00568 MPI_Gatherv( 00569 msg->buffer(), msg->size(), 00570 GetType(*(msg->buffer())), 00571 NULL, NULL, NULL, MPI_INT, root, m_comm 00572 ); 00573 // clean up 00574 delete msg; 00575 console.Debug() << "TML_Comm::send_gather_packed: exit\n"; 00576 } 00577 00578 template <typename T> 00579 T TML_Comm::sum_all(const T& data) 00580 { 00581 T res; 00582 00583 MPI_Datatype data_type=GetType(data); 00584 MPI_Allreduce((void*)(&data),(void*)(&res),1,data_type,MPI_SUM,m_comm); 00585 00586 return res; 00587 }