XRootD
Loading...
Searching...
No Matches
XrdEcThreadPool.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26
#include <exception>
#include <future>
#include <tuple>
#include <type_traits>
#include <utility>
29
30#ifndef SRC_XRDEC_XRDECTHREADPOOL_HH_
31#define SRC_XRDEC_XRDECTHREADPOOL_HH_
32
33namespace XrdEc
34{
35 //---------------------------------------------------------------------------
36 // A theread pool class for the XrdEc module
37 //---------------------------------------------------------------------------
38 class ThreadPool
39 {
40 private:
41
42 // This is the type which holds sequences
43 template<int ... Is> struct sequence {};
44
45 // First define the template signature
46 template <int ... Ns> struct seq_gen;
47
48 // Recursion case
49 template <int I, int ... Ns>
50 struct seq_gen<I, Ns...>
51 {
52 using type = typename seq_gen<I - 1, I - 1, Ns...>::type;
53 };
54
55 // Recursion abort
56 template <int ... Ns>
57 struct seq_gen<0, Ns...>
58 {
59 using type = sequence<Ns...>;
60 };
61
62 // call functional with arguments in a tuple (implementation)
63 template <typename FUNC, typename TUPL, int ... INDICES>
64 inline static auto tuple_call_impl( FUNC &func, TUPL &args, sequence<INDICES...> ) -> decltype( func( std::move( std::get<INDICES>( args ) )... ) )
65 {
66 return func( std::move( std::get<INDICES>( args ) )... );
67 }
68
69 // call functional with argumetns packaged in a tuple
70 template <typename FUNC, typename ... ARGs>
71 inline static auto tuple_call( FUNC &func, std::tuple<ARGs...> &tup ) ->decltype( tuple_call_impl( func, tup, typename seq_gen<sizeof...(ARGs)>::type{} ) )
72 {
73 return tuple_call_impl( func, tup, typename seq_gen<sizeof...(ARGs)>::type{} );
74 }
75
76 //-----------------------------------------------------------------------
77 // Helper class implementing a job containing any functional and its
78 // arguments.
79 //-----------------------------------------------------------------------
80 template<typename FUNC, typename RET, typename ... ARGs>
81 class AnyJob : public XrdCl::Job
82 {
83 //---------------------------------------------------------------------
84 // Run the functional (returning void) with the packaged arguments
85 //---------------------------------------------------------------------
86 static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<void> &prms )
87 {
88 tuple_call( func, args );
89 prms.set_value();
90 }
91
92 //---------------------------------------------------------------------
93 // Run the functional (returning anything but void) with the packaged
94 // arguments
95 //---------------------------------------------------------------------
96 template<typename RETURN>
97 static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<RETURN> &prms )
98 {
99 prms.set_value( tuple_call( func, args ) );
100 }
101
102 public:
103 //-------------------------------------------------------------------
108 //-------------------------------------------------------------------
109 AnyJob( FUNC func, ARGs... args ) : func( std::move( func ) ),
110 args( std::tuple<ARGs...>( std::move( args )... ) )
111 {
112 }
113
114 //-------------------------------------------------------------------
116 //-------------------------------------------------------------------
117 void Run( void *arg )
118 {
119 RunImpl( this->func, this->args, this->prms );
120 delete this;
121 }
122
123 //-------------------------------------------------------------------
125 //-------------------------------------------------------------------
126 std::future<RET> GetFuture()
127 {
128 return prms.get_future();
129 }
130
131 protected:
132
133 FUNC func; //< the functional
134 std::tuple<ARGs...> args; //< the arguments
135 std::promise<RET> prms; //< the promiss that there will be a result
136 };
137
138 public:
139
140 //-----------------------------------------------------------------------
142 //-----------------------------------------------------------------------
144 {
145 threadpool.Stop();
146 threadpool.Finalize();
147 }
148
149 //-----------------------------------------------------------------------
151 //-----------------------------------------------------------------------
152 static ThreadPool& Instance()
153 {
154 static ThreadPool instance;
155 return instance;
156 }
157
158 //-----------------------------------------------------------------------
160 //-----------------------------------------------------------------------
161 template<typename FUNC, typename ... ARGs>
162 inline std::future<std::invoke_result_t<FUNC, ARGs...>>
163 Execute( FUNC func, ARGs... args )
164 {
165 using RET = std::invoke_result_t<FUNC, ARGs...>;
166 auto *job = new AnyJob<FUNC, RET, ARGs...>( func, std::move( args )... );
167 std::future<RET> ftr = job->GetFuture();
168 threadpool.QueueJob( job, nullptr );
169 return ftr;
170 }
171
172 private:
173
174 //-----------------------------------------------------------------------
176 //-----------------------------------------------------------------------
177 ThreadPool() : threadpool( 64 )
178 {
179 threadpool.Initialize();
180 threadpool.Start();
181 }
182
183 XrdCl::JobManager threadpool; //< the thread-pool itself
184 };
185
186}
187
188
189#endif /* SRC_XRDEC_XRDECTHREADPOOL_HH_ */
#define I(x)
A synchronized queue.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
Interface for a job to be run by the job manager.
static ThreadPool & Instance()
Singleton access.
~ThreadPool()
Destructor.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.