diff --git a/Source/Shared/arcana/threading/blocking_concurrent_queue.h b/Source/Shared/arcana/threading/blocking_concurrent_queue.h index 24d5276..15eda2d 100644 --- a/Source/Shared/arcana/threading/blocking_concurrent_queue.h +++ b/Source/Shared/arcana/threading/blocking_concurrent_queue.h @@ -21,7 +21,20 @@ namespace arcana { namespace detail { + inline std::mutex callbackMutex; inline std::function beforeWaitCallback{[]() {}}; + + // Copy the callback under the lock and invoke after releasing, avoiding + // potential deadlocks if the callback interacts with other locks. + inline void invoke_before_wait_callback() + { + std::function cb; + { + std::lock_guard lock{callbackMutex}; + cb = beforeWaitCallback; + } + cb(); + } } // Set a callback to be invoked while holding the queue mutex, right before @@ -29,6 +42,7 @@ namespace arcana // lost-wakeup race conditions. Pass an empty lambda [](){} to reset. inline void set_before_wait_callback(std::function callback) { + std::lock_guard lock{detail::callbackMutex}; detail::beforeWaitCallback = std::move(callback); } } @@ -121,7 +135,7 @@ namespace arcana while (!cancel.cancelled() && m_data.empty()) { #ifdef ARCANA_TEST_HOOKS - test_hooks::blocking_concurrent_queue::detail::beforeWaitCallback(); + test_hooks::blocking_concurrent_queue::detail::invoke_before_wait_callback(); #endif m_dataReady.wait(lock); } @@ -145,7 +159,7 @@ namespace arcana while (!cancel.cancelled() && m_data.empty()) { #ifdef ARCANA_TEST_HOOKS - test_hooks::blocking_concurrent_queue::detail::beforeWaitCallback(); + test_hooks::blocking_concurrent_queue::detail::invoke_before_wait_callback(); #endif m_dataReady.wait(lock); }