From 5f192ad30fcc8a6a2c1718ce566cccf320163759 Mon Sep 17 00:00:00 2001 From: Antonin Portelli Date: Sun, 28 Nov 2021 23:26:17 +0000 Subject: [PATCH] Thread pool implementation --- examples/Makefile.am | 7 ++- examples/exThreadPool.cpp | 29 ++++++++++ lib/Core/ThreadPool.cpp | 109 ++++++++++++++++++++++++++++++++++++++ lib/Core/ThreadPool.hpp | 54 +++++++++++++++++++ lib/Core/stdincludes.hpp | 2 + lib/Makefile.am | 2 + 6 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 examples/exThreadPool.cpp create mode 100644 lib/Core/ThreadPool.cpp create mode 100644 lib/Core/ThreadPool.hpp diff --git a/examples/Makefile.am b/examples/Makefile.am index dcb5a7c..123bafb 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -19,7 +19,8 @@ noinst_PROGRAMS = \ exPlot \ exPValue \ exRand \ - exRootFinder + exRootFinder \ + exThreadPool exCompiledDoubleFunction_SOURCES = exCompiledDoubleFunction.cpp exCompiledDoubleFunction_CXXFLAGS = $(COM_CXXFLAGS) @@ -73,4 +74,8 @@ exRootFinder_SOURCES = exRootFinder.cpp exRootFinder_CXXFLAGS = $(COM_CXXFLAGS) exRootFinder_LDFLAGS = -L../lib/.libs -lLatAnalyze +exThreadPool_SOURCES = exThreadPool.cpp +exThreadPool_CXXFLAGS = $(COM_CXXFLAGS) +exThreadPool_LDFLAGS = -L../lib/.libs -lLatAnalyze + ACLOCAL_AMFLAGS = -I .buildutils/m4 diff --git a/examples/exThreadPool.cpp b/examples/exThreadPool.cpp new file mode 100644 index 0000000..85c714d --- /dev/null +++ b/examples/exThreadPool.cpp @@ -0,0 +1,29 @@ +#include + +using namespace std; +using namespace Latan; + +int main(void) +{ + ThreadPool pool; + + cout << "Using " << pool.getThreadNum() << " threads" << endl; + for (unsigned int i = 1; i <= 20; ++i) + { + pool.addJob([i, &pool](void) + { + { + unique_lock lock(pool.getMutex()); + cout << "job " << i << " wait for " << i*100 << " ms" << endl; + } + this_thread::sleep_for(chrono::milliseconds(i*100)); + { + unique_lock lock(pool.getMutex()); + cout << "job " << i << " done" << endl; + } + }); + } + pool.terminate(); + + return EXIT_SUCCESS; +} diff --git a/lib/Core/ThreadPool.cpp b/lib/Core/ThreadPool.cpp new file mode 100644 index 0000000..de4f575 --- /dev/null +++ b/lib/Core/ThreadPool.cpp @@ -0,0 +1,109 @@ +/* + * ThreadPool.cpp, part of LatAnalyze 3 + * + * Copyright (C) 2013 - 2021 Antonin Portelli + * + * LatAnalyze 3 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * LatAnalyze 3 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LatAnalyze 3. If not, see . + */ + +#include +#include + +using namespace std; +using namespace Latan; + +/****************************************************************************** + * ThreadPool implementation * + ******************************************************************************/ +// constructors //////////////////////////////////////////////////////////////// +ThreadPool::ThreadPool(void) +: ThreadPool(std::thread::hardware_concurrency()) +{} + +ThreadPool::ThreadPool(const unsigned int nThreads) +: nThreads_(nThreads) +{ + for (unsigned int t = 0; t < nThreads_; ++t) + { + threads_.push_back(thread(&ThreadPool::workerLoop, this)); + } +} + +// destructor ////////////////////////////////////////////////////////////////// +ThreadPool::~ThreadPool(void) +{ + terminate(); +} + +// get the number of threads /////////////////////////////////////////////////// +unsigned int ThreadPool::getThreadNum(void) const +{ + return nThreads_; +} + +// get the pool mutex for synchronisation ////////////////////////////////////// +std::mutex & ThreadPool::getMutex(void) +{ + return mutex_; +} + +// worker loop ///////////////////////////////////////////////////////////////// +void ThreadPool::workerLoop(void) +{ + while (true) + { + Job job; + { + unique_lock lock(mutex_); + + condition_.wait(lock, [this](){ + return !queue_.empty() || terminatePool_; + }); + if (terminatePool_ and queue_.empty()) + { + return; + } + job = queue_.front(); + queue_.pop(); + } + job(); + } +} + +// add jobs //////////////////////////////////////////////////////////////////// +void ThreadPool::addJob(Job newJob) +{ + { + unique_lock lock(mutex_); + + queue_.push(newJob); + } + condition_.notify_one(); +} + +// wait for completion ///////////////////////////////////////////////////////// +void ThreadPool::terminate(void) +{ + { + unique_lock lock(mutex_); + + terminatePool_ = true; + } + condition_.notify_all(); + for (auto &thread: threads_) + { + thread.join(); + } + threads_.clear(); +} diff --git a/lib/Core/ThreadPool.hpp b/lib/Core/ThreadPool.hpp new file mode 100644 index 0000000..5a504e5 --- /dev/null +++ b/lib/Core/ThreadPool.hpp @@ -0,0 +1,54 @@ +/* + * ThreadPool.hpp, part of LatAnalyze 3 + * + * Copyright (C) 2013 - 2021 Antonin Portelli + * + * LatAnalyze 3 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * LatAnalyze 3 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LatAnalyze 3. If not, see . + */ + +#ifndef Latan_ThreadPool_hpp_ +#define Latan_ThreadPool_hpp_ + +#include + +class ThreadPool +{ +public: + typedef std::function Job; +public: + // constructors/destructor + ThreadPool(void); + ThreadPool(const unsigned int nThreads); + virtual ~ThreadPool(void); + // get the number of threads + unsigned int getThreadNum(void) const; + // get the pool mutex for synchronisation + std::mutex & getMutex(void); + // add jobs + void addJob(Job newJob); + // wait for completion and terminate + void terminate(void); +private: + // worker loop + void workerLoop(void); +private: + unsigned int nThreads_; + std::condition_variable condition_; + std::vector threads_; + bool terminatePool_{false}; + std::queue queue_; + std::mutex mutex_; +}; + +#endif diff --git a/lib/Core/stdincludes.hpp b/lib/Core/stdincludes.hpp index bec9bf8..e976118 100644 --- a/lib/Core/stdincludes.hpp +++ b/lib/Core/stdincludes.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,7 @@ #include #include #include +#include #include #include #include diff --git a/lib/Makefile.am b/lib/Makefile.am index ddb23b6..9fb5e8c 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -32,6 +32,7 @@ libLatAnalyze_la_SOURCES = \ Core/MathParser.ypp \ Core/OptParser.cpp \ Core/Plot.cpp \ + Core/ThreadPool.cpp \ Core/Utilities.cpp \ Functional/CompiledFunction.cpp \ Functional/CompiledModel.cpp \ @@ -75,6 +76,7 @@ HPPFILES = \ Core/OptParser.hpp \ Core/ParserState.hpp \ Core/Plot.hpp \ + Core/ThreadPool.hpp \ Core/stdincludes.hpp \ Core/Utilities.hpp \ Functional/CompiledFunction.hpp \