From 7f1d4e70f99d4b399755cfd567cfcc6e8a4cf4af Mon Sep 17 00:00:00 2001 From: "Praetorius, Simon" <simon.praetorius@tu-dresden.de> Date: Wed, 28 Aug 2019 17:42:18 +0200 Subject: [PATCH] added code from the mpi14 library for parallel mpi communication with a nice wrapper interface --- src/amdis/common/CMakeLists.txt | 2 + src/amdis/common/parallel/CMakeLists.txt | 10 + src/amdis/common/parallel/Collective.hpp | 93 ++++++ src/amdis/common/parallel/Communicator.hpp | 261 ++++++++++++++++ .../common/parallel/Communicator.inc.hpp | 149 ++++++++++ src/amdis/common/parallel/MpiTraits.hpp | 97 ++++++ src/amdis/common/parallel/RecvDynamicSize.hpp | 109 +++++++ src/amdis/common/parallel/Request.hpp | 125 ++++++++ src/amdis/common/parallel/RequestChain.hpp | 49 +++ .../common/parallel/RequestOperations.hpp | 117 ++++++++ test/CMakeLists.txt | 6 + test/MpiWrapperTest.cpp | 281 ++++++++++++++++++ 12 files changed, 1299 insertions(+) create mode 100644 src/amdis/common/parallel/CMakeLists.txt create mode 100644 src/amdis/common/parallel/Collective.hpp create mode 100644 src/amdis/common/parallel/Communicator.hpp create mode 100644 src/amdis/common/parallel/Communicator.inc.hpp create mode 100644 src/amdis/common/parallel/MpiTraits.hpp create mode 100644 src/amdis/common/parallel/RecvDynamicSize.hpp create mode 100644 src/amdis/common/parallel/Request.hpp create mode 100644 src/amdis/common/parallel/RequestChain.hpp create mode 100644 src/amdis/common/parallel/RequestOperations.hpp create mode 100644 test/MpiWrapperTest.cpp diff --git a/src/amdis/common/CMakeLists.txt b/src/amdis/common/CMakeLists.txt index 68394a7e..8f5c7dd3 100644 --- a/src/amdis/common/CMakeLists.txt +++ b/src/amdis/common/CMakeLists.txt @@ -34,3 +34,5 @@ install(FILES TypeTraits.hpp ValueCategory.hpp DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/amdis/common) + +add_subdirectory(parallel) \ No newline at end of file diff --git a/src/amdis/common/parallel/CMakeLists.txt b/src/amdis/common/parallel/CMakeLists.txt new file mode 100644 index 00000000..46ccaced --- /dev/null +++ b/src/amdis/common/parallel/CMakeLists.txt @@ -0,0 +1,10 @@ +install(FILES + Collective.hpp + Communicator.hpp + Communicator.inc.hpp + MpiTraits.hpp + RecvDynamicSize.hpp + Request.hpp + RequestChain.hpp + RequestOperations.hpp +DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/amdis/common/parallel) diff --git a/src/amdis/common/parallel/Collective.hpp b/src/amdis/common/parallel/Collective.hpp new file mode 100644 index 00000000..bd9cd20f --- /dev/null +++ b/src/amdis/common/parallel/Collective.hpp @@ -0,0 +1,93 @@ +#pragma once + +#if HAVE_MPI +#include <array> +#include <vector> + +#include <mpi.h> + +#include <amdis/common/parallel/Request.hpp> +#include <amdis/common/parallel/MpiTraits.hpp> + +namespace AMDiS { namespace Mpi +{ + template <class T, class Operation> + void all_reduce(MPI_Comm comm, T const& in, T& out, Operation) + { + MPI_Allreduce(&in, &out, 1, type_to_mpi<T>(), op_to_mpi<Operation>(), comm); + } + + template <class T, class Operation> + void all_reduce(MPI_Comm comm, T& inout, Operation) + { + MPI_Allreduce(MPI_IN_PLACE, &inout, 1, type_to_mpi<T>(), op_to_mpi<Operation>(), comm); + } + + + template <class T> + void all_gather(MPI_Comm comm, T const& in, std::vector<T>& out) + { + int size = 1; + MPI_Comm_size(comm, &size); + out.resize(size); + MPI_Allgather(to_void_ptr(&in), 1, type_to_mpi<T>(), to_void_ptr(out.data()), 1, type_to_mpi<T>(), comm); + } + + template <class T, std::size_t N> + void all_gather(MPI_Comm comm, std::array<T,N> const& in, 
std::vector<T>& out) + { + int size = 1; + MPI_Comm_size(comm, &size); + out.resize(size * N); + MPI_Allgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm); + } + + + template <class T> + std::vector<T> all_gather(MPI_Comm comm, T const& in) + { + int size = 1; + MPI_Comm_size(comm, &size); + std::vector<T> out(size); + MPI_Allgather(to_void_ptr(&in), 1, type_to_mpi<T>(), to_void_ptr(out.data()), 1, type_to_mpi<T>(), comm); + return out; + } + + template <class T, std::size_t N> + std::vector<T> all_gather(MPI_Comm comm, std::array<T,N> const& in) + { + int size = 1; + MPI_Comm_size(comm, &size); + std::vector<T> out(size * N); + MPI_Allgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm); + return out; + } + + + template <class T> + Request iall_gather(MPI_Comm comm, T const& in, std::vector<T>& out) + { + int size = 1; + MPI_Comm_size(comm, &size); + out.resize(size); + MPI_Request request; + MPI_Iallgather(to_void_ptr(&in), 1, type_to_mpi<T>(), to_void_ptr(out.data()), 1, type_to_mpi<T>(), comm, &request); + + return {request}; + } + + template <class T, std::size_t N> + Request iall_gather(MPI_Comm comm, std::array<T,N> const& in, std::vector<T>& out) + { + int size = 1; + MPI_Comm_size(comm, &size); + out.resize(size * N); + MPI_Request request; + MPI_Iallgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm, &request); + + return {request}; + } + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI diff --git a/src/amdis/common/parallel/Communicator.hpp b/src/amdis/common/parallel/Communicator.hpp new file mode 100644 index 00000000..67e72c4b --- /dev/null +++ b/src/amdis/common/parallel/Communicator.hpp @@ -0,0 +1,261 @@ +#pragma once + +#if HAVE_MPI +#include <array> +#include <iostream> +#include <list> +#include <map> +#include <memory> +#include <string> +#include <type_traits> +#include <vector> + +#include <mpi.h> + +#include <amdis/Environment.hpp> +#include <amdis/common/ConceptsBase.hpp> +#include <amdis/common/TypeTraits.hpp> +#include <amdis/common/parallel/Request.hpp> +#include <amdis/common/parallel/RecvDynamicSize.hpp> +#include <amdis/common/parallel/MpiTraits.hpp> + +namespace AMDiS { +namespace Concepts { + template <class T> + constexpr bool RawPointer = std::is_pointer<std::decay_t<T>>::value; +} + +namespace Mpi +{ + struct Tag + { + int value = 0; + }; + + class Communicator + { + public: + + /// Constructor, stores an MPI communicator, e.g. 
MPI_COMM_WORLD + Communicator(MPI_Comm comm = Environment::comm()) + : comm_(comm) + { + MPI_Comm_size(comm_, &size_); + MPI_Comm_rank(comm_, &rank_); + } + + + public: + + operator MPI_Comm() const { return comm_; } + + int size() const { return size_; } + int rank() const { return rank_; } + + public: + + // send mpi datatype + template <class Data, REQUIRES(not Concepts::RawPointer<Data>)> + void send(Data const& data, int to, Tag tag = {}) const; + + // send array of mpi datatypes + template <class T> + void send(T const* data, std::size_t size, int to, Tag tag = {}) const; + + template <class T, std::size_t N> + void send(T const (&data)[N], int to, Tag tag = {}) const + { + send(&data[0], N, to, tag); + } + + template <class T, std::size_t N> + void send(std::array<T,N> const& array, int to, Tag tag = {}) const + { + send(array.data(), N, to, tag); + } + + template <class T> + void send(std::vector<T> const& vec, int to, Tag tag = {}) const; + + void send(std::string const& str, int to, Tag tag = {}) const + { + MPI_Send(to_void_ptr(str.data()), int(str.size()), MPI_CHAR, to, tag.value, comm_); + } + + + // ------------------------------------------------------------------------------------- + + + // send mpi datatype (non-blocking) + template <class Data, REQUIRES(not Concepts::RawPointer<Data>)> + Request isend(Data const& data, int to, Tag tag = {}) const; + + // send array of mpi datatypes (non-blocking) + template <class Data> + Request isend(Data const* data, std::size_t size, int to, Tag tag = {}) const; + + template <class T, std::size_t N> + Request isend(T const (&data)[N], int to, Tag tag = {}) const + { + return isend(&data[0], N, to, tag); + } + + template <class T, std::size_t N> + Request isend(std::array<T,N> const& array, int to, Tag tag = {}) const + { + return isend(array.data(), N, to, tag); + } + + template <class T> + Request isend(std::vector<T> const& vec, int to, Tag tag = {}) const; + + Request isend(std::string const& str, int to, Tag tag = {}) const + { + MPI_Request request; + MPI_Isend(to_void_ptr(str.data()), int(str.size()), MPI_CHAR, to, tag.value, comm_, &request); + return {request}; + } + + // ------------------------------------------------------------------------------------- + + // receive mpi datatype + template <class Data, REQUIRES(not Concepts::RawPointer<Data>)> + MPI_Status recv(Data& data, int from, Tag tag = {}) const; + + // receive array of mpi datatypes + template <class T> + MPI_Status recv(T* data, std::size_t size, int from, Tag tag = {}) const; + + template <class T, std::size_t N> + MPI_Status recv(T (&data)[N], int from, Tag tag = {}) const + { + return recv(data, N, from, tag); + } + + template <class T, std::size_t N> + MPI_Status recv(std::array<T,N>& data, int from, Tag tag = {}) const + { + return recv(data.data(), N, from, tag); + } + + template <class T> + MPI_Status recv(std::vector<T>& data, int from, Tag tag = {}) const; + + MPI_Status recv(std::string& str, int from, Tag tag = {}) const + { + MPI_Status status; + MPI_Probe(from, tag.value, comm_, &status); + + int size = 0; + MPI_Get_count(&status, MPI_CHAR, &size); + + str.resize(size); + MPI_Recv(&str[0], size, MPI_CHAR, from, tag.value, comm_, MPI_STATUS_IGNORE); + return status; + } + + // ------------------------------------------------------------------------------------- + + // receive mpi datatype + template <class Data, REQUIRES(not Concepts::RawPointer<Data>)> + Request irecv(Data& data, int from, Tag tag = {}) const; + + // receive array of mpi datatypes + 
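+    // (note: the buffer handed to any non-blocking receive must remain valid and unmodified
+    //  until the returned Request has completed, e.g. via Request::wait() or Request::test())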
template <class Data> + Request irecv(Data* data, std::size_t size, int from, Tag tag = {}) const; + + template <class T, std::size_t N> + Request irecv(T (&data)[N], int from, Tag tag = {}) const + { + return irecv(&data[0], N, from, tag); + } + + template <class T, std::size_t N> + Request irecv(std::array<T,N>& data, int from, Tag tag = {}) const + { + return irecv(data.data(), N, from, tag); + } + + template <class Receiver> + std::enable_if_t< Concepts::Callable<Receiver,MPI_Status>, Request> + irecv(Receiver&& recv, int from, Tag tag = {}) const + { + return Request{ RecvDynamicSize(from, tag.value, comm_, std::forward<Receiver>(recv)) }; + } + + // receive vector of mpi datatypes + // 1. until message received, call MPI_Iprobe to retrieve status and size of message + // 2. resize data-vector + // 3. receive data into vector + template <class T> + Request irecv(std::vector<T>& vec, int from, Tag tag = {}) const; + + Request irecv(std::string& str, int from, Tag tag = {}) const + { + return Request{RecvDynamicSize(from,tag.value,comm_, + [comm=comm_,&str](MPI_Status status) -> MPI_Request + { + int size = 0; + MPI_Get_count(&status, MPI_CHAR, &size); + + str.resize(size); + MPI_Request req; + MPI_Irecv(&str[0], size, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, &req); + + return req; + }) }; + } + + + protected: + + // free unused buffers + void check_buffers() const + { + using Buffers = std::decay_t<decltype(buffers_)>; + + std::list<typename Buffers::iterator> remove; + for (auto it = buffers_.begin(); it != buffers_.end(); ++it) { + int flag; + MPI_Request_get_status(it->first, &flag, MPI_STATUS_IGNORE); + if (flag != 0) + remove.push_back(it); + } + + for (auto it : remove) + buffers_.erase(it); + } + + + std::pair<MPI_Request, std::string>& make_buffer(MPI_Status status, std::size_t len) const + { + auto it = buffers_.emplace(buffers_.end(), MPI_Request{}, std::string(len,' ')); + buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}] = it; + return buffers_.back(); + } + + std::pair<MPI_Request, std::string>& get_buffer(MPI_Status status) const + { + auto it = buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}]; + return *it; + } + + protected: + + MPI_Comm comm_; + + int rank_ = 0; + int size_ = 1; + + using BufferList = std::list< std::pair<MPI_Request, std::string> >; + mutable BufferList buffers_; + + using BufferIter = BufferList::iterator; + mutable std::map<std::pair<int,int>, BufferIter> buffers_iterators_; + }; + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI + +#include <amdis/common/parallel/Communicator.inc.hpp> diff --git a/src/amdis/common/parallel/Communicator.inc.hpp b/src/amdis/common/parallel/Communicator.inc.hpp new file mode 100644 index 00000000..8d9ae94f --- /dev/null +++ b/src/amdis/common/parallel/Communicator.inc.hpp @@ -0,0 +1,149 @@ +#pragma once + +#if HAVE_MPI + +namespace AMDiS { namespace Mpi { + +// send mpi datatype +template <class Data, REQUIRES_(not Concepts::RawPointer<Data>)> +void Communicator::send(Data const& data, int to, Tag tag) const +{ + MPI_Send(to_void_ptr(&data), 1, type_to_mpi<Data>(), to, tag.value, comm_); +} + + +// send array of mpi datatypes +template <class Data> +void Communicator::send(Data const* data, std::size_t size, int to, Tag tag) const +{ + MPI_Send(to_void_ptr(data), int(size), type_to_mpi<Data>(), to, tag.value, comm_); +} + + +template <class T> +void Communicator::send(std::vector<T> const& vec, int to, Tag tag) const +{ + MPI_Send(to_void_ptr(vec.data()), int(vec.size()), type_to_mpi<T>(), 
to, tag.value, comm_); +} + + +// ------------------------------------------------------------------------------------- + + +// send mpi datatype (non-blocking) +template <class Data, REQUIRES_(not Concepts::RawPointer<Data>)> +Request Communicator::isend(Data const& data, int to, Tag tag) const +{ + MPI_Request request; + MPI_Isend(to_void_ptr(&data), 1, type_to_mpi<Data>(), to, tag.value, comm_, &request); + return {request}; +} + + +// send array of mpi datatypes (non-blocking) +template <class Data> +Request Communicator::isend(Data const* data, std::size_t size, int to, Tag tag) const +{ + MPI_Request request; + MPI_Isend(to_void_ptr(data), size, type_to_mpi<Data>(), to, tag.value, comm_, &request); + return {request}; +} + + +template <class T> +Request Communicator::isend(std::vector<T> const& vec, int to, Tag tag) const +{ + MPI_Request request; + MPI_Isend(to_void_ptr(vec.data()), int(vec.size()), type_to_mpi<T>(), to, tag.value, comm_, &request); + return {request}; +} + +// ------------------------------------------------------------------------------------- + +// receive mpi datatype +template <class Data, REQUIRES_(not Concepts::RawPointer<Data>)> +MPI_Status Communicator::recv(Data& data, int from, Tag tag) const +{ + MPI_Status status; + MPI_Recv(&data, 1, type_to_mpi<Data>(), from, tag.value, comm_, &status); + return status; +} + + +// receive array of mpi datatypes +template <class Data> +MPI_Status Communicator::recv(Data* data, std::size_t size, int from, Tag tag) const +{ + MPI_Status status; + MPI_Recv(data, size, type_to_mpi<Data>(), from, tag.value, comm_, &status); + return status; +} + + +// receive array of mpi datatypes +template <class T> +MPI_Status Communicator::recv(std::vector<T>& vec, int from, Tag tag) const +{ + MPI_Status status; + MPI_Probe(from, tag.value, comm_, &status); + + int size = 0; + MPI_Get_count(&status, type_to_mpi<T>(), &size); + int min_size = std::max(size,1); + + vec.resize(min_size); + MPI_Recv(vec.data(), min_size, type_to_mpi<T>(), from, tag.value, comm_, MPI_STATUS_IGNORE); + if (size != min_size) + vec.resize(size); + return status; +} + + +// ------------------------------------------------------------------------------------- + +// receive mpi datatype +template <class Data, REQUIRES_(not Concepts::RawPointer<Data>)> +Request Communicator::irecv(Data& data, int from, Tag tag) const +{ + MPI_Request request; + MPI_Irecv(&data, 1, type_to_mpi<Data>(), from, tag.value, comm_, &request); + return {request}; +} + + +// receive array of mpi datatypes +template <class Data> +Request Communicator::irecv(Data* data, std::size_t size, int from, Tag tag) const +{ + MPI_Request request; + MPI_Irecv(data, size, type_to_mpi<Data>(), from, tag.value, comm_, &request); + return {request}; +} + + +template <class T> +Request Communicator::irecv(std::vector<T>& vec, int from, Tag tag) const +{ + return Request{ RecvDynamicSize(from,tag.value,comm_, + [comm=comm_,&vec](MPI_Status status) -> MPI_Request + { + int size = 0; + MPI_Get_count(&status, type_to_mpi<T>(), &size); + int min_size = std::max(size,1); + + vec.resize(min_size); + MPI_Request req; + MPI_Irecv(vec.data(), min_size, type_to_mpi<T>(), status.MPI_SOURCE, status.MPI_TAG, comm, &req); + return req; + }, + [&vec](MPI_Status status) + { + int size = 0; + MPI_Get_count(&status, type_to_mpi<T>(), &size); + vec.resize(size); + }) }; +} + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI diff --git a/src/amdis/common/parallel/MpiTraits.hpp b/src/amdis/common/parallel/MpiTraits.hpp new 
file mode 100644 index 00000000..aedfa490 --- /dev/null +++ b/src/amdis/common/parallel/MpiTraits.hpp @@ -0,0 +1,97 @@ +#pragma once + +#if HAVE_MPI + +#include <functional> +#include <type_traits> + +#include <mpi.h> + +#include <dune/common/parallel/mpitraits.hh> + +namespace AMDiS { + +// forward declarations +namespace Operation { + struct Plus; + struct Multiplies; + struct Max; + struct Min; +} + +namespace Mpi +{ + namespace Impl2 + { + template <class T> + struct is_mpi_type : std::false_type {}; + + template <> struct is_mpi_type<char> : std::true_type {}; + template <> struct is_mpi_type<short> : std::true_type {}; + template <> struct is_mpi_type<int> : std::true_type {}; + template <> struct is_mpi_type<long> : std::true_type {}; + template <> struct is_mpi_type<long long> : std::true_type {}; + template <> struct is_mpi_type<signed char> : std::true_type {}; + template <> struct is_mpi_type<unsigned char> : std::true_type {}; + template <> struct is_mpi_type<unsigned short> : std::true_type {}; + template <> struct is_mpi_type<unsigned int> : std::true_type {}; + template <> struct is_mpi_type<unsigned long> : std::true_type {}; + template <> struct is_mpi_type<unsigned long long> : std::true_type {}; + template <> struct is_mpi_type<float> : std::true_type {}; + template <> struct is_mpi_type<double> : std::true_type {}; + template <> struct is_mpi_type<long double> : std::true_type {}; + + template <class T1, class T2> + struct is_mpi_type<std::pair<T1,T2>> : std::integral_constant<bool, + is_mpi_type<T1>::value && + is_mpi_type<T2>::value && + std::is_standard_layout<std::pair<T1,T2>>::value + > + {}; + + } // end namespace Impl2 + + template <class T> + using is_mpi_type = Impl2::is_mpi_type<T>; + + + template <class T> + MPI_Datatype type_to_mpi() { return Dune::MPITraits<T>::getType(); } + + namespace Impl2 + { + template <class T> using always_false = std::false_type; + + template <class Op> + struct op_to_mpi; + + template <> struct op_to_mpi<Operation::Min> { static MPI_Op value() { return MPI_MIN; } }; + template <> struct op_to_mpi<Operation::Max> { static MPI_Op value() { return MPI_MAX; } }; + + template <> struct op_to_mpi<std::plus<>> { static MPI_Op value() { return MPI_SUM; } }; + template <> struct op_to_mpi<Operation::Plus> { static MPI_Op value() { return MPI_SUM; } }; + + template <> struct op_to_mpi<std::multiplies<>> { static MPI_Op value() { return MPI_PROD; } }; + template <> struct op_to_mpi<Operation::Multiplies> { static MPI_Op value() { return MPI_PROD; } }; + + template <> struct op_to_mpi<std::logical_and<>> { static MPI_Op value() { return MPI_LAND; } }; + template <> struct op_to_mpi<std::logical_or<>> { static MPI_Op value() { return MPI_LOR; } }; + template <> struct op_to_mpi<std::bit_and<>> { static MPI_Op value() { return MPI_BAND; } }; + template <> struct op_to_mpi<std::bit_or<>> { static MPI_Op value() { return MPI_BOR; } }; + template <> struct op_to_mpi<std::bit_xor<>> { static MPI_Op value() { return MPI_BXOR; } }; + + } // end namespac Impl2 + + template <class T> + MPI_Op op_to_mpi() { return Impl2::op_to_mpi<T>::value(); } + + + template <class T> + void* to_void_ptr(T* ptr) { return ptr; } + + template <class T> + void* to_void_ptr(T const* ptr) { return const_cast<T*>(ptr); } + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI diff --git a/src/amdis/common/parallel/RecvDynamicSize.hpp b/src/amdis/common/parallel/RecvDynamicSize.hpp new file mode 100644 index 00000000..459c7747 --- /dev/null +++ 
b/src/amdis/common/parallel/RecvDynamicSize.hpp
@@ -0,0 +1,109 @@
+#pragma once
+
+#if HAVE_MPI
+#include <functional>
+#include <utility>
+
+#include <dune/common/std/optional.hh>
+
+#include <amdis/common/Concepts.hpp>
+
+namespace AMDiS { namespace Mpi
+{
+
+  /// \brief Test-functor for dynamic-size data, to be used in the \ref Request class.
+  /**
+   * Implements the Iprobe - Irecv - Test workflow to receive data of unknown size.
+   * First, once a matching message has arrived, its size is fetched from the
+   * MPI_Status provided by MPI_Iprobe. This is done by a user functor `recv_` that
+   * receives an MPI_Status and returns an MPI_Request handle. Typically, the `recv_`
+   * functor resizes an internal receive buffer and calls MPI_Irecv.
+   * When an MPI_Test on that MPI_Request handle reports a completed communication,
+   * a user finalize-functor `finish_` is called to post-process the
+   * received data. Afterwards, the MPI_Status of the MPI_Test call is returned.
+   **/
+  class RecvDynamicSize
+  {
+    enum Progress {
+      STARTED,
+      INITIALIZED,
+      RECEIVING,
+      FINISHED
+    };
+
+  public:
+
+    /// Constructor with user receive-functor `R`.
+    template <class R,
+      REQUIRES( Concepts::Callable<R, MPI_Status> )>
+    RecvDynamicSize(int from, int tag, MPI_Comm comm, R&& r)
+      : from_(from)
+      , tag_(tag)
+      , comm_(comm)
+      , recv_(std::forward<R>(r))
+      , finish_([](MPI_Status){})
+    {}
+
+    /// Constructor with user receive-functor `R` and user finalize-functor `F`.
+    template <class R, class F,
+      REQUIRES( Concepts::Callable<R, MPI_Status> && Concepts::Callable<F, MPI_Status> )>
+    RecvDynamicSize(int from, int tag, MPI_Comm comm, R&& r, F&& f)
+      : from_(from)
+      , tag_(tag)
+      , comm_(comm)
+      , recv_(std::forward<R>(r))
+      , finish_(std::forward<F>(f))
+    {}
+
+    /// Operator called as test function in the \ref Request class.
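+    /// A usage sketch (illustration only, not taken from the original sources): the functor is
+    /// typically wrapped into a \ref Request and polled through its wait()/test() members. The
+    /// names `from`, `tag`, `comm`, `vec` and the assumption of a non-empty `int` message are
+    /// hypothetical.
+    /// \code
+    ///   std::vector<int> vec;
+    ///   Request req{ RecvDynamicSize(from, tag, comm,
+    ///     [comm,&vec](MPI_Status s) -> MPI_Request {
+    ///       int size = 0;
+    ///       MPI_Get_count(&s, MPI_INT, &size);
+    ///       vec.resize(size);
+    ///       MPI_Request r;
+    ///       MPI_Irecv(vec.data(), size, MPI_INT, s.MPI_SOURCE, s.MPI_TAG, comm, &r);
+    ///       return r;
+    ///     }) };
+    ///   req.wait();
+    /// \endcode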
+ Dune::Std::optional<MPI_Status> operator()() + { + if (progress_ == STARTED) { + int flag = 0; + // Wait for a message from rank from_ with tag tag_ + MPI_Iprobe(from_, tag_, comm_, &flag, &status_); + + if (flag != 0) + progress_ = INITIALIZED; + } + + if (progress_ == INITIALIZED) { + req_ = recv_(status_); + + progress_ = RECEIVING; + } + + if (progress_ == RECEIVING) { + int flag = 0; + MPI_Test(&req_, &flag, &status_); + + if (flag != 0) + progress_ = FINISHED; + } + + if (progress_ == FINISHED) { + finish_(status_); + + return status_; + } else + return {}; + } + + + private: + + int from_ = 0; //< source rank + int tag_ = 0; //< communication tag + MPI_Comm comm_; //< communicator + + std::function<MPI_Request(MPI_Status)> recv_; //< user receive-functor + std::function<void(MPI_Status)> finish_; //< user finalize-functor + + Progress progress_ = STARTED; //< internal progress flag + MPI_Status status_{}; //< the status information, filled by MPI_Iprob and MPI_Test + MPI_Request req_{}; //< the request handler, filled by the user receive-functor \ref recv_ + }; + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI diff --git a/src/amdis/common/parallel/Request.hpp b/src/amdis/common/parallel/Request.hpp new file mode 100644 index 00000000..7a3677aa --- /dev/null +++ b/src/amdis/common/parallel/Request.hpp @@ -0,0 +1,125 @@ +#pragma once + +#if HAVE_MPI +#include <cassert> +#include <functional> +#include <type_traits> + +#include <mpi.h> + +#include <dune/common/std/optional.hh> +#include <amdis/common/Concepts.hpp> + +namespace AMDiS { namespace Mpi +{ + class Request + { + public: + Request() = default; + + Request(MPI_Request request) + : request_(request) + {} + + // In advanced mode, take a callable that returns an optional<MPI_Status> + // This functor is called in the test() function and wait() isa implemented in terms of test() + template <class F, + REQUIRES( Concepts::Callable<F> ) > + explicit Request(F&& f) + : test_(std::forward<F>(f)) + , advanced_(true) + {} + + + // Returns an MPI_Status object if the operation identified by the request is complete. + // If the request is an active persistent request, it is marked as inactive. Any other type of + // request is deallocated and the request handle is set to MPI_REQUEST_NULL . + Dune::Std::optional<MPI_Status> test() + { + if (advanced_) + return test_(); + + auto s = status(); + if (s && request_ != MPI_REQUEST_NULL) + MPI_Request_free(&request_); + return s; + } + + // Access the information associated with a request, without freeing the request. + Dune::Std::optional<MPI_Status> status() const + { + assert( !advanced_ ); + + if (status_initialized_) + return status_; + + MPI_Status status; + int flag = 0; + MPI_Request_get_status(request_, &flag, &status); + if (flag) + return status; + else + return {}; + } + + // Returns when the operation identified by request is complete. + MPI_Status wait() + { + if (advanced_) { + Dune::Std::optional<MPI_Status> status; + while( !(status = test()) ) ; + return status.value(); + } else { + MPI_Wait(&request_, &status_); + status_initialized_ = true; + return status_; + } + } + + // Returns the underlying MPI_Request handle + MPI_Request get() const + { + assert( !advanced_ ); + return request_; + } + + MPI_Request& get() + { + assert( !advanced_ ); + return request_; + } + + // Deallocate a request object without waiting for the associated communication to complete. 
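+    // (per the MPI standard, MPI_Request_free only marks the request for deallocation;
+    //  a communication that is still in progress is allowed to complete in the background)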
+ void free() + { + assert( !advanced_ ); + MPI_Request_free(&request_); + } + + void cancel() + { + assert( !advanced_ ); + MPI_Cancel(&request_); + } + + void cancel_and_free() + { + cancel(); + free(); + } + + protected: + + MPI_Request request_ = MPI_REQUEST_NULL; + + MPI_Status status_; // the MPI_Status after the request object is destroyed + bool status_initialized_ = false; + + std::function<Dune::Std::optional<MPI_Status>(void)> test_; + bool advanced_ = false; + }; + +}} // end namespace AMDiS::Mpi + +#endif // HAVE_MPI + diff --git a/src/amdis/common/parallel/RequestChain.hpp b/src/amdis/common/parallel/RequestChain.hpp new file mode 100644 index 00000000..f38d1c24 --- /dev/null +++ b/src/amdis/common/parallel/RequestChain.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include <tuple> +#include <type_traits> + +#include <dune/common/std/optional.hh> + +#include <amdis/common/Index.hpp> + +namespace AMDiS { namespace Mpi +{ + template <class... Reqs> + class RequestChain + { + static constexpr std::size_t N = sizeof...(Reqs); + using Data = typename std::tuple_element_t<N-1, std::tuple<Reqs...>>::DataType; + + RequestChain(Reqs... reqs) + : reqs_{reqs...} + {} + + Dune::Std::optional<Data const*> test() + { + auto op = std::get<0>(reqs_).test(); + return test(op, index_t<1>{}); + } + + template <class Optional, std::size_t i = 0> + Dune::Std::optional<Data const*> test(Optional op, index_t<i> = {}) + { + if (!op) + return {}; + + auto op_next = std::get<i>(reqs_).test(op); + return test(op_next, index_t<i+1>{}); + } + + template <class Optional> + Dune::Std::optional<Data const*> test(Optional&& result, index_t<N>) + { + return std::forward<Optional>(result); + } + + protected: + + std::tuple<Reqs...> reqs_; + }; + +}} // end namespace AMDiS::Mpi diff --git a/src/amdis/common/parallel/RequestOperations.hpp b/src/amdis/common/parallel/RequestOperations.hpp new file mode 100644 index 00000000..6525624a --- /dev/null +++ b/src/amdis/common/parallel/RequestOperations.hpp @@ -0,0 +1,117 @@ +#pragma once + +#include <list> +#include <vector> + +#include <mpi.h> + +namespace AMDiS { namespace Mpi +{ + // Blocks until all communication operations associated with active handles in the range complete. 
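+  // Unlike MPI_Waitall, this is implemented as a polling loop over Request::test(), so it also
+  // supports "advanced" requests (e.g. those built from a RecvDynamicSize functor) that are not
+  // backed by a plain MPI_Request handle.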
+ template <class ReqIter> + void wait_all(ReqIter first, ReqIter last) + { + std::list<ReqIter> remaining; + for (ReqIter it = first; it != last; ++it) remaining.push_back(it); + + while (!remaining.empty()) { + auto remove_it = remaining.end(); + for (auto it = remaining.begin(); it != remaining.end(); ++it) { + if ((*it)->test()) { + remove_it = it; + break; + } + } + + if (remove_it != remaining.end()) + remaining.erase(remove_it); + } + } + + template <class ReqIter, class Apply> + void wait_all_apply(ReqIter first, ReqIter last, Apply apply) + { + std::list<ReqIter> remaining; + for (ReqIter it = first; it != last; ++it) remaining.push_back(it); + + while (!remaining.empty()) { + auto remove_it = remaining.end(); + for (auto it = remaining.begin(); it != remaining.end(); ++it) { + if ((*it)->test()) { + apply(*it); // call a functor with the request iterator + remove_it = it; + break; + } + } + + if (remove_it != remaining.end()) + remaining.erase(remove_it); + } + } + + template <class ReqIter> + void wait_all_weak(ReqIter first, ReqIter last) + { + std::list<ReqIter> remaining; + for (ReqIter it = first; it != last; ++it) remaining.push_back(it); + + while (!remaining.empty()) { + auto remove_it = remaining.end(); + for (auto it = remaining.begin(); it != remaining.end(); ++it) { + if ((*it)->status()) { + remove_it = it; + break; + } + } + + if (remove_it != remaining.end()) + remaining.erase(remove_it); + } + } + + template <class ReqIter> + void wait_all(ReqIter first, ReqIter last, std::vector<MPI_Status>& statuses) + { + statuses.resize(std::distance(first, last)); + + std::list<ReqIter> remaining; + for (ReqIter it = first; it != last; ++it) remaining.push_back(it); + + while (!remaining.empty()) { + auto remove_it = remaining.end(); + for (auto it = remaining.begin(); it != remaining.end(); ++it) { + auto status = (*it)->test(); + if (status) { + remove_it = it; + statuses[std::distance(first,*it)] = *status; + break; + } + } + + if (remove_it != remaining.end()) + remaining.erase(remove_it); + } + } + + + // Tests for completion of either one or none of the operations associated with active handles. + // In the former case, it returns an iterator to the finished handle. + template <class ReqIter> + ReqIter test_any(ReqIter first, ReqIter last) + { + for (auto it = first; it != last; ++it) { + if (it->test()) + return it; + } + return last; + } + + + // Blocks until one of the operations associated with the active requests in the range has completed. 
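+  // Implemented by polling test_any() until it returns an iterator to a completed request.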
+ template <class ReqIter> + void wait_any(ReqIter first, ReqIter last) + { + while (test_any(first, last) == last) ; + } + +}} // end namespace AMDiS::Mpi diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6ef4b43b..3d6c4220 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -69,6 +69,12 @@ dune_add_test(SOURCES IntegrateTest.cpp dune_add_test(SOURCES MarkerTest.cpp LINK_LIBRARIES amdis) +dune_add_test(SOURCES MpiWrapperTest.cpp + LINK_LIBRARIES amdis + MPI_RANKS 2 4 + TIMEOUT 300 + CMAKE_GUARD "MPI_FOUND") + dune_add_test(SOURCES MultiTypeVectorTest.cpp LINK_LIBRARIES amdis) diff --git a/test/MpiWrapperTest.cpp b/test/MpiWrapperTest.cpp new file mode 100644 index 00000000..4bf713b7 --- /dev/null +++ b/test/MpiWrapperTest.cpp @@ -0,0 +1,281 @@ +#include <algorithm> +#include <functional> +#include <string> +#include <vector> + +#include <amdis/AMDiS.hpp> +#include <amdis/common/parallel/Collective.hpp> +#include <amdis/common/parallel/Communicator.hpp> +#include <amdis/common/parallel/RequestOperations.hpp> +#include "Tests.hpp" + +namespace AMDiS { namespace Impl { + template <class Op> + void test_allreduce(Op op, int init) + { + int s = Environment::mpiSize(); + int r = Environment::mpiRank()+1; + int result = 0; + + std::vector<int> ranks(s); + std::iota(ranks.begin(), ranks.end(), 1); + + // in, out + Mpi::all_reduce(Environment::comm(), r, result, op); + // inout + Mpi::all_reduce(Environment::comm(), r, op); + + AMDIS_TEST_EQ(r, result); + AMDIS_TEST_EQ(r, std::accumulate(ranks.begin(), ranks.end(), init, op)); + } +}} + +using namespace AMDiS; + + +// --------------------------------------------------------------------------------------- +// Test of collective MPI operations +void test0() +{ + Impl::test_allreduce(Operation::Plus{}, 0); + Impl::test_allreduce(std::plus<>{}, 0); + + Impl::test_allreduce(Operation::Multiplies{}, 1); + Impl::test_allreduce(std::multiplies<>{}, 1); + + Impl::test_allreduce(Operation::Min{}, 99999); + Impl::test_allreduce(Operation::Max{}, -99999); + + int s = Environment::mpiSize(); + int r = Environment::mpiRank(); + std::vector<int> ranks(s, 0); + + Mpi::all_gather(Environment::comm(), r, ranks); + for (int i = 0; i < s; ++i) + AMDIS_TEST_EQ(ranks[i], i); + + ranks.resize(2*s); + std::fill(ranks.begin(), ranks.end(), 0); + Mpi::all_gather(Environment::comm(), std::array<int,2>{r,r}, ranks); + for (int i = 0; i < s; ++i) { + AMDIS_TEST_EQ(ranks[2*i], i); + AMDIS_TEST_EQ(ranks[2*i+1], i); + } + + auto ranks2 = Mpi::all_gather(Environment::comm(), r); + AMDIS_TEST_EQ(ranks2.size(), s); + for (int i = 0; i < s; ++i) + AMDIS_TEST_EQ(ranks2[i], i); + + auto ranks3 = Mpi::all_gather(Environment::comm(), std::array<int,2>{r,r}); + AMDIS_TEST_EQ(ranks3.size(), 2*s); + for (int i = 0; i < s; ++i) { + AMDIS_TEST_EQ(ranks3[2*i], i); + AMDIS_TEST_EQ(ranks3[2*i+1], i); + } +} + +// --------------------------------------------------------------------------------------- +// Test of Point-to-Point communication (blocking) +void test1() +{ + Mpi::Communicator world(Environment::comm()); + + AMDIS_TEST_EQ(world.rank(), Environment::mpiRank()); + AMDIS_TEST_EQ(world.size(), Environment::mpiSize()); + + // direct send/recv + int data = 0; + if (world.rank() == 0) { + data = world.size(); + for (int i = 1; i < world.size(); ++i) + world.send(data, i, Mpi::Tag{10}); + } else { + world.recv(data, 0, Mpi::Tag{10}); + } + AMDIS_TEST_EQ(data, world.size()); + + + // send/recv of pointers + data = 0; + if (world.rank() == 0) { + data = world.size(); + for 
(int i = 1; i < world.size(); ++i) + world.send(&data, 1u, i, Mpi::Tag{11}); + } else { + world.recv(&data, 1u, 0, Mpi::Tag{11}); + } + AMDIS_TEST_EQ(data, world.size()); + + + // send/recv of c-arrays + int data2[] = {0}; + if (world.rank() == 0) { + data2[0] = world.size(); + for (int i = 1; i < world.size(); ++i) + world.send(data2, i, Mpi::Tag{12}); + } else { + world.recv(data2, 0, Mpi::Tag{12}); + } + AMDIS_TEST_EQ(data2[0], world.size()); + + + // send/recv of std::arrays + std::array<int,1> data3{0}; + if (world.rank() == 0) { + data3[0] = world.size(); + for (int i = 1; i < world.size(); ++i) + world.send(data3, i, Mpi::Tag{13}); + } else { + world.recv(data3, 0, Mpi::Tag{13}); + } + AMDIS_TEST_EQ(data3[0], world.size()); + + + // send/recv of std::vector + std::vector<int> data4; + if (world.rank() == 0) { + data4.push_back(world.size()); + for (int i = 1; i < world.size(); ++i) + world.send(data4, i, Mpi::Tag{14}); + } else { + world.recv(data4, 0, Mpi::Tag{14}); + } + AMDIS_TEST_EQ(data4.size(), 1); + AMDIS_TEST_EQ(data4[0], world.size()); + + + // send/recv of std::string + std::string data5 = ""; + if (world.rank() == 0) { + data5 = "world"; + for (int i = 1; i < world.size(); ++i) + world.send(data5, i, Mpi::Tag{15}); + } else { + world.recv(data5, 0, Mpi::Tag{15}); + } + AMDIS_TEST_EQ(data5, std::string("world")); +} + + +// --------------------------------------------------------------------------------------- +// Test of Point-to-Point communication (non-blocking) +void test2() +{ + Mpi::Communicator world(Environment::comm()); + + // direct send/recv + int data = 0; + if (world.rank() == 0) { + data = world.size(); + std::vector<Mpi::Request> requests; + for (int i = 1; i < world.size(); ++i) + requests.emplace_back( world.isend(data, i, Mpi::Tag{20}) ); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + Mpi::Request request( world.irecv(data, 0, Mpi::Tag{20}) ); + MPI_Status status = request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 20); + } + AMDIS_TEST_EQ(data, world.size()); + + + // send/recv of pointers + data = 0; + if (world.rank() == 0) { + data = world.size(); + std::vector<Mpi::Request> requests; + for (int i = 1; i < world.size(); ++i) + requests.push_back( world.isend(&data, 1u, i, Mpi::Tag{21}) ); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + Mpi::Request request = world.irecv(&data, 1u, 0, Mpi::Tag{21}); + MPI_Status status = request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 21); + } + AMDIS_TEST_EQ(data, world.size()); + +#if 1 + // send/recv of c-arrays + int data2[] = {0}; + if (world.rank() == 0) { + data2[0] = world.size(); + std::vector<Mpi::Request> requests(world.size()-1); + for (int i = 1; i < world.size(); ++i) + requests[i-1] = world.isend(data2, 1, i, Mpi::Tag{22}); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + auto request = world.irecv(data2, 0, Mpi::Tag{22}); + MPI_Status status = request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 22); + } + AMDIS_TEST_EQ(data2[0], world.size()); +#endif + +#if 1 + // send/recv of std::arrays + std::array<int,1> data3{0}; + if (world.rank() == 0) { + data3[0] = world.size(); + std::vector<Mpi::Request> requests; + for (int i = 1; i < world.size(); ++i) + requests.emplace_back( world.isend(data3, i, Mpi::Tag{23}) ); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + Mpi::Request request{ world.irecv(data3, 0, Mpi::Tag{23}) }; + MPI_Status status 
= request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 23); + } + AMDIS_TEST_EQ(data3[0], world.size()); +#endif + + // send/recv of std::vector + std::vector<int> data4; + if (world.rank() == 0) { + data4.push_back(world.size()); + std::vector<Mpi::Request> requests; + for (int i = 1; i < world.size(); ++i) + requests.emplace_back( world.isend(data4, i, Mpi::Tag{24}) ); + Mpi::test_any(requests.begin(), requests.end()); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + Mpi::Request request{ world.irecv(data4, 0, Mpi::Tag{24}) }; + MPI_Status status = request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 24); + } + AMDIS_TEST_EQ(data4.size(), 1); + AMDIS_TEST_EQ(data4[0], world.size()); + + + // send/recv of std::string + std::string data5 = ""; + if (world.rank() == 0) { + data5 = "world"; + std::vector<Mpi::Request> requests; + for (int i = 1; i < world.size(); ++i) + requests.emplace_back( world.isend(data5, i, Mpi::Tag{25}) ); + Mpi::wait_all(requests.begin(), requests.end()); + } else { + Mpi::Request request{ world.irecv(data5, 0, Mpi::Tag{25}) }; + MPI_Status status = request.wait(); + AMDIS_TEST(status.MPI_SOURCE == 0); + AMDIS_TEST(status.MPI_TAG == 25); + } + AMDIS_TEST_EQ(data5, std::string("world")); +} + + +int main(int argc, char** argv) +{ + Environment env(argc, argv); + test0(); + test1(); + test2(); + + return report_errors(); +} -- GitLab