Added code from the mpi14 library for parallel MPI communication with a nice wrapper interface

Merged Praetorius, Simon requested to merge feature/mpi_wrapper_library into master
2 files changed: +261 −0
#pragma once

#if HAVE_MPI

#include <array>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>

#include <mpi.h>

#include <amdis/Environment.hpp>
#include <amdis/common/ConceptsBase.hpp>
#include <amdis/common/TypeTraits.hpp>
#include <amdis/common/parallel/Request.hpp>
#include <amdis/common/parallel/RecvDynamicSize.hpp>
#include <amdis/common/parallel/MpiTraits.hpp>

namespace AMDiS {
namespace Concepts {
  /// Concept that is true for raw pointer types, used to disambiguate the send/recv overloads
  template <class T>
  constexpr bool RawPointer = std::is_pointer<std::decay_t<T>>::value;
}

namespace Mpi
{
  /// Wrapper around an integer MPI message tag, to keep it distinct from rank arguments
  struct Tag
  {
    int value = 0;
  };
  class Communicator
  {
  public:
    /// Constructor, stores an MPI communicator, e.g. MPI_COMM_WORLD
    Communicator(MPI_Comm comm = Environment::comm())
      : comm_(comm)
    {
      MPI_Comm_size(comm_, &size_);
      MPI_Comm_rank(comm_, &rank_);
    }

  public:
    /// Implicit conversion to the wrapped MPI_Comm
    operator MPI_Comm() const { return comm_; }

    /// Number of ranks in the communicator
    int size() const { return size_; }

    /// Rank of the current process
    int rank() const { return rank_; }
  public:
    /// Send a single object of an MPI datatype (blocking)
    template <class Data, REQUIRES(not Concepts::RawPointer<Data>)>
    void send(Data const& data, int to, Tag tag = {}) const;

    /// Send an array of MPI datatypes (blocking)
    template <class T>
    void send(T const* data, std::size_t size, int to, Tag tag = {}) const;

    /// Send a C-array of MPI datatypes (blocking)
    template <class T, std::size_t N>
    void send(T const (&data)[N], int to, Tag tag = {}) const
    {
      send(&data[0], N, to, tag);
    }

    /// Send a std::array of MPI datatypes (blocking)
    template <class T, std::size_t N>
    void send(std::array<T,N> const& array, int to, Tag tag = {}) const
    {
      send(array.data(), N, to, tag);
    }

    /// Send a std::vector of MPI datatypes (blocking)
    template <class T>
    void send(std::vector<T> const& vec, int to, Tag tag = {}) const;

    /// Send a std::string (blocking)
    void send(std::string const& str, int to, Tag tag = {}) const
    {
      MPI_Send(to_void_ptr(str.data()), int(str.size()), MPI_CHAR, to, tag.value, comm_);
    }
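
    // Usage sketch for the blocking send interface (assumes a Communicator `comm`
    // and at least two ranks; illustration only, not part of the class):
    //   std::vector<double> values{1.0, 2.0};
    //   comm.send(values, /*to=*/1);               // vector overload
    //   comm.send(std::string("hello"), 1);        // string overload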
    // -------------------------------------------------------------------------------------

    /// Send a single object of an MPI datatype (non-blocking)
    template <class Data, REQUIRES(not Concepts::RawPointer<Data>)>
    Request isend(Data const& data, int to, Tag tag = {}) const;

    /// Send an array of MPI datatypes (non-blocking)
    template <class Data>
    Request isend(Data const* data, std::size_t size, int to, Tag tag = {}) const;

    /// Send a C-array of MPI datatypes (non-blocking)
    template <class T, std::size_t N>
    Request isend(T const (&data)[N], int to, Tag tag = {}) const
    {
      return isend(&data[0], N, to, tag);
    }

    /// Send a std::array of MPI datatypes (non-blocking)
    template <class T, std::size_t N>
    Request isend(std::array<T,N> const& array, int to, Tag tag = {}) const
    {
      return isend(array.data(), N, to, tag);
    }

    /// Send a std::vector of MPI datatypes (non-blocking)
    template <class T>
    Request isend(std::vector<T> const& vec, int to, Tag tag = {}) const;

    /// Send a std::string (non-blocking). Note: the caller must keep `str` alive
    /// until the returned request has completed.
    Request isend(std::string const& str, int to, Tag tag = {}) const
    {
      MPI_Request request;
      MPI_Isend(to_void_ptr(str.data()), int(str.size()), MPI_CHAR, to, tag.value, comm_, &request);
      return {request};
    }
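
    // Usage sketch for the non-blocking send interface (assumes the completion
    // interface of Request, defined in Request.hpp, which is not part of this diff):
    //   std::string greeting("hello");
    //   Request req = comm.isend(greeting, /*to=*/1);
    //   // ... `greeting` must stay alive until `req` has completed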
    // -------------------------------------------------------------------------------------

    /// Receive a single object of an MPI datatype (blocking)
    template <class Data, REQUIRES(not Concepts::RawPointer<Data>)>
    MPI_Status recv(Data& data, int from, Tag tag = {}) const;

    /// Receive an array of MPI datatypes (blocking)
    template <class T>
    MPI_Status recv(T* data, std::size_t size, int from, Tag tag = {}) const;

    /// Receive a C-array of MPI datatypes (blocking)
    template <class T, std::size_t N>
    MPI_Status recv(T (&data)[N], int from, Tag tag = {}) const
    {
      return recv(data, N, from, tag);
    }

    /// Receive a std::array of MPI datatypes (blocking)
    template <class T, std::size_t N>
    MPI_Status recv(std::array<T,N>& data, int from, Tag tag = {}) const
    {
      return recv(data.data(), N, from, tag);
    }

    /// Receive a std::vector of MPI datatypes (blocking)
    template <class T>
    MPI_Status recv(std::vector<T>& data, int from, Tag tag = {}) const;

    /// Receive a std::string (blocking): probe for the message size, resize the
    /// string accordingly, then receive the characters
    MPI_Status recv(std::string& str, int from, Tag tag = {}) const
    {
      MPI_Status status;
      MPI_Probe(from, tag.value, comm_, &status);

      int size = 0;
      MPI_Get_count(&status, MPI_CHAR, &size);

      str.resize(size);
      MPI_Recv(&str[0], size, MPI_CHAR, from, tag.value, comm_, MPI_STATUS_IGNORE);
      return status;
    }
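
    // Usage sketch for the blocking receive interface (assumes rank 0 sent a
    // string to this rank; illustration only):
    //   std::string msg;
    //   MPI_Status status = comm.recv(msg, /*from=*/0);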
    // -------------------------------------------------------------------------------------

    /// Receive a single object of an MPI datatype (non-blocking)
    template <class Data, REQUIRES(not Concepts::RawPointer<Data>)>
    Request irecv(Data& data, int from, Tag tag = {}) const;

    /// Receive an array of MPI datatypes (non-blocking)
    template <class Data>
    Request irecv(Data* data, std::size_t size, int from, Tag tag = {}) const;

    /// Receive a C-array of MPI datatypes (non-blocking)
    template <class T, std::size_t N>
    Request irecv(T (&data)[N], int from, Tag tag = {}) const
    {
      return irecv(&data[0], N, from, tag);
    }

    /// Receive a std::array of MPI datatypes (non-blocking)
    template <class T, std::size_t N>
    Request irecv(std::array<T,N>& data, int from, Tag tag = {}) const
    {
      return irecv(data.data(), N, from, tag);
    }

    /// Receive a message of dynamic size (non-blocking): the `recv` callable is
    /// invoked with the probed MPI_Status once a matching message has arrived and
    /// must post the actual receive, returning its MPI_Request
    template <class Receiver>
    std::enable_if_t< Concepts::Callable<Receiver,MPI_Status>, Request>
    irecv(Receiver&& recv, int from, Tag tag = {}) const
    {
      return Request{ RecvDynamicSize(from, tag.value, comm_, std::forward<Receiver>(recv)) };
    }
    /// Receive a std::vector of MPI datatypes (non-blocking):
    /// 1. until the message has arrived, call MPI_Iprobe to retrieve its status and size,
    /// 2. resize the data vector,
    /// 3. receive the data into the vector.
    template <class T>
    Request irecv(std::vector<T>& vec, int from, Tag tag = {}) const;

    /// Receive a std::string (non-blocking), using the same probe-resize-receive protocol
    Request irecv(std::string& str, int from, Tag tag = {}) const
    {
      return Request{ RecvDynamicSize(from, tag.value, comm_,
        [comm=comm_,&str](MPI_Status status) -> MPI_Request
        {
          int size = 0;
          MPI_Get_count(&status, MPI_CHAR, &size);
          str.resize(size);

          MPI_Request req;
          MPI_Irecv(&str[0], size, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, &req);
          return req;
        }) };
    }
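
    // Usage sketch for the generic Receiver overload, modeled on the string
    // implementation above (assumes `buf` outlives the request; illustration only):
    //   std::vector<char> buf;
    //   MPI_Comm raw = comm;  // implicit conversion to MPI_Comm
    //   Request req = comm.irecv([&buf,raw](MPI_Status s) -> MPI_Request {
    //     int n = 0;
    //     MPI_Get_count(&s, MPI_CHAR, &n);
    //     buf.resize(n);
    //     MPI_Request r;
    //     MPI_Irecv(buf.data(), n, MPI_CHAR, s.MPI_SOURCE, s.MPI_TAG, raw, &r);
    //     return r;
    //   }, /*from=*/0);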
  protected:
    /// Erase all buffers whose associated requests have completed
    void check_buffers() const
    {
      using Buffers = std::decay_t<decltype(buffers_)>;
      std::list<typename Buffers::iterator> remove;
      for (auto it = buffers_.begin(); it != buffers_.end(); ++it) {
        int flag = 0;
        MPI_Request_get_status(it->first, &flag, MPI_STATUS_IGNORE);
        if (flag != 0)
          remove.push_back(it);
      }

      for (auto it : remove)
        buffers_.erase(it);
    }

    /// Create a buffer of length `len` and associate it with the (source,tag)
    /// pair of `status`
    std::pair<MPI_Request, std::string>& make_buffer(MPI_Status status, std::size_t len) const
    {
      auto it = buffers_.emplace(buffers_.end(), MPI_Request{}, std::string(len, ' '));
      buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}] = it;
      return buffers_.back();
    }

    /// Return the buffer associated with the (source,tag) pair of `status`
    std::pair<MPI_Request, std::string>& get_buffer(MPI_Status status) const
    {
      auto it = buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}];
      return *it;
    }
  protected:
    MPI_Comm comm_;

    int rank_ = 0;
    int size_ = 1;

    /// Buffers for messages of dynamic size, cleaned up in check_buffers()
    using BufferList = std::list< std::pair<MPI_Request, std::string> >;
    mutable BufferList buffers_;

    /// Map from (source,tag) pairs to the corresponding buffer
    using BufferIter = BufferList::iterator;
    mutable std::map<std::pair<int,int>, BufferIter> buffers_iterators_;
  };

}} // end namespace AMDiS::Mpi

#endif // HAVE_MPI

#include <amdis/common/parallel/Communicator.inc.hpp>
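
A minimal end-to-end usage sketch of the wrapper (illustration only, not part of
the change set; it assumes that Environment's constructor takes (argc, argv) and
initializes MPI, that the wrapper header path matches the includes shown in this
diff, and that the program runs on at least two ranks):

  #include <string>
  #include <vector>
  #include <amdis/Environment.hpp>
  #include <amdis/common/parallel/Communicator.hpp>

  int main(int argc, char** argv)
  {
    AMDiS::Environment env(argc, argv);   // assumed to initialize/finalize MPI
    AMDiS::Mpi::Communicator comm;        // wraps Environment::comm()

    if (comm.rank() == 0) {
      std::vector<double> values{1.0, 2.0, 3.0};
      comm.send(values, /*to=*/1);                        // blocking vector send
      comm.send(std::string("hello"), 1, AMDiS::Mpi::Tag{7});
    }
    else if (comm.rank() == 1) {
      std::vector<double> values;
      comm.recv(values, /*from=*/0);                      // blocking vector receive
      std::string msg;
      comm.recv(msg, 0, AMDiS::Mpi::Tag{7});              // size is probed internally
    }
  }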