ThreadLocal.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H

// Standard headers used by Eigen::ThreadLocal below. In the Eigen source tree
// these are normally pulled in by the ThreadPool module header; they are
// listed here to keep the file self-contained.
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>

#ifdef EIGEN_AVOID_THREAD_LOCAL

#ifdef EIGEN_THREAD_LOCAL
#undef EIGEN_THREAD_LOCAL
#endif

#else

#if EIGEN_MAX_CPP_VER >= 11 &&                         \
    ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \
     __has_feature(cxx_thread_local) ||                \
     (EIGEN_COMP_MSVC >= 1900))
#define EIGEN_THREAD_LOCAL static thread_local
#endif

// Disable TLS for Apple and Android builds with older toolchains.
#if defined(__APPLE__)
// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
// __IPHONE_9_0.
#include <Availability.h>
#include <TargetConditionals.h>
#endif
// Checks whether C++11's `thread_local` storage duration specifier is
// supported.
#if defined(__apple_build_version__) && \
    ((__apple_build_version__ < 8000042) || \
     (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
// Note: Xcode's clang did not support `thread_local` until version 8, and
// even then it remains unavailable when targeting iOS < 9.0.
#undef EIGEN_THREAD_LOCAL

#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
// There are platforms for which TLS should not be used even though the
// compiler makes it seem like it's supported (Android NDK < r12b, for
// example). This is primarily because of linker problems and toolchain
// misconfiguration: TLS isn't supported until NDK r12b per
// https://developer.android.com/ndk/downloads/revision_history.html
// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
// <android/ndk-version.h>. For NDK < r16, users should define these macros,
// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11.
#if __has_include(<android/ndk-version.h>)
#include <android/ndk-version.h>
#endif  // __has_include(<android/ndk-version.h>)
#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
    defined(__NDK_MINOR__) &&                                               \
    ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
#undef EIGEN_THREAD_LOCAL
#endif
#endif  // defined(__ANDROID__) && defined(__clang__)

#endif  // EIGEN_AVOID_THREAD_LOCAL
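
// For illustration only (a hedged sketch, not part of this header): client
// code is expected to test whether `EIGEN_THREAD_LOCAL` survived the checks
// above and fall back to the `Eigen::ThreadLocal` container defined below
// when it did not. `PerThread` and `num_threads` are hypothetical names used
// purely as an example.
//
//   #ifdef EIGEN_THREAD_LOCAL
//   EIGEN_THREAD_LOCAL PerThread per_thread;  // `static thread_local`
//   #else
//   Eigen::ThreadLocal<PerThread> per_thread(/*capacity=*/num_threads);
//   #endif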

namespace Eigen {

namespace internal {
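// Default no-op `initialize` and `release` callables, used by `ThreadLocal`
// below when the caller does not supply custom hooks.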
template <typename T>
struct ThreadLocalNoOpInitialize {
  void operator()(T&) const {}
};

template <typename T>
struct ThreadLocalNoOpRelease {
  void operator()(T&) const {}
};

}  // namespace internal

// Thread-local container for elements of type `T` that does not use thread
// local storage. As long as the number of unique threads accessing this
// storage is smaller than `capacity_`, it is lock-free and wait-free;
// otherwise it falls back on a mutex for synchronization.
//
// Type `T` has to be default constructible, and by default each thread will
// get a default-constructed value. It is possible to specify a custom
// `initialize` callable that will be called lazily from each thread accessing
// this object and will be passed a default-initialized object of type `T`.
// It is also possible to pass a custom `release` callable that will be
// invoked before calling `~T()`.
//
// Example:
//
//   struct Counter {
//     int value = 0;
//   };
//
//   Eigen::ThreadLocal<Counter> counter(10);
//
//   // Each thread will have access to its own counter object.
//   Counter& cnt = counter.local();
//   cnt.value++;
//
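// Custom `initialize` and `release` callables can be wired in as well. A
// hedged sketch (these particular lambdas are illustrative only, not
// prescribed by the API):
//
//   auto init = [](Counter& c) { c.value = 0; };       // runs lazily per thread
//   auto done = [](Counter& c) { /* flush c.value */ };  // runs before ~T()
//   Eigen::ThreadLocal<Counter, decltype(init), decltype(done)>
//       counter(10, init, done);
//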
// WARNING: Eigen::ThreadLocal uses the OS-specific value returned by
// std::this_thread::get_id() to identify threads. This value is not
// guaranteed to be unique except for the life of the thread. A newly created
// thread may get an OS-specific ID equal to that of an already destroyed
// thread.
//
// Somewhat similar to TBB thread-local storage, with similar restrictions:
// https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html
//
template <typename T,
          typename Initialize = internal::ThreadLocalNoOpInitialize<T>,
          typename Release = internal::ThreadLocalNoOpRelease<T>>
class ThreadLocal {
  // We preallocate default-constructed elements in a MaxSizeVector.
  static_assert(std::is_default_constructible<T>::value,
                "ThreadLocal data type must be default constructible");

 public:
  explicit ThreadLocal(int capacity)
      : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(),
                    internal::ThreadLocalNoOpRelease<T>()) {}

  ThreadLocal(int capacity, Initialize initialize)
      : ThreadLocal(capacity, std::move(initialize),
                    internal::ThreadLocalNoOpRelease<T>()) {}

  ThreadLocal(int capacity, Initialize initialize, Release release)
      : initialize_(std::move(initialize)),
        release_(std::move(release)),
        capacity_(capacity),
        data_(capacity_),
        ptr_(capacity_),
        filled_records_(0) {
    eigen_assert(capacity_ >= 0);
    data_.resize(capacity_);
    for (int i = 0; i < capacity_; ++i) {
      ptr_.emplace_back(nullptr);
    }
  }

  T& local() {
    std::thread::id this_thread = std::this_thread::get_id();
    if (capacity_ == 0) return SpilledLocal(this_thread);

    std::size_t h = std::hash<std::thread::id>()(this_thread);
    const int start_idx = h % capacity_;

    // NOTE: From the definition of `std::this_thread::get_id()` it is
    // guaranteed that we can never have concurrent insertions with the same
    // key into our hash-map-like data structure. If we didn't find an element
    // during the initial traversal, it's guaranteed that no one else could
    // have inserted it while we are in this function. This allows us to
    // massively simplify our lock-free insert-only hash map.

    // Check if we already have an element for `this_thread`.
    int idx = start_idx;
    while (ptr_[idx].load() != nullptr) {
      ThreadIdAndValue& record = *(ptr_[idx].load());
      if (record.thread_id == this_thread) return record.value;

      idx += 1;
      if (idx >= capacity_) idx -= capacity_;
      if (idx == start_idx) break;
    }

    // If we are here, it means that either we found an insertion point in the
    // lookup table at `idx`, or we did a full traversal and the table is full.

    // If the lock-free storage is full, fall back on the mutex.
    if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);

    // We double-check that we still have space to insert an element into the
    // lock-free storage. If the old value of `filled_records_` is larger than
    // or equal to the capacity, it means that some other thread added an
    // element while we were traversing the lookup table.
    int insertion_index =
        filled_records_.fetch_add(1, std::memory_order_relaxed);
    if (insertion_index >= capacity_) return SpilledLocal(this_thread);

    // At this point it's guaranteed that we can access
    // `data_[insertion_index]` without a data race.
    data_[insertion_index].thread_id = this_thread;
    initialize_(data_[insertion_index].value);

    // That's the pointer we'll put into the lookup table.
    ThreadIdAndValue* inserted = &data_[insertion_index];

    // We'll use a null `ThreadIdAndValue` pointer as the expected value in
    // the compare-and-swap loop below.
    ThreadIdAndValue* empty = nullptr;

    // Now we have to find an insertion point in the lookup table. We start
    // from the `idx` that was identified as an insertion candidate above; it
    // is guaranteed that there is an empty record somewhere in the lookup
    // table (because we just created a record in `data_`).
    const int insertion_idx = idx;

    do {
      // Always start the search from the original insertion candidate.
      idx = insertion_idx;
      while (ptr_[idx].load() != nullptr) {
        idx += 1;
        if (idx >= capacity_) idx -= capacity_;
        // If we did a full loop, it means that we don't have any free entries
        // in the lookup table, and this means that something is terribly
        // wrong.
        eigen_assert(idx != insertion_idx);
      }
      // The atomic CAS of the pointer guarantees that any other thread that
      // follows this pointer will see all the mutations in `data_`.
    } while (!ptr_[idx].compare_exchange_weak(empty, inserted));

    return inserted->value;
  }

  // WARNING: It is not thread-safe to call this concurrently with `local()`.
  void ForEach(std::function<void(std::thread::id, T&)> f) {
    // Reading directly from `data_` is unsafe, because only the CAS to the
    // record in `ptr_` makes all changes visible to other threads.
    for (auto& ptr : ptr_) {
      ThreadIdAndValue* record = ptr.load();
      if (record == nullptr) continue;
      f(record->thread_id, record->value);
    }

    // We did not spill into the map-based storage.
    if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;

    // Adds a happens-before edge from the last call to SpilledLocal().
    std::unique_lock<std::mutex> lock(mu_);
    for (auto& kv : per_thread_map_) {
      f(kv.first, kv.second);
    }
  }
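
  // For illustration (a hedged sketch, not part of the class): once all
  // threads that called `local()` have joined, their values can be aggregated
  // with `ForEach`, reusing the `Counter` type from the class comment:
  //
  //   int total = 0;
  //   counter.ForEach(
  //       [&total](std::thread::id, Counter& c) { total += c.value; });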

  // WARNING: It is not thread-safe to call this concurrently with `local()`.
  ~ThreadLocal() {
    // Reading directly from `data_` is unsafe, because only the CAS to the
    // record in `ptr_` makes all changes visible to other threads.
    for (auto& ptr : ptr_) {
      ThreadIdAndValue* record = ptr.load();
      if (record == nullptr) continue;
      release_(record->value);
    }

    // We did not spill into the map-based storage.
    if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;

    // Adds a happens-before edge from the last call to SpilledLocal().
    std::unique_lock<std::mutex> lock(mu_);
    for (auto& kv : per_thread_map_) {
      release_(kv.second);
    }
  }

 private:
  struct ThreadIdAndValue {
    std::thread::id thread_id;
    T value;
  };

  // Use an unordered map guarded by a mutex when the lock-free storage is
  // full.
  T& SpilledLocal(std::thread::id this_thread) {
    std::unique_lock<std::mutex> lock(mu_);

    auto it = per_thread_map_.find(this_thread);
    if (it == per_thread_map_.end()) {
      auto result = per_thread_map_.emplace(this_thread, T());
      eigen_assert(result.second);
      initialize_((*result.first).second);
      return (*result.first).second;
    } else {
      return it->second;
    }
  }

  Initialize initialize_;
  Release release_;
  const int capacity_;

  // Storage that backs the lock-free lookup table `ptr_`. Records are stored
  // in this storage contiguously, starting from index 0.
  MaxSizeVector<ThreadIdAndValue> data_;

  // Atomic pointers to the data stored in `data_`. Used as a lookup table for
  // a linear-probing hash map (https://en.wikipedia.org/wiki/Linear_probing).
  MaxSizeVector<std::atomic<ThreadIdAndValue*>> ptr_;

  // Number of records stored in `data_`.
  std::atomic<int> filled_records_;

  // We fall back on a per-thread map if the lock-free storage is full. In
  // practice this should never happen if `capacity_` is a reasonable estimate
  // of the number of threads running in the system.
  std::mutex mu_;  // Protects per_thread_map_.
  std::unordered_map<std::thread::id, T> per_thread_map_;
};
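
// A minimal end-to-end sketch (illustrative only, not part of this header),
// again reusing the `Counter` type from the class comment and assuming
// <thread> and <vector> are available:
//
//   Eigen::ThreadLocal<Counter> counters(/*capacity=*/4);
//   std::vector<std::thread> workers;
//   for (int i = 0; i < 4; ++i)
//     workers.emplace_back([&counters] { counters.local().value++; });
//   for (auto& t : workers) t.join();
//   // All workers joined: no concurrent local() calls, safe to iterate.
//   counters.ForEach([](std::thread::id, Counter& c) { /* aggregate */ });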

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H