Internal functionality

PiperOrigin-RevId: 882667993
Change-Id: I47f0d92b58caa9cc137ae92e8b568277dd9b1d78
This commit is contained in:
Mike Kruskal
2026-03-12 10:37:28 -07:00
committed by Copybara-Service
parent ec35402ac9
commit 1bcce1bda4
4 changed files with 173 additions and 15 deletions

View File

@@ -20,6 +20,7 @@
#include "absl/base/internal/raw_logging.h"
#include "absl/base/internal/scheduling_mode.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/base/macros.h"
// The following two declarations exist so SchedulingGuard may friend them with
@@ -64,7 +65,6 @@ class SchedulingGuard {
SchedulingGuard(const SchedulingGuard&) = delete;
SchedulingGuard& operator=(const SchedulingGuard&) = delete;
private:
// Disable cooperative rescheduling of the calling thread. It may still
// initiate scheduling operations (e.g. wake-ups), however, it may not itself
// reschedule. Nestable. The returned result is opaque, clients should not
@@ -96,13 +96,6 @@ class SchedulingGuard {
private:
int scheduling_disabled_depth_;
};
// Access to SchedulingGuard is explicitly permitted.
friend class absl::CondVar;
friend class absl::Mutex;
friend class SchedulingHelper;
friend class SpinLock;
friend int absl::synchronization_internal::MutexDelay(int32_t c, int mode);
};
//------------------------------------------------------------------------------
@@ -110,21 +103,89 @@ class SchedulingGuard {
//------------------------------------------------------------------------------
inline bool SchedulingGuard::ReschedulingIsAllowed() {
ThreadIdentity* identity;
identity = CurrentThreadIdentityIfPresent();
if (identity != nullptr) {
ThreadIdentity::SchedulerState* state;
state = &identity->scheduler_state;
// For a thread to be eligible for re-scheduling it must have a bound
// schedulable (otherwise it's not cooperative) and not be within a
// SchedulerGuard region.
return state->get_bound_schedulable() != nullptr &&
state->scheduling_disabled_depth.load(std::memory_order_relaxed) ==
0;
} else {
// Cooperative threads always have a ThreadIdentity.
return false;
}
}
// We don't use [[nodiscard]] here as some clients (e.g.
// FinishPotentiallyBlockingRegion()) cannot yet properly consume it.
inline bool SchedulingGuard::DisableRescheduling() {
ThreadIdentity* identity;
identity = CurrentThreadIdentityIfPresent();
if (identity != nullptr) {
// The depth is accessed concurrently from other threads, so it must be
// atomic, but it's only mutated from this thread, so we don't need an
// atomic increment.
int old_val = identity->scheduler_state.scheduling_disabled_depth.load(
std::memory_order_relaxed);
identity->scheduler_state.scheduling_disabled_depth.store(
old_val + 1, std::memory_order_relaxed);
return true;
} else {
return false;
}
}
inline void SchedulingGuard::EnableRescheduling(bool /* disable_result */) {
inline void SchedulingGuard::EnableRescheduling(bool disable_result) {
if (!disable_result) {
// There was no installed thread identity at the time that scheduling was
// disabled, so we have nothing to do. This is an implementation detail
// that may change in the future, clients may not depend on it.
// EnableRescheduling() must always be called.
return;
}
inline SchedulingGuard::ScopedEnable::ScopedEnable()
: scheduling_disabled_depth_(0) {}
ThreadIdentity* identity;
// A thread identity exists, see above
identity = CurrentThreadIdentityIfPresent();
// The depth is accessed concurrently from other threads, so it must be
// atomic, but it's only mutated from this thread, so we don't need an atomic
// decrement.
int old_val = identity->scheduler_state.scheduling_disabled_depth.load(
std::memory_order_relaxed);
identity->scheduler_state.scheduling_disabled_depth.store(
old_val - 1, std::memory_order_relaxed);
}
inline SchedulingGuard::ScopedEnable::ScopedEnable() {
ThreadIdentity* identity;
identity = CurrentThreadIdentityIfPresent();
if (identity != nullptr) {
scheduling_disabled_depth_ =
identity->scheduler_state.scheduling_disabled_depth.load(
std::memory_order_relaxed);
if (scheduling_disabled_depth_ != 0) {
// The store below does not need to be compare_exchange because
// the value is never modified concurrently (only accessed).
identity->scheduler_state.scheduling_disabled_depth.store(
0, std::memory_order_relaxed);
}
} else {
scheduling_disabled_depth_ = 0;
}
}
inline SchedulingGuard::ScopedEnable::~ScopedEnable() {
ABSL_RAW_CHECK(scheduling_disabled_depth_ == 0, "disable unused warning");
if (scheduling_disabled_depth_ == 0) {
return;
}
ThreadIdentity* identity = CurrentThreadIdentityIfPresent();
// itentity is guaranteed to exist, see the constructor above.
identity->scheduler_state.scheduling_disabled_depth.store(
scheduling_disabled_depth_, std::memory_order_relaxed);
}
} // namespace base_internal

View File

@@ -34,6 +34,13 @@
#include "absl/base/internal/per_thread_tls.h"
#include "absl/base/optimization.h"
// Forward declare Gloop class for scheduling.
namespace base {
namespace scheduling {
class Schedulable;
} // namespace scheduling
} // namespace base
namespace absl {
ABSL_NAMESPACE_BEGIN
@@ -146,6 +153,58 @@ struct ThreadIdentity {
// ThreadIdentity itself.
PerThreadSynch per_thread_synch;
struct SchedulerState {
std::atomic<base::scheduling::Schedulable*> bound_schedulable{nullptr};
// Storage space for a SpinLock, which is created through a placement new to
// break a dependency cycle.
uint32_t association_lock_word;
std::atomic<int> scheduling_disabled_depth;
int potentially_blocking_depth;
uint32_t schedule_next_state;
// When true, current thread is unlocking a mutex and actively waking a
// thread that was previously waiting, but that lock has yet more waiters.
// Used to signal to schedulers that work being woken should get an
// elevated priority.
bool waking_designated_waker;
inline SpinLock* association_lock() {
return reinterpret_cast<SpinLock*>(&association_lock_word);
}
inline base::scheduling::Schedulable* get_bound_schedulable() const {
return bound_schedulable.load(std::memory_order_relaxed);
}
} scheduler_state; // Private: Reserved for use in Gloop
// For worker threads that may not be doing any interesting user work, this
// tracks the current state of the worker. This is used to handle those
// threads differently e.g. when printing stacktraces.
//
// It should only be written to by the thread itself.
//
// Note that this is different from the mutex idle bit - threads running user
// work can be waiting but still be active.
//
// Note: not all parts of the code-base may maintain this field correctly and
// therefore this field should only be used to improve debugging/monitoring.
//
// Put it here to reuse some of the padding space.
enum class WaitState : uint8_t {
kActive = 0,
kWaitingForWork = 1,
};
std::atomic<WaitState> wait_state;
static_assert(std::atomic<WaitState>::is_always_lock_free);
// Add a padding such that scheduler_state is on a different cache line than
// waiter state. We use padding here, so that the size of the structure does
// not substantially grow due to the added padding.
static constexpr size_t kToBePaddedSize =
sizeof(SchedulerState) + sizeof(std::atomic<WaitState>);
static_assert(ABSL_CACHELINE_SIZE >= kToBePaddedSize);
char padding[ABSL_CACHELINE_SIZE - kToBePaddedSize];
// Private: Reserved for absl::synchronization_internal::Waiter.
struct WaiterState {
alignas(void*) char data[256];
@@ -161,6 +220,10 @@ struct ThreadIdentity {
std::atomic<int> wait_start; // Ticker value when thread started waiting.
std::atomic<bool> is_idle; // Has thread become idle yet?
// For tracking depth of __cxa_guard_acquire. This used to recognize heap
// allocations for function static objects.
int static_initialization_depth;
ThreadIdentity* next;
};

View File

@@ -77,6 +77,16 @@ void OneTimeInitThreadIdentity(base_internal::ThreadIdentity* identity) {
identity->ticker.store(0, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
// To avoid a circular dependency we declare only the storage in the header
// and use placement new to construct the SpinLock.
static_assert(
sizeof(base_internal::ThreadIdentity::SchedulerState::
association_lock_word) == sizeof(base_internal::SpinLock),
"Wrong size for SpinLock");
// Protects the association between this identity and its schedulable. Should
// never be cooperative.
new (&identity->scheduler_state.association_lock_word)
base_internal::SpinLock(base_internal::SCHEDULE_KERNEL_ONLY);
}
static void ResetThreadIdentityBetweenReuse(
@@ -96,6 +106,17 @@ static void ResetThreadIdentityBetweenReuse(
pts->wake = false;
pts->cond_waiter = false;
pts->all_locks = nullptr;
base_internal::ThreadIdentity::SchedulerState* ss =
&identity->scheduler_state;
ss->bound_schedulable.store(nullptr, std::memory_order_relaxed);
ss->association_lock_word = 0;
ss->scheduling_disabled_depth.store(0, std::memory_order_relaxed);
ss->potentially_blocking_depth = 0;
ss->schedule_next_state = 0;
ss->waking_designated_waker = false;
identity->static_initialization_depth = 0;
identity->wait_state.store(base_internal::ThreadIdentity::WaitState::kActive,
std::memory_order_relaxed);
identity->blocked_count_ptr = nullptr;
identity->ticker.store(0, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);

View File

@@ -2139,6 +2139,8 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) {
intptr_t wr_wait = 0; // set to kMuWrWait if we wake a reader and a
// later writer could have acquired the lock
// (starvation avoidance)
// When non-null, clear its "woken_has_waiters" field before returning.
absl::base_internal::ThreadIdentity* clear_waking_des_waker = nullptr;
ABSL_RAW_CHECK(waitp == nullptr || waitp->thread->waitp == nullptr ||
waitp->thread->suppress_fatal_errors,
"detected illegal recursion into Mutex code");
@@ -2382,6 +2384,13 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) {
h->readers = 0;
h->maybe_unlocking = false; // finished unlocking
nv |= wr_wait | kMuWait | reinterpret_cast<intptr_t>(h);
// Signal to any Scheduler that we are waking from Mutex Unlock
// and there are more waiters left, signaling possible contention.
ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0);
clear_waking_des_waker = GetOrCreateCurrentThreadIdentity();
ABSL_TSAN_MUTEX_POST_DIVERT(this, 0);
clear_waking_des_waker->scheduler_state.waking_designated_waker = true;
}
// release both spinlock & lock
@@ -2417,6 +2426,10 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) {
ABSL_TSAN_MUTEX_POST_DIVERT(this, 0);
}
}
if (clear_waking_des_waker) {
clear_waking_des_waker->scheduler_state.waking_designated_waker = false;
}
}
// Used by CondVar implementation to reacquire mutex after waking from