diff --git a/absl/base/internal/low_level_scheduling.h b/absl/base/internal/low_level_scheduling.h index 9baccc06..c2738914 100644 --- a/absl/base/internal/low_level_scheduling.h +++ b/absl/base/internal/low_level_scheduling.h @@ -20,6 +20,7 @@ #include "absl/base/internal/raw_logging.h" #include "absl/base/internal/scheduling_mode.h" +#include "absl/base/internal/thread_identity.h" #include "absl/base/macros.h" // The following two declarations exist so SchedulingGuard may friend them with @@ -64,7 +65,6 @@ class SchedulingGuard { SchedulingGuard(const SchedulingGuard&) = delete; SchedulingGuard& operator=(const SchedulingGuard&) = delete; - private: // Disable cooperative rescheduling of the calling thread. It may still // initiate scheduling operations (e.g. wake-ups), however, it may not itself // reschedule. Nestable. The returned result is opaque, clients should not @@ -96,13 +96,6 @@ class SchedulingGuard { private: int scheduling_disabled_depth_; }; - - // Access to SchedulingGuard is explicitly permitted. - friend class absl::CondVar; - friend class absl::Mutex; - friend class SchedulingHelper; - friend class SpinLock; - friend int absl::synchronization_internal::MutexDelay(int32_t c, int mode); }; //------------------------------------------------------------------------------ @@ -110,21 +103,89 @@ class SchedulingGuard { //------------------------------------------------------------------------------ inline bool SchedulingGuard::ReschedulingIsAllowed() { - return false; + ThreadIdentity* identity; + identity = CurrentThreadIdentityIfPresent(); + if (identity != nullptr) { + ThreadIdentity::SchedulerState* state; + state = &identity->scheduler_state; + // For a thread to be eligible for re-scheduling it must have a bound + // schedulable (otherwise it's not cooperative) and not be within a + // SchedulerGuard region. + return state->get_bound_schedulable() != nullptr && + state->scheduling_disabled_depth.load(std::memory_order_relaxed) == + 0; + } else { + // Cooperative threads always have a ThreadIdentity. + return false; + } } +// We don't use [[nodiscard]] here as some clients (e.g. +// FinishPotentiallyBlockingRegion()) cannot yet properly consume it. inline bool SchedulingGuard::DisableRescheduling() { - return false; + ThreadIdentity* identity; + identity = CurrentThreadIdentityIfPresent(); + if (identity != nullptr) { + // The depth is accessed concurrently from other threads, so it must be + // atomic, but it's only mutated from this thread, so we don't need an + // atomic increment. + int old_val = identity->scheduler_state.scheduling_disabled_depth.load( + std::memory_order_relaxed); + identity->scheduler_state.scheduling_disabled_depth.store( + old_val + 1, std::memory_order_relaxed); + return true; + } else { + return false; + } } -inline void SchedulingGuard::EnableRescheduling(bool /* disable_result */) { - return; +inline void SchedulingGuard::EnableRescheduling(bool disable_result) { + if (!disable_result) { + // There was no installed thread identity at the time that scheduling was + // disabled, so we have nothing to do. This is an implementation detail + // that may change in the future, clients may not depend on it. + // EnableRescheduling() must always be called. + return; + } + + ThreadIdentity* identity; + // A thread identity exists, see above + identity = CurrentThreadIdentityIfPresent(); + // The depth is accessed concurrently from other threads, so it must be + // atomic, but it's only mutated from this thread, so we don't need an atomic + // decrement. + int old_val = identity->scheduler_state.scheduling_disabled_depth.load( + std::memory_order_relaxed); + identity->scheduler_state.scheduling_disabled_depth.store( + old_val - 1, std::memory_order_relaxed); +} + +inline SchedulingGuard::ScopedEnable::ScopedEnable() { + ThreadIdentity* identity; + identity = CurrentThreadIdentityIfPresent(); + if (identity != nullptr) { + scheduling_disabled_depth_ = + identity->scheduler_state.scheduling_disabled_depth.load( + std::memory_order_relaxed); + if (scheduling_disabled_depth_ != 0) { + // The store below does not need to be compare_exchange because + // the value is never modified concurrently (only accessed). + identity->scheduler_state.scheduling_disabled_depth.store( + 0, std::memory_order_relaxed); + } + } else { + scheduling_disabled_depth_ = 0; + } } -inline SchedulingGuard::ScopedEnable::ScopedEnable() - : scheduling_disabled_depth_(0) {} inline SchedulingGuard::ScopedEnable::~ScopedEnable() { - ABSL_RAW_CHECK(scheduling_disabled_depth_ == 0, "disable unused warning"); + if (scheduling_disabled_depth_ == 0) { + return; + } + ThreadIdentity* identity = CurrentThreadIdentityIfPresent(); + // itentity is guaranteed to exist, see the constructor above. + identity->scheduler_state.scheduling_disabled_depth.store( + scheduling_disabled_depth_, std::memory_order_relaxed); } } // namespace base_internal diff --git a/absl/base/internal/thread_identity.h b/absl/base/internal/thread_identity.h index acfc15a8..f2525438 100644 --- a/absl/base/internal/thread_identity.h +++ b/absl/base/internal/thread_identity.h @@ -34,6 +34,13 @@ #include "absl/base/internal/per_thread_tls.h" #include "absl/base/optimization.h" +// Forward declare Gloop class for scheduling. +namespace base { +namespace scheduling { +class Schedulable; +} // namespace scheduling +} // namespace base + namespace absl { ABSL_NAMESPACE_BEGIN @@ -146,6 +153,58 @@ struct ThreadIdentity { // ThreadIdentity itself. PerThreadSynch per_thread_synch; + struct SchedulerState { + std::atomic bound_schedulable{nullptr}; + // Storage space for a SpinLock, which is created through a placement new to + // break a dependency cycle. + uint32_t association_lock_word; + std::atomic scheduling_disabled_depth; + int potentially_blocking_depth; + uint32_t schedule_next_state; + + // When true, current thread is unlocking a mutex and actively waking a + // thread that was previously waiting, but that lock has yet more waiters. + // Used to signal to schedulers that work being woken should get an + // elevated priority. + bool waking_designated_waker; + + inline SpinLock* association_lock() { + return reinterpret_cast(&association_lock_word); + } + + inline base::scheduling::Schedulable* get_bound_schedulable() const { + return bound_schedulable.load(std::memory_order_relaxed); + } + } scheduler_state; // Private: Reserved for use in Gloop + + // For worker threads that may not be doing any interesting user work, this + // tracks the current state of the worker. This is used to handle those + // threads differently e.g. when printing stacktraces. + // + // It should only be written to by the thread itself. + // + // Note that this is different from the mutex idle bit - threads running user + // work can be waiting but still be active. + // + // Note: not all parts of the code-base may maintain this field correctly and + // therefore this field should only be used to improve debugging/monitoring. + // + // Put it here to reuse some of the padding space. + enum class WaitState : uint8_t { + kActive = 0, + kWaitingForWork = 1, + }; + std::atomic wait_state; + static_assert(std::atomic::is_always_lock_free); + + // Add a padding such that scheduler_state is on a different cache line than + // waiter state. We use padding here, so that the size of the structure does + // not substantially grow due to the added padding. + static constexpr size_t kToBePaddedSize = + sizeof(SchedulerState) + sizeof(std::atomic); + static_assert(ABSL_CACHELINE_SIZE >= kToBePaddedSize); + char padding[ABSL_CACHELINE_SIZE - kToBePaddedSize]; + // Private: Reserved for absl::synchronization_internal::Waiter. struct WaiterState { alignas(void*) char data[256]; @@ -161,6 +220,10 @@ struct ThreadIdentity { std::atomic wait_start; // Ticker value when thread started waiting. std::atomic is_idle; // Has thread become idle yet? + // For tracking depth of __cxa_guard_acquire. This used to recognize heap + // allocations for function static objects. + int static_initialization_depth; + ThreadIdentity* next; }; diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc index 0b0f9207..e10d3bc0 100644 --- a/absl/synchronization/internal/create_thread_identity.cc +++ b/absl/synchronization/internal/create_thread_identity.cc @@ -77,6 +77,16 @@ void OneTimeInitThreadIdentity(base_internal::ThreadIdentity* identity) { identity->ticker.store(0, std::memory_order_relaxed); identity->wait_start.store(0, std::memory_order_relaxed); identity->is_idle.store(false, std::memory_order_relaxed); + // To avoid a circular dependency we declare only the storage in the header + // and use placement new to construct the SpinLock. + static_assert( + sizeof(base_internal::ThreadIdentity::SchedulerState:: + association_lock_word) == sizeof(base_internal::SpinLock), + "Wrong size for SpinLock"); + // Protects the association between this identity and its schedulable. Should + // never be cooperative. + new (&identity->scheduler_state.association_lock_word) + base_internal::SpinLock(base_internal::SCHEDULE_KERNEL_ONLY); } static void ResetThreadIdentityBetweenReuse( @@ -96,6 +106,17 @@ static void ResetThreadIdentityBetweenReuse( pts->wake = false; pts->cond_waiter = false; pts->all_locks = nullptr; + base_internal::ThreadIdentity::SchedulerState* ss = + &identity->scheduler_state; + ss->bound_schedulable.store(nullptr, std::memory_order_relaxed); + ss->association_lock_word = 0; + ss->scheduling_disabled_depth.store(0, std::memory_order_relaxed); + ss->potentially_blocking_depth = 0; + ss->schedule_next_state = 0; + ss->waking_designated_waker = false; + identity->static_initialization_depth = 0; + identity->wait_state.store(base_internal::ThreadIdentity::WaitState::kActive, + std::memory_order_relaxed); identity->blocked_count_ptr = nullptr; identity->ticker.store(0, std::memory_order_relaxed); identity->wait_start.store(0, std::memory_order_relaxed); diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 9b80f1f5..20164352 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -2139,6 +2139,8 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) { intptr_t wr_wait = 0; // set to kMuWrWait if we wake a reader and a // later writer could have acquired the lock // (starvation avoidance) + // When non-null, clear its "woken_has_waiters" field before returning. + absl::base_internal::ThreadIdentity* clear_waking_des_waker = nullptr; ABSL_RAW_CHECK(waitp == nullptr || waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors, "detected illegal recursion into Mutex code"); @@ -2382,6 +2384,13 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) { h->readers = 0; h->maybe_unlocking = false; // finished unlocking nv |= wr_wait | kMuWait | reinterpret_cast(h); + + // Signal to any Scheduler that we are waking from Mutex Unlock + // and there are more waiters left, signaling possible contention. + ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0); + clear_waking_des_waker = GetOrCreateCurrentThreadIdentity(); + ABSL_TSAN_MUTEX_POST_DIVERT(this, 0); + clear_waking_des_waker->scheduler_state.waking_designated_waker = true; } // release both spinlock & lock @@ -2417,6 +2426,10 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams* waitp) { ABSL_TSAN_MUTEX_POST_DIVERT(this, 0); } } + + if (clear_waking_des_waker) { + clear_waking_des_waker->scheduler_state.waking_designated_waker = false; + } } // Used by CondVar implementation to reacquire mutex after waking from