blob: 0cb96dacde831f0a4936da2f16cd07e3910c3c19 [file] [log] [blame]
Austin Schuh36244a12019-09-21 17:52:38 -07001// Copyright 2017 The Abseil Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
16#define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
17
18#include <cassert>
19#include <cstddef>
20#include <functional>
21#include <queue>
22#include <thread> // NOLINT(build/c++11)
23#include <vector>
24
25#include "absl/base/thread_annotations.h"
26#include "absl/synchronization/mutex.h"
27
28namespace absl {
Austin Schuhb4691e92020-12-31 12:37:18 -080029ABSL_NAMESPACE_BEGIN
Austin Schuh36244a12019-09-21 17:52:38 -070030namespace synchronization_internal {
31
32// A simple ThreadPool implementation for tests.
33class ThreadPool {
34 public:
35 explicit ThreadPool(int num_threads) {
36 for (int i = 0; i < num_threads; ++i) {
37 threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
38 }
39 }
40
41 ThreadPool(const ThreadPool &) = delete;
42 ThreadPool &operator=(const ThreadPool &) = delete;
43
44 ~ThreadPool() {
45 {
46 absl::MutexLock l(&mu_);
47 for (size_t i = 0; i < threads_.size(); i++) {
48 queue_.push(nullptr); // Shutdown signal.
49 }
50 }
51 for (auto &t : threads_) {
52 t.join();
53 }
54 }
55
56 // Schedule a function to be run on a ThreadPool thread immediately.
57 void Schedule(std::function<void()> func) {
58 assert(func != nullptr);
59 absl::MutexLock l(&mu_);
60 queue_.push(std::move(func));
61 }
62
63 private:
64 bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
65 return !queue_.empty();
66 }
67
68 void WorkLoop() {
69 while (true) {
70 std::function<void()> func;
71 {
72 absl::MutexLock l(&mu_);
73 mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
74 func = std::move(queue_.front());
75 queue_.pop();
76 }
77 if (func == nullptr) { // Shutdown signal.
78 break;
79 }
80 func();
81 }
82 }
83
84 absl::Mutex mu_;
85 std::queue<std::function<void()>> queue_ ABSL_GUARDED_BY(mu_);
86 std::vector<std::thread> threads_;
87};
88
89} // namespace synchronization_internal
Austin Schuhb4691e92020-12-31 12:37:18 -080090ABSL_NAMESPACE_END
Austin Schuh36244a12019-09-21 17:52:38 -070091} // namespace absl
92
93#endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_