使用mpmcpipeline和jthread实现软流水

多个Jthread线程,每个线程完成mpmcpipeline中一个stage的处理,实现软件流水。第一个stage收到sentinal value时,所有work线程都退出。

#include <gtest/gtest.h>
#include <folly/MPMCPipeline.h>
#include <vector>
#include <thread>
#include <boost/mp11.hpp>
#include <fmt/format.h>
#include <folly/Function.h>
#include <type_traits>
#include <boost/hana.hpp>
namespace mp11 = boost::mp11;
namespace hana = boost::hana;

template <class T>
struct getNumber {
  constexpr int operator()(int n) const noexcept { return n; }
};
template <class T>
constexpr auto getnumber = getNumber<T>{};
 //n queuesize, T...: sizeof...(T) ie (N+1) queues' type
template <int n, typename... T>
  requires(sizeof...(T) >= 2)
class SoftPipeline
{

  //N+1 queues' sentinal
  boost::hana::tuple<T...> sentinals_;
  //N stage piepeline
  folly::MPMCPipeline<T...> pipeline_;
  //N stage,so N threads
  std::vector<std::jthread> threads_;
  //every stage work function
  template <int N>
  struct StageCall {
    SoftPipeline* softpipeline_;

    using Input  = mp11::mp_at_c<mp11::mp_list<T...>, N>;
    using Output = mp11::mp_at_c<mp11::mp_list<T...>, N + 1>;
    using Func   = folly::Function<Output(const Input&)>;
    Func func_;
    //if receive sentinal, break
    void operator()()
    {
      for (;;)
      {
      Input val;
      auto  ticket = softpipeline_->pipeline_.template blockingReadStage<N>(val);
      softpipeline_->pipeline_.template blockingWriteStage<N>(ticket, func_(val));
      if (val == hana::at(softpipeline_->sentinals_, hana::size_t<N>{}) )
        break;
      }
    }
  };

 public:
   using HeadType= mp11::mp_front<mp11::mp_list<T...>>;
   using TailType= mp11::mp_back<mp11::mp_list<T...>>;

  template <class... U>
    requires((sizeof...(U) == sizeof...(T) - 1) && !std::is_same_v<std::remove_cv_t< mp11::mp_at_c<mp11::mp_list<U...>,0>   >, SoftPipeline>)
  SoftPipeline(const HeadType& sentinalvalue, U&&... arg) : pipeline_(getnumber<T>(n)...)
  {
  //make sentinals tuple
    auto val   = hana::make_tuple(sentinalvalue);
    auto f     = []<class Seq, class Func>(Seq s, Func func) { return hana::append(s, func(hana::back(s))
    ); };
    auto funcs = hana::make_tuple(arg...);
    sentinals_ = hana::fold(funcs, val, f);
   
   //initional N threads
    [this] <std::size_t... I, class... F>(std::index_sequence<I...>, F&&... f) {
      (..., this->threads_.emplace_back(std::jthread(StageCall<I>{this, std::forward<F>(f)})));
    }(std::index_sequence_for<U...>{}, std::forward<U>(arg)...);
  }
  void blockingWrite(const HeadType &h)
  {
    pipeline_.blockingWrite(h);
  }
  TailType blockingRead()
  {
    TailType val;
    pipeline_.blockingRead(val);
    return val;
  }
};

TEST(mpmcpipeline, basic) {
  SoftPipeline<2,int,int,int> pipeline(3,[](int n) { return n + 1; },[](int n) { return n + 2; });
  pipeline.blockingWrite(1);
  auto n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
  pipeline.blockingWrite(3);
  n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
}

value category 和noexcept规范作为练习留给读者。

Posted 2023-05-22