atomic.cpp: minor notification improvement

Check the mask before locking the reference.
This helps avoid waking non-eligible waiters.
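
In short: the notifier now tests for mask overlap before taking a reference on a waiter's condition slot, instead of locking first and comparing later. A minimal self-contained sketch of the idea (the type and function names here are illustrative, not the file's own):

    #include <emmintrin.h>
    #include <atomic>

    // Hypothetical stand-in for this file's cond_handle and its ref count
    struct waiter
    {
        __m128i mask;                   // wait mask registered by the waiter
        std::atomic<unsigned> refs{1};
    };

    // Sketch of the new order of operations: test mask overlap first, and
    // only take a reference on an eligible waiter (the real code does this
    // inside cond_id_lock's fetch_op loop).
    bool try_lock(waiter& w, __m128i notify_mask)
    {
        const __m128i v = _mm_and_si128(w.mask, notify_mask);
        if (_mm_cvtsi128_si64(_mm_packs_epi16(v, v)) == 0)
        {
            return false; // non-eligible waiter is never bothered
        }
        w.refs.fetch_add(1, std::memory_order_relaxed);
        return true;
    }
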
Nekotekina 2020-11-11 11:32:52 +03:00
parent 4b823469f7
commit d391133524


@@ -207,7 +207,7 @@ ptr_cmp(const void* data, u32 _size, __m128i old128, __m128i mask128, atomic_wai
}
// Returns true if mask overlaps, or the argument is invalid
-static u32
+static bool
#ifdef _WIN32
__vectorcall
#endif
@@ -219,7 +219,7 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
// Simple mask overlap
const auto v0 = _mm_and_si128(mask1, mask2);
const auto v1 = _mm_packs_epi16(v0, v0);
-return _mm_cvtsi128_si64(v1) ? 1 : 0;
+return !!_mm_cvtsi128_si64(v1);
}
// Generate masked value inequality bits
@@ -243,14 +243,14 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
if (!(_mm_cvtsi128_si64(v0) & mask))
{
-return flag & op_flag::inverse ? 2 : 0;
+return !!(flag & op_flag::inverse);
}
}
else if (size == 16)
{
if (!_mm_cvtsi128_si64(_mm_packs_epi16(v0, v0)))
{
-return flag & op_flag::inverse ? 2 : 0;
+return !!(flag & op_flag::inverse);
}
}
else
@@ -259,8 +259,7 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
std::abort();
}
// Use force wake-up
-return flag & op_flag::inverse ? 0 : 2;
+return !(flag & op_flag::inverse);
}
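
For context, cmp_mask previously used a tri-state result (0 = no match, 1 = mask overlap, 2 = force wake-up); the bool return keeps only the wake/no-wake decision, as the rewritten return sites show:

    // Old tri-state return   -->  new bool return
    //   inverse ? 2 : 0      -->  !!(flag & op_flag::inverse)
    //   inverse ? 0 : 2      -->   !(flag & op_flag::inverse)
    // Any nonzero value already meant "wake", so the truth value is
    // preserved; the force-wake distinction (2) presumably has to be
    // conveyed by the caller now.
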
static atomic_t<u64> s_min_tsc{0};
@@ -367,8 +366,8 @@ namespace atomic_wait
mtx.init(mtx);
#endif
// Initialize first reference
-verify(HERE), !refs++;
+verify(HERE), !refs;
+refs.release(1);
}
void destroy()
@@ -379,7 +378,9 @@ namespace atomic_wait
link = 0;
size = 0;
flag = 0;
-sync = 0;
+sync.release(0);
+mask = _mm_setzero_si128();
+oldv = _mm_setzero_si128();
#ifdef USE_STD
mtx.destroy();
@@ -391,7 +392,7 @@ namespace atomic_wait
{
const auto [_old, ok] = sync.fetch_op([](u32& val)
{
-if (val == 1 || val == 2)
+if (val - 1 <= 1)
{
val = 3;
return true;
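
The new guard leans on unsigned wrap-around: for a u32, val - 1 <= 1 holds exactly for val == 1 or val == 2, because 0 - 1 wraps to UINT32_MAX. A tiny self-checking example:

    #include <cassert>
    #include <cstdint>
    #include <initializer_list>

    int main()
    {
        for (std::uint32_t val : {0u, 1u, 2u, 3u, 100u})
        {
            // 0u - 1 wraps to UINT32_MAX, so only 1 and 2 pass the test
            assert((val - 1 <= 1) == (val == 1 || val == 2));
        }
    }
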
@@ -408,7 +409,18 @@ namespace atomic_wait
{
if (cmp_res == 1) [[likely]]
{
-return sync == 1 && sync.compare_and_swap_test(1, 2);
+const auto [_old, ok] = sync.fetch_op([](u32& val)
+{
+if (val == 1)
+{
+val = 2;
+return true;
+}
+return false;
+});
+return ok;
}
if (cmp_res > 1) [[unlikely]]
@@ -421,6 +433,22 @@ namespace atomic_wait
return false;
}
+bool set_sleep()
+{
+const auto [_old, ok] = sync.fetch_op([](u32& val)
+{
+if (val == 2)
+{
+val = 1;
+return true;
+}
+return false;
+});
+return ok;
+}
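
Taken together, wakeup() and the new set_sleep() act as a small state machine over sync. An inferred reading of the states, based only on the transitions visible in this commit (the names are not from the source):

    // Inferred sync states:
    //   0 - handle not initialized / destroyed
    //   1 - waiter armed for sleep; wakeup() moves 1 -> 2
    //   2 - signaled; set_sleep() re-arms 2 -> 1 before sleeping again
    //   3 - forced wakeup / teardown (the val - 1 <= 1 guard above)
    //   4 - extension handle, linked to the main one (sync.release(4) below)
    // set_sleep() returning false means the waiter must not go back to
    // sleep (state was 3, or a wakeup raced in), matching the old
    // "sync == 3 || !compare_and_swap_test(2, 1)" checks it replaces.
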
void alert_native()
{
#ifdef USE_FUTEX
@@ -493,7 +521,11 @@ static atomic_t<u64, 64> s_cond_bits[(UINT16_MAX + 1) / 64]{};
// Allocation semaphore
static atomic_t<u32, 64> s_cond_sema{0};
-static u32 cond_alloc(const void* data)
+static u32
+#ifdef _WIN32
+__vectorcall
+#endif
+cond_alloc(const void* data, __m128i mask)
{
// Determine whether there is a free slot or not
if (!s_cond_sema.try_inc(UINT16_MAX + 1))
@@ -540,6 +572,7 @@ static u32 cond_alloc(const void* data)
}
// Initialize new "semaphore"
+s_cond_list[id].mask = mask;
s_cond_list[id].init(data);
return id;
@@ -577,7 +610,11 @@ static void cond_free(u32 cond_id)
verify(HERE), s_cond_sema--;
}
-static atomic_wait::cond_handle* cond_id_lock(u32 cond_id, u64 thread_id = 0, const void* data = nullptr)
+static atomic_wait::cond_handle*
+#ifdef _WIN32
+__vectorcall
+#endif
+cond_id_lock(u32 cond_id, __m128i mask, u64 thread_id = 0, const void* data = nullptr)
{
if (cond_id - 1 < u32{UINT16_MAX})
{
@@ -598,6 +635,8 @@ static atomic_wait::cond_handle* cond_id_lock(u32 cond_id, u64 thread_id = 0, co
return false;
}
+const __m128i mask12 = _mm_and_si128(mask, _mm_load_si128(&cond->mask));
if (thread_id)
{
if (atomic_storage<u64>::load(cond->tid) != thread_id)
@@ -614,6 +653,11 @@ static atomic_wait::cond_handle* cond_id_lock(u32 cond_id, u64 thread_id = 0, co
}
}
+if (_mm_cvtsi128_si64(_mm_packs_epi16(mask12, mask12)) == 0)
+{
+return false;
+}
ref++;
return true;
});
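
Why a single 64-bit extraction can test the whole 128-bit AND result:

    // _mm_packs_epi16(v, v) saturates each signed 16-bit lane of v to
    // 8 bits, so every nonzero lane stays nonzero, and because v is packed
    // with itself the low 64 bits already summarize all eight lanes. The
    // single _mm_cvtsi128_si64 therefore covers the full 128-bit value.
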
@@ -625,7 +669,7 @@ static atomic_wait::cond_handle* cond_id_lock(u32 cond_id, u64 thread_id = 0, co
if (old == UINT16_MAX)
{
-fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in an atomic notifier.");
+fmt::raw_error("Reference count limit " STRINGIZE(UINT16_MAX) " reached in an atomic notifier.");
}
}
@@ -664,7 +708,7 @@ namespace atomic_wait
root_info* slot_free(atomic_t<u16>* slot) noexcept;
template <typename F>
-auto slot_search(const void* data, u64 thread_id, F func) noexcept;
+auto slot_search(const void* data, u64 thread_id, __m128i mask, F func) noexcept;
};
static_assert(sizeof(root_info) == 128);
@@ -861,7 +905,7 @@ atomic_wait::root_info* atomic_wait::root_info::slot_free(atomic_t<u16>* slot) n
}
template <typename F>
-auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, F func) noexcept
+FORCE_INLINE auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, __m128i mask, F func) noexcept
{
const u64 bits_val = this->bits.load();
const u64 max_order = bits_val >> max_threads;
@@ -869,7 +913,6 @@ auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, F func
auto* _this = this;
u32 order = 0;
u32 count = 0;
u64 new_val = bits_val & thread_mask;
@@ -889,7 +932,7 @@ auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, F func
for (u32 i = 0; i < cond_max; i++)
{
-if (cond_id_lock(cond_ids[i], thread_id, data))
+if (cond_id_lock(cond_ids[i], mask, thread_id, data))
{
if (func(cond_ids[i]))
{
@@ -956,13 +999,13 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
}
}
-const u32 cond_id = cond_alloc(data);
+const u32 cond_id = cond_alloc(data, mask);
u32 cond_id_ext[atomic_wait::max_list - 1]{};
for (u32 i = 0; i < ext_size; i++)
{
-cond_id_ext[i] = cond_alloc(ext[i].data);
+cond_id_ext[i] = cond_alloc(ext[i].data, ext[i].mask);
}
const auto slot = root->slot_alloc(iptr);
@@ -987,17 +1030,17 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
cond->link = 0;
cond->size = static_cast<u8>(size);
cond->flag = static_cast<u8>(size >> 8);
+cond->mask = mask;
cond->oldv = old_value;
cond->tsc0 = stamp0;
-cond->sync.release(1);
for (u32 i = 0; i < ext_size; i++)
{
// Extensions point to original cond_id, copy remaining info
cond_ext[i]->link = cond_id;
cond_ext[i]->size = static_cast<u8>(ext[i].size);
cond_ext[i]->flag = static_cast<u8>(ext[i].size >> 8);
+cond_ext[i]->mask = ext[i].mask;
cond_ext[i]->oldv = ext[i].old;
cond_ext[i]->tsc0 = stamp0;
@@ -1005,19 +1048,20 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
cond_ext[i]->sync.release(4);
}
+cond->sync.release(1);
+// Final deployment
+slot->store(static_cast<u16>(cond_id));
for (u32 i = 0; i < ext_size; i++)
{
slot_ext[i]->release(static_cast<u16>(cond_id_ext[i]));
}
-// Final deployment
-slot->store(static_cast<u16>(cond_id));
#ifdef USE_STD
// Lock mutex
std::unique_lock lock(*cond->mtx.get());
#else
if (ext_size)
_mm_mfence();
#endif
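
Note the reordering: sync.release(1) now happens only after every extension handle is fully initialized, and the main slot is published before the extension slots. This follows the usual publish pattern, where all writes precede the release store that makes the object visible. A generic illustration with plain C++11 atomics (not the project's atomic_t API):

    #include <atomic>

    static std::atomic<unsigned> sync{0};
    static int payload = 0;

    void publish()
    {
        payload = 42;                             // initialize everything first
        sync.store(1, std::memory_order_release); // then flip the flag
    }

    int consume()
    {
        // acquire pairs with the release: seeing 1 guarantees payload == 42
        return sync.load(std::memory_order_acquire) == 1 ? payload : -1;
    }
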
// Can skip unqueue process if true
@@ -1034,22 +1078,24 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
ts.tv_sec = timeout / 1'000'000'000;
ts.tv_nsec = timeout % 1'000'000'000;
-if (cond->sync.load() > 1) [[unlikely]]
+const u32 val = cond->sync;
+if (val > 1) [[unlikely]]
{
// Signaled prematurely
-if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1))
+if (!cond->set_sleep())
{
break;
}
}
else
{
-futex(&cond->sync, FUTEX_WAIT_PRIVATE, 1, timeout + 1 ? &ts : nullptr);
+futex(&cond->sync, FUTEX_WAIT_PRIVATE, val, timeout + 1 ? &ts : nullptr);
}
#elif defined(USE_STD)
-if (cond->sync.load() > 1) [[unlikely]]
+if (cond->sync > 1) [[unlikely]]
{
-if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1))
+if (!cond->set_sleep())
{
break;
}
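
Passing the loaded val to FUTEX_WAIT instead of a constant 1 exploits the futex recheck: the kernel only blocks if the word still equals the expected value, so any concurrent change to sync aborts the sleep instead of risking a missed wakeup. Roughly, assuming Linux (a simplified wrapper, not this file's futex() helper):

    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <time.h>

    static long futex_wait(unsigned* addr, unsigned expected, const timespec* ts)
    {
        // The kernel atomically re-checks *addr == expected before sleeping;
        // it returns with EAGAIN if another thread already changed the word.
        return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, expected, ts, nullptr, 0);
    }
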
@@ -1075,7 +1121,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
if (fallback) [[unlikely]]
{
-if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1))
+if (!cond->set_sleep())
{
fallback = false;
break;
@@ -1119,7 +1165,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
#if defined(_WIN32)
static LARGE_INTEGER instant{};
-if (cond->sync.compare_and_swap_test(1, 2))
+if (cond->wakeup(1))
{
// Succeeded in self-notifying
break;
@@ -1176,11 +1222,11 @@ alert_sema(u32 cond_id, const void* data, u64 tid, u32 size, __m128i mask, __m12
u32 ok = 0;
-if (cond->sync && (!size ? (!tid || cond->tid == tid) : (cond->ptr == data && cmp_mask(size, mask, new_value, cond->size | (cond->flag << 8), cond->mask, cond->oldv))))
+if (!size ? (!tid || cond->tid == tid) : (cond->ptr == data && cmp_mask(size, mask, new_value, cond->size | (cond->flag << 8), cond->mask, cond->oldv)))
{
// Redirect if necessary
const auto _old = cond;
-const auto _new = _old->link ? cond_id_lock(_old->link) : _old;
+const auto _new = _old->link ? cond_id_lock(_old->link, _mm_set1_epi64x(-1)) : _old;
if (_new && _new->tsc0 == _old->tsc0)
{
@@ -1278,7 +1324,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
if (thread_id)
{
// Check thread if provided
-if (s_cond_list[i].tid != thread_id)
+if (atomic_storage<u64>::load(cond->tid) != thread_id)
{
return false;
}
@@ -1329,7 +1375,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
u64 progress = 0;
-root->slot_search(data, thread_id, [&](u32 cond_id)
+root->slot_search(data, thread_id, _mm_set1_epi64x(-1), [&](u32 cond_id)
{
// Forced notification
if (alert_sema(cond_id, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128()))
@@ -1367,7 +1413,7 @@ atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i
u64 progress = 0;
-root->slot_search(data, 0, [&](u32 cond_id)
+root->slot_search(data, 0, mask, [&](u32 cond_id)
{
if (alert_sema(cond_id, data, -1, size, mask, new_value))
{
@@ -1401,7 +1447,7 @@ atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i
// Array itself.
u16 cond_ids[UINT16_MAX + 1];
-root->slot_search(data, 0, [&](u32 cond_id)
+root->slot_search(data, 0, mask, [&](u32 cond_id)
{
u32 res = alert_sema<true>(cond_id, data, -1, size, mask, new_value);