/*
 * M*LIB - Fixed size (Bounded) QUEUE & STACK interface
 *
 * Copyright (c) 2017-2023, Patrick Pelissier
 * All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * + Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * + Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef MSTARLIB_BUFFER_H
#define MSTARLIB_BUFFER_H

#include "m-core.h"
#include "m-thread.h"
#include "m-atomic.h"

/* Define the different kinds of policy a lock-based buffer can have:
 * - the buffer can be either a queue (policy is FIFO) or a stack (policy is FILO),
 * - if the push method is by default blocking (waiting for the buffer to have some space) or not, *** deprecated ***
 * - if the pop method is by default blocking (waiting for the buffer to have some data) or not, *** deprecated ***
 * - if both methods are blocking, *** deprecated ***
 * - if it shall be thread safe or not (i.e. remove the mutex lock and atomic costs),
 * - if the buffer has to be initialized with empty elements, or if it shall initialize an element when it is pushed (and move it out when popped),
 * - if the buffer has to overwrite the last element when the buffer is full,
 * - if the pop of an element is not complete until the call to pop_release (preventing further push until this call).
 */
typedef enum {
  M_BUFFER_QUEUE = 0,         M_BUFFER_STACK = 1,
  M_BUFFER_BLOCKING_PUSH = 0, M_BUFFER_UNBLOCKING_PUSH = 2,
  M_BUFFER_BLOCKING_POP = 0,  M_BUFFER_UNBLOCKING_POP = 4,
  M_BUFFER_BLOCKING = 0,      M_BUFFER_UNBLOCKING = 6,
  M_BUFFER_THREAD_SAFE = 0,   M_BUFFER_THREAD_UNSAFE = 8,
  M_BUFFER_PUSH_INIT_POP_MOVE = 16,
  M_BUFFER_PUSH_OVERWRITE = 32,
  M_BUFFER_DEFERRED_POP = 64
} m_buffer_policy_e;

/* Define a lock based buffer.
   If size is 0, then the size will only be defined at run-time when initializing the buffer,
   otherwise the size will be a compile time constant.
   USAGE: BUFFER_DEF(name, type, size_of_buffer_or_0, policy[, oplist]) */
#define M_BUFFER_DEF(name, type, m_size, ... )                                \
  M_BUFFER_DEF_AS(name, M_F(name, _t), type, m_size, __VA_ARGS__)

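/* Example (a hypothetical usage sketch, not part of the library):
   define a blocking FIFO of 10 unsigned integers shared between threads.

     M_BUFFER_DEF(buffer_uint, unsigned int, 10, M_BUFFER_QUEUE|M_BUFFER_BLOCKING)

     void producer_consumer(void)
     {
       buffer_uint_t buf;
       buffer_uint_init(buf, 10);   // size shall match the compile-time size
       buffer_uint_push(buf, 42);   // blocks while the buffer is full
       unsigned int x;
       buffer_uint_pop(&x, buf);    // blocks while the buffer is empty
       buffer_uint_clear(buf);
     }
*/
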
/* Define a lock based buffer
   as the provided type name_t.
   USAGE: BUFFER_DEF_AS(name, name_t, type, size_of_buffer_or_0, policy[, oplist of type]) */
#define M_BUFFER_DEF_AS(name, name_t, type, m_size, ... )                     \
  M_BEGIN_PROTECTED_CODE                                                      \
  M_BUFF3R_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__)                                 \
              ((name, type, m_size,__VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
               (name, type, m_size,__VA_ARGS__,                                name_t ))) \
  M_END_PROTECTED_CODE

/* Define the oplist of a lock based buffer given its name and its oplist.
   USAGE: BUFFER_OPLIST(name[, oplist of the type]) */
#define M_BUFFER_OPLIST(...)                                                  \
  M_BUFF3R_OPLIST_P1(M_IF_NARGS_EQ1(__VA_ARGS__)                              \
                    ((__VA_ARGS__, M_BASIC_OPLIST),                           \
                     (__VA_ARGS__ )))

/* Define a nearly lock-free queue for Many Producers, Many Consumers.
   Much faster than a queue from BUFFER_DEF in heavy communication scenarios,
   but without any blocking features (these are left to the user).
   The size of the created queue shall be a power of 2 and is defined at run-time.
   USAGE: QUEUE_MPMC_DEF(name, type, policy, [oplist of type])
*/
#define M_QUEUE_MPMC_DEF(name, type, ...)                                     \
  M_QUEUE_MPMC_DEF_AS(name, M_F(name,_t), type, __VA_ARGS__)

/* Define a nearly lock-free queue for Many Producers, Many Consumers
   as the provided type name_t.
   Much faster than a queue from BUFFER_DEF in heavy communication scenarios,
   but without any blocking features (these are left to the user).
   The size of the created queue shall be a power of 2 and is defined at run-time.
   USAGE: QUEUE_MPMC_DEF_AS(name, name_t, type, policy, [oplist of type])
*/
#define M_QUEUE_MPMC_DEF_AS(name, name_t, type, ...)                          \
  M_BEGIN_PROTECTED_CODE                                                      \
  M_QU3UE_MPMC_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__)                             \
                   ((name, type, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
                    (name, type, __VA_ARGS__,                                 name_t ))) \
  M_END_PROTECTED_CODE

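/* Example (a hypothetical usage sketch, not part of the library):
   a MPMC queue of ints shared by several producer and consumer threads;
   push/pop never block and report failure instead.

     M_QUEUE_MPMC_DEF(mpmc_int, int, M_BUFFER_QUEUE)

     void use(void)
     {
       mpmc_int_t q;
       mpmc_int_init(q, 64);          // the size shall be a power of 2
       if (!mpmc_int_push(q, 17)) { } // queue full (or preemption): retry or drop
       int x;
       if (mpmc_int_pop(&x, q)) { }   // false when the queue is empty
       mpmc_int_clear(q);
     }
*/
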
/* Define a wait-free queue for Single Producer, Single Consumer.
   Much faster than a queue from BUFFER_DEF or QUEUE_MPMC in heavy communication scenarios,
   but without any blocking features (these are left to the user).
   The size of the created queue shall be a power of 2 and is defined at run-time.
   USAGE: QUEUE_SPSC_DEF(name, type, policy, [oplist of type])
*/
#define M_QUEUE_SPSC_DEF(name, type, ...)                                     \
  M_QUEUE_SPSC_DEF_AS(name, M_F(name, _t), type, __VA_ARGS__)

/* Define a wait-free queue for Single Producer, Single Consumer
   as the provided type name_t.
   Much faster than a queue from BUFFER_DEF in heavy communication scenarios,
   but without any blocking features (these are left to the user).
   The size of the created queue shall be a power of 2 and is defined at run-time.
   USAGE: QUEUE_SPSC_DEF_AS(name, name_t, type, policy, [oplist of type])
*/
#define M_QUEUE_SPSC_DEF_AS(name, name_t, type, ...)                          \
  M_BEGIN_PROTECTED_CODE                                                      \
  M_QU3UE_SPSC_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__)                             \
                   ((name, type, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
                    (name, type, __VA_ARGS__,                                 name_t ))) \
  M_END_PROTECTED_CODE

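/* Example (a hypothetical usage sketch, not part of the library):
   a wait-free SPSC queue of ints, with exactly one producer thread
   and one consumer thread.

     M_QUEUE_SPSC_DEF(spsc_int, int, M_BUFFER_QUEUE)

     // Producer thread:
     //   if (!spsc_int_push(q, v)) { }   // queue full: retry later or drop
     // Consumer thread:
     //   int v;
     //   while (spsc_int_pop(&v, q)) { } // process each popped value
*/
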
/*****************************************************************************/
/********************************** INTERNAL *********************************/
/*****************************************************************************/

/* Test if the given policy is true or not.
   WARNING: The policy shall be a non-zero value (i.e. not a default). */
#define M_BUFF3R_POLICY_P(policy, val)                                        \
  (((policy) & (val)) != 0)

/* Handle either an atomic integer or a normal integer depending on the
   policy parameter of the buffer: BUFFER_THREAD_UNSAFE (BUFFER_THREAD_SAFE
   is the default). This avoids paying the cost of atomic operations when
   they are not needed.
*/
typedef union m_buff3r_number_s {
  unsigned int u;
  atomic_uint  a;
#ifdef __cplusplus
  // Not sure why, but C++ needs an explicit default constructor for this union.
  m_buff3r_number_s() : u(0) {};
#endif
} m_buff3r_number_ct[1];

M_INLINE void
m_buff3r_number_init(m_buff3r_number_ct n, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    atomic_init(&n->a, 0U);
  else
    n->u = 0U;
}

M_INLINE unsigned int
m_buff3r_number_load(m_buff3r_number_ct n, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    // Perform a memory acquire so that further usage of the buffer
    // is synchronized.
    return atomic_load_explicit(&n->a, memory_order_acquire);
  else
    return n->u;
}

M_INLINE void
m_buff3r_number_store(m_buff3r_number_ct n, unsigned int v, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    // This function is used in contexts where a relaxed access is sufficient.
    atomic_store_explicit(&n->a, v, memory_order_relaxed);
  else
    n->u = v;
}

M_INLINE void
m_buff3r_number_set(m_buff3r_number_ct n, m_buff3r_number_ct v, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    // This function is used in contexts where a relaxed access is sufficient.
    atomic_store_explicit(&n->a, atomic_load_explicit(&v->a, memory_order_relaxed), memory_order_relaxed);
  else
    n->u = v->u;
}

M_INLINE unsigned int
m_buff3r_number_inc(m_buff3r_number_ct n, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    return atomic_fetch_add(&n->a, 1U);
  else
    return n->u++;
}

M_INLINE unsigned int
m_buff3r_number_dec(m_buff3r_number_ct n, unsigned int policy)
{
  if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
    return atomic_fetch_sub(&n->a, 1U);
  else
    return n->u--;
}

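/* Illustrative note (a minimal sketch): with the default (thread safe)
   policy, the counter is an atomic_uint accessed with acquire/release
   semantics; with M_BUFFER_THREAD_UNSAFE, the very same calls reduce to
   plain unsigned integer operations:

     m_buff3r_number_ct n;
     m_buff3r_number_init(n, M_BUFFER_THREAD_UNSAFE);              // n->u = 0
     m_buff3r_number_inc (n, M_BUFFER_THREAD_UNSAFE);              // n->u++
     unsigned c = m_buff3r_number_load(n, M_BUFFER_THREAD_UNSAFE); // c == 1
*/
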
/********************************** INTERNAL *********************************/

/* Test if the size is only run-time or build-time */
#define M_BUFF3R_IF_CTE_SIZE(m_size) M_IF(M_BOOL(m_size))

/* Return the size (run time or build time).
   NOTE: It is assumed that the buffer variable name is 'v'. */
#define M_BUFF3R_SIZE(m_size)                                                 \
  M_BUFF3R_IF_CTE_SIZE(m_size) (m_size, v->capacity)

/* Contract of a buffer.
   Nothing particular can be checked, since we cannot test much without
   locking the buffer.
*/
#define M_BUFF3R_CONTRACT(buffer, size) do {                                  \
    M_ASSERT (buffer != NULL);                                                \
    M_ASSERT (buffer->data != NULL);                                          \
  } while (0)

/* Contract of a buffer within a protected section */
#define M_BUFF3R_PROTECTED_CONTRACT(policy, buffer, size) do {                \
    M_ASSERT (m_buff3r_number_load(buffer->number[0], policy) <= M_BUFF3R_SIZE(size)); \
  } while (0)

/* Deferred evaluation for the definition,
   so that all arguments are evaluated before further expansion */
#define M_BUFF3R_DEF_P1(arg) M_ID( M_BUFF3R_DEF_P2 arg )

/* Validate the value oplist before going further */
#define M_BUFF3R_DEF_P2(name, type, m_size, policy, oplist, buffer_t)         \
  M_IF_OPLIST(oplist)(M_BUFF3R_DEF_P3, M_BUFF3R_DEF_FAILURE)(name, type, m_size, policy, oplist, buffer_t)

/* Stop processing with a compilation failure */
#define M_BUFF3R_DEF_FAILURE(name, type, m_size, policy, oplist, buffer_t)    \
  M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(M_BUFFER_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))

/* Define the buffer type using a mutex lock and its functions.
   - name: main prefix of the container
   - type: type of an element of the buffer
   - m_size: 0 if the size is defined at run-time, otherwise the fixed size of the buffer
   - policy: the policy of the buffer
   - oplist: the oplist of the type of an element of the buffer
   - buffer_t: name of the type of the buffer
*/
#define M_BUFF3R_DEF_P3(name, type, m_size, policy, oplist, buffer_t)         \
  M_BUFF3R_DEF_TYPE(name, type, m_size, policy, oplist, buffer_t)             \
  M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist)                            \
  M_BUFF3R_DEF_CORE(name, type, m_size, policy, oplist, buffer_t)             \
  M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)

/* Define the type of a buffer */
#define M_BUFF3R_DEF_TYPE(name, type, m_size, policy, oplist, buffer_t)       \
                                                                              \
  /* Put each data in a separate cache line to avoid false sharing           \
     by multiple writing threads. No need to align if there is no thread. */ \
  typedef union M_F(name, _el_s) {                                            \
    type x;                                                                   \
    char align[M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE) ? 1 : M_ALIGN_FOR_CACHELINE_EXCLUSION]; \
  } M_F(name, _el_ct);                                                        \
                                                                              \
  typedef struct M_F(name, _s) {                                              \
    /* Data for a producer */                                                 \
    m_mutex_t mutexPush;     /* MUTEX used for pushing elements */            \
    size_t    idx_prod;      /* Index of the production threads */            \
    size_t    overwrite;     /* Number of overwritten values */               \
    m_cond_t  there_is_data; /* Condition raised when there is data */        \
    /* Read only data */                                                      \
    M_BUFF3R_IF_CTE_SIZE(m_size)( ,size_t capacity;) /* Capacity of the buffer */ \
    /* Data for a consumer */                                                 \
    m_cond_t  there_is_room_for_data; /* Cond. raised when there is room */   \
    m_mutex_t mutexPop;      /* MUTEX used for popping elements */            \
    size_t    idx_cons;      /* Index of the consumption threads */           \
    /* number[0] := Number of elements in the buffer */                       \
    /* number[1] := [OPTION] Number of elements being deferred in the buffer */ \
    m_buff3r_number_ct number[1 + M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)]; \
    /* If fixed size, array of elements, otherwise pointer to the elements */ \
    M_F(name, _el_ct)  M_BUFF3R_IF_CTE_SIZE(m_size)(data[m_size], *data);     \
  } buffer_t[1];                                                              \
                                                                              \
  typedef struct M_F(name, _s) *M_F(name, _ptr);                              \
  typedef const struct M_F(name, _s) *M_F(name, _srcptr);                     \
  /* Internal type used to unconst the buffer */                              \
  typedef union { M_F(name, _srcptr) cptr; M_F(name, _ptr) ptr; } M_F(name, _uptr_ct); \
  /* Internal types used by the oplist */                                     \
  typedef type M_F(name, _subtype_ct);                                        \
  typedef buffer_t M_F(name, _ct);                                            \

/* Define the core functionalities of a buffer */
#define M_BUFF3R_DEF_CORE(name, type, m_size, policy, oplist, buffer_t)       \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _init)(buffer_t v, size_t size)                                   \
  {                                                                           \
    M_ASSERT(size <= UINT_MAX);                                               \
    M_BUFF3R_IF_CTE_SIZE(m_size)(M_ASSERT(size == m_size), v->capacity = size); \
    v->idx_prod = v->idx_cons = v->overwrite = 0;                             \
    m_buff3r_number_init (v->number[0], policy);                              \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      m_buff3r_number_init (v->number[1], policy);                            \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_init(v->mutexPush);                                             \
      m_mutex_init(v->mutexPop);                                              \
      m_cond_init(v->there_is_data);                                          \
      m_cond_init(v->there_is_room_for_data);                                 \
    } else {                                                                  \
      M_ASSERT(M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING));             \
    }                                                                         \
                                                                              \
    M_BUFF3R_IF_CTE_SIZE(m_size)( /* Statically allocated */ ,                \
      v->data = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, M_BUFF3R_SIZE(m_size)); \
      if (M_UNLIKELY_NOMEM (v->data == NULL)) {                               \
        M_MEMORY_FULL (M_BUFF3R_SIZE(m_size)*sizeof(M_F(name, _el_ct)));      \
        return;                                                               \
      }                                                                       \
    )                                                                         \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(size_t i = 0; i < size; i++) {                                      \
        M_CALL_INIT(oplist, v->data[i].x);                                    \
      }                                                                       \
    }                                                                         \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
  }                                                                           \
                                                                              \
  M_BUFF3R_IF_CTE_SIZE(m_size)(                                               \
  M_INLINE void                                                               \
  M_C3(m_buff3r_,name,_init)(buffer_t v)                                      \
  {                                                                           \
    M_F(name, _init)(v, m_size);                                              \
  }                                                                           \
  , )                                                                         \
                                                                              \
  M_INLINE void                                                               \
  M_C3(m_buff3r_,name,_clear_obj)(buffer_t v)                                 \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) {                     \
        M_CALL_CLEAR(oplist, v->data[i].x);                                   \
      }                                                                       \
    } else {                                                                  \
      size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
      while (i != v->idx_prod) {                                              \
        M_CALL_CLEAR(oplist, v->data[i].x);                                   \
        i++;                                                                  \
        if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
          i = 0;                                                              \
      }                                                                       \
    }                                                                         \
    v->idx_prod = v->idx_cons = 0;                                            \
    m_buff3r_number_store (v->number[0], 0U, policy);                         \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      m_buff3r_number_store(v->number[1], 0U, policy);                        \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _clear)(buffer_t v)                                               \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    M_C3(m_buff3r_,name,_clear_obj)(v);                                       \
    M_BUFF3R_IF_CTE_SIZE(m_size)( ,                                           \
      M_CALL_FREE(oplist, v->data);                                           \
      v->data = NULL;                                                         \
    )                                                                         \
    v->overwrite = 0;                                                         \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_clear(v->mutexPush);                                            \
      m_mutex_clear(v->mutexPop);                                             \
      m_cond_clear(v->there_is_data);                                         \
      m_cond_clear(v->there_is_room_for_data);                                \
    }                                                                         \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _reset)(buffer_t v)                                               \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_lock(v->mutexPush);                                             \
      m_mutex_lock(v->mutexPop);                                              \
    }                                                                         \
    M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size);                           \
    if (M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE))             \
      M_C3(m_buff3r_,name,_clear_obj)(v);                                     \
    v->idx_prod = v->idx_cons = 0;                                            \
    m_buff3r_number_store (v->number[0], 0U, policy);                         \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      m_buff3r_number_store(v->number[1], 0U, policy);                        \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_cond_broadcast(v->there_is_room_for_data);                            \
      m_mutex_unlock(v->mutexPop);                                            \
      m_mutex_unlock(v->mutexPush);                                           \
    }                                                                         \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
  }                                                                           \
                                                                              \
  M_INLINE void M_ATTR_DEPRECATED                                             \
  M_F(name, _clean)(buffer_t v)                                               \
  {                                                                           \
    M_F(name,_reset)(v);                                                      \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _init_set)(buffer_t dest, const buffer_t src)                     \
  {                                                                           \
    /* unconst 'src', so that we can lock it (semantically it is const) */    \
    M_F(name, _uptr_ct) vu;                                                   \
    vu.cptr = src;                                                            \
    M_F(name, _ptr) v = vu.ptr;                                               \
    M_ASSERT (dest != v);                                                     \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    M_F(name, _init)(dest, M_BUFF3R_SIZE(m_size));                            \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_lock(v->mutexPush);                                             \
      m_mutex_lock(v->mutexPop);                                              \
    }                                                                         \
                                                                              \
    M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size);                           \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) {                     \
        M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x);               \
      }                                                                       \
    } else {                                                                  \
      size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
      while (i != v->idx_prod) {                                              \
        M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x);               \
        i++;                                                                  \
        if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
          i = 0;                                                              \
      }                                                                       \
    }                                                                         \
                                                                              \
    dest->idx_prod = v->idx_prod;                                             \
    dest->idx_cons = v->idx_cons;                                             \
    m_buff3r_number_set (dest->number[0], v->number[0], policy);              \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      m_buff3r_number_set(dest->number[1], v->number[1], policy);             \
                                                                              \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_unlock(v->mutexPop);                                            \
      m_mutex_unlock(v->mutexPush);                                           \
    }                                                                         \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    M_BUFF3R_CONTRACT(dest, m_size);                                          \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _set)(buffer_t dest, const buffer_t src)                          \
  {                                                                           \
    /* unconst 'src', so that we can lock it (semantically it is const) */    \
    M_F(name, _uptr_ct) vu;                                                   \
    vu.cptr = src;                                                            \
    M_F(name, _ptr) v = vu.ptr;                                               \
    M_BUFF3R_CONTRACT(dest,m_size);                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
                                                                              \
    if (dest == v) return;                                                    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      /* Case of deadlock: A := B, B := C, C := A (all in parallel).          \
         Solution: order the locks by increasing memory address. */           \
      if (dest < v) {                                                         \
        m_mutex_lock(dest->mutexPush);                                        \
        m_mutex_lock(dest->mutexPop);                                         \
        m_mutex_lock(v->mutexPush);                                           \
        m_mutex_lock(v->mutexPop);                                            \
      } else {                                                                \
        m_mutex_lock(v->mutexPush);                                           \
        m_mutex_lock(v->mutexPop);                                            \
        m_mutex_lock(dest->mutexPush);                                        \
        m_mutex_lock(dest->mutexPop);                                         \
      }                                                                       \
    }                                                                         \
                                                                              \
    M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size);                           \
    M_C3(m_buff3r_,name,_clear_obj)(dest);                                    \
                                                                              \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) {                     \
        M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x);               \
      }                                                                       \
    } else {                                                                  \
      size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
      while (i != v->idx_prod) {                                              \
        M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x);               \
        i++;                                                                  \
        if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
          i = 0;                                                              \
      }                                                                       \
    }                                                                         \
                                                                              \
    dest->idx_prod = v->idx_prod;                                             \
    dest->idx_cons = v->idx_cons;                                             \
    m_buff3r_number_set (dest->number[0], v->number[0], policy);              \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      m_buff3r_number_set(dest->number[1], v->number[1], policy);             \
                                                                              \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      /* It may be false, but it is not wrong! */                             \
      m_cond_broadcast(v->there_is_room_for_data);                            \
      m_cond_broadcast(v->there_is_data);                                     \
      if (dest < v) {                                                         \
        m_mutex_unlock(v->mutexPop);                                          \
        m_mutex_unlock(v->mutexPush);                                         \
        m_mutex_unlock(dest->mutexPop);                                       \
        m_mutex_unlock(dest->mutexPush);                                      \
      } else {                                                                \
        m_mutex_unlock(dest->mutexPop);                                       \
        m_mutex_unlock(dest->mutexPush);                                      \
        m_mutex_unlock(v->mutexPop);                                          \
        m_mutex_unlock(v->mutexPush);                                         \
      }                                                                       \
    }                                                                         \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    M_BUFF3R_CONTRACT(dest, m_size);                                          \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _empty_p)(buffer_t v)                                             \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    /* If the buffer has been configured with deferred pop,                   \
       we consider the queue as empty when the number of                      \
       deferred pops has reached 0, not when the number of items              \
       in the buffer is 0. */                                                 \
    if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP))                     \
      return m_buff3r_number_load (v->number[1], policy) == 0;                \
    else                                                                      \
      return m_buff3r_number_load (v->number[0], policy) == 0;                \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _full_p)(buffer_t v)                                              \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    return m_buff3r_number_load (v->number[0], policy)                        \
           == M_BUFF3R_SIZE(m_size);                                          \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _size)(buffer_t v)                                                \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    return m_buff3r_number_load (v->number[0], policy);                       \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _push_blocking)(buffer_t v, type const data, bool blocking)       \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
                                                                              \
    /* Producer mutex lock (mutex lock performs an acquire memory barrier) */ \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_lock(v->mutexPush);                                             \
      while (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE)            \
             && M_F(name, _full_p)(v)) {                                      \
        if (!blocking) {                                                      \
          m_mutex_unlock(v->mutexPush);                                       \
          return false;                                                       \
        }                                                                     \
        m_cond_wait(v->there_is_room_for_data, v->mutexPush);                 \
      }                                                                       \
    } else if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE)          \
               && M_F(name, _full_p)(v))                                      \
      return false;                                                           \
    M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size);                           \
                                                                              \
    size_t previousSize, idx = v->idx_prod;                                   \
    /* INDEX computation if we have to overwrite the last element */          \
    if (M_UNLIKELY (M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE)      \
                    && M_F(name, _full_p)(v))) {                              \
      v->overwrite++;                                                         \
      /* Let's overwrite the last element */                                  \
      /* Compute the index of the last pushed element */                      \
      idx--;                                                                  \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) {                     \
        idx = idx >= M_BUFF3R_SIZE(m_size) ? M_BUFF3R_SIZE(m_size)-1 : idx;   \
      }                                                                       \
      /* Update the data in the buffer */                                     \
      M_CALL_SET(oplist, v->data[idx].x, data);                               \
      previousSize = M_BUFF3R_SIZE(m_size);                                   \
    } else {                                                                  \
      /* Add a new item in the buffer */                                      \
      /* PUSH the data in the buffer */                                       \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_SET(oplist, v->data[idx].x, data);                             \
      } else {                                                                \
        M_CALL_INIT_SET(oplist, v->data[idx].x, data);                        \
      }                                                                       \
                                                                              \
      /* Increment the production INDEX of the buffer */                      \
      idx++;                                                                  \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) {                     \
        idx = (idx == M_BUFF3R_SIZE(m_size)) ? 0 : idx;                       \
      }                                                                       \
      v->idx_prod = idx;                                                      \
                                                                              \
      /* number[] is the only variable which can be modified by both          \
         the consumer thread (which holds the pop lock) and the producer      \
         thread (which holds the push lock). As such, it is an atomic         \
         variable that performs a release memory barrier. */                  \
      /* Increment the number of elements of the buffer */                    \
      previousSize = m_buff3r_number_inc (v->number[0], policy);              \
      if (M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) {               \
        previousSize = m_buff3r_number_inc (v->number[1], policy);            \
      }                                                                       \
      /* From this point, consumers may read the data in the table */         \
    }                                                                         \
                                                                              \
    /* Producer unlock (mutex unlock performs a release memory barrier) */    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_unlock(v->mutexPush);                                           \
      /* If the number of items in the buffer was 0, some consumers           \
         may be waiting. Signal to them the availability of the data.         \
         We cannot signal only one thread. */                                 \
      if (previousSize == 0) {                                                \
        m_mutex_lock(v->mutexPop);                                            \
        m_cond_broadcast(v->there_is_data);                                   \
        m_mutex_unlock(v->mutexPop);                                          \
      }                                                                       \
    }                                                                         \
                                                                              \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _pop_blocking)(type *data, buffer_t v, bool blocking)             \
  {                                                                           \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    M_ASSERT (data != NULL);                                                  \
                                                                              \
    /* Consumer lock (mutex lock performs an acquire memory barrier) */       \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_lock(v->mutexPop);                                              \
      while (M_F(name, _empty_p)(v)) {                                        \
        if (!blocking) {                                                      \
          m_mutex_unlock(v->mutexPop);                                        \
          return false;                                                       \
        }                                                                     \
        m_cond_wait(v->there_is_data, v->mutexPop);                           \
      }                                                                       \
    } else if (M_F(name, _empty_p)(v))                                        \
      return false;                                                           \
    M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size);                           \
                                                                              \
    /* POP the data from the buffer and update the INDEX */                   \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) {                       \
      /* FIFO queue */                                                        \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_SET(oplist, *data, v->data[v->idx_cons].x);                    \
      } else {                                                                \
        M_DO_INIT_MOVE (oplist, *data, v->data[v->idx_cons].x);               \
      }                                                                       \
      v->idx_cons = (v->idx_cons == M_BUFF3R_SIZE(m_size)-1) ? 0 : (v->idx_cons + 1); \
    } else {                                                                  \
      /* STACK queue */                                                       \
      v->idx_prod --;                                                         \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_SET(oplist, *data, v->data[v->idx_prod].x);                    \
      } else {                                                                \
        M_DO_INIT_MOVE (oplist, *data, v->data[v->idx_prod].x);               \
      }                                                                       \
    }                                                                         \
                                                                              \
    /* number[] is the only variable which can be modified by both            \
       the consumer thread (which holds the pop lock) and the producer        \
       thread (which holds the push lock). As such, it is an atomic           \
       variable that performs a release memory barrier. */                    \
    /* Decrement the number of elements in the buffer */                      \
    size_t previousSize;                                                      \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) {                \
      previousSize = m_buff3r_number_dec (v->number[0], policy);              \
    } else {                                                                  \
      m_buff3r_number_dec (v->number[1], policy);                             \
    }                                                                         \
    /* Space may be reused by a producer thread from this point */            \
                                                                              \
    /* Consumer unlock (mutex unlock performs a release memory barrier) */    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) {               \
      m_mutex_unlock(v->mutexPop);                                            \
      /* If the number of items in the buffer was the maximum, some           \
         producers may be waiting. Signal to them the availability of         \
         the free room. We cannot signal only one thread. */                  \
      if ((!M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP))               \
          && previousSize == M_BUFF3R_SIZE(m_size)) {                         \
        m_mutex_lock(v->mutexPush);                                           \
        m_cond_broadcast(v->there_is_room_for_data);                          \
        m_mutex_unlock(v->mutexPush);                                         \
      }                                                                       \
    }                                                                         \
                                                                              \
    M_BUFF3R_CONTRACT(v,m_size);                                              \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _push)(buffer_t v, type const data)                               \
  {                                                                           \
    return M_F(name, _push_blocking)(v, data,                                 \
                      !M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING_PUSH)); \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _pop)(type *data, buffer_t v)                                     \
  {                                                                           \
    return M_F(name, _pop_blocking)(data, v,                                  \
                      !M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING_POP)); \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _overwrite)(const buffer_t v)                                     \
  {                                                                           \
    return v->overwrite;                                                      \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _capacity)(const buffer_t v)                                      \
  {                                                                           \
    (void) v; /* may be unused */                                             \
    return M_BUFF3R_SIZE(m_size);                                             \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _pop_release)(buffer_t v)                                         \
  {                                                                           \
    /* Decrement the effective number of elements in the buffer */            \
    if (M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) {                 \
      size_t previousSize = m_buff3r_number_dec (v->number[0], policy);       \
      if (previousSize == M_BUFF3R_SIZE(m_size)) {                            \
        m_mutex_lock(v->mutexPush);                                           \
        m_cond_broadcast(v->there_is_room_for_data);                          \
        m_mutex_unlock(v->mutexPush);                                         \
      }                                                                       \
    }                                                                         \
  }                                                                           \

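/* Example (a hypothetical usage sketch, not part of the library; work_t and
   process() are assumed for illustration): with M_BUFFER_DEFERRED_POP, a
   popped slot is not returned to the producers until pop_release is called,
   so a consumer can finish processing the data before allowing the room to
   be reused.

     M_BUFFER_DEF(work_queue, work_t, 16, M_BUFFER_QUEUE|M_BUFFER_DEFERRED_POP)

     void consumer(work_queue_t q)
     {
       work_t w;
       if (work_queue_pop(&w, q)) {
         process(&w);               // the slot is still accounted as used
         work_queue_pop_release(q); // now producers may reuse the room
       }
     }
*/
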
/********************************** INTERNAL *********************************/

/* Definition of a QUEUE for Many Producers / Many Consumers
   for high bandwidth scenarios:
   * nearly lock-free,
   * quite fast,
   * no blocking calls,
   * only a queue (no stack),
   * the size of the queue is always a power of 2,
   * no overwriting.
*/

/* Deferred evaluation for the definition,
   so that all arguments are evaluated before further expansion */
#define M_QU3UE_MPMC_DEF_P1(arg) M_ID( M_QU3UE_MPMC_DEF_P2 arg )

/* Validate the value oplist before going further */
#define M_QU3UE_MPMC_DEF_P2(name, type, policy, oplist, buffer_t)             \
  M_IF_OPLIST(oplist)(M_QU3UE_MPMC_DEF_P3, M_QU3UE_MPMC_DEF_FAILURE)(name, type, policy, oplist, buffer_t)

/* Stop processing with a compilation failure */
#define M_QU3UE_MPMC_DEF_FAILURE(name, type, policy, oplist, buffer_t)        \
  M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(QUEUE_MPMC_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))

#ifdef NDEBUG
# define M_QU3UE_MPMC_CONTRACT(v) /* nothing */
#else
# define M_QU3UE_MPMC_CONTRACT(v) do {                                        \
    M_ASSERT (v != 0);                                                        \
    M_ASSERT (v->Tab != NULL);                                                \
    unsigned int _r = atomic_load(&v->ConsoIdx);                              \
    unsigned int _w = atomic_load(&v->ProdIdx);                               \
    _r = atomic_load(&v->ConsoIdx);                                           \
    M_ASSERT (_r > _w || _w-_r <= v->size);                                   \
    M_ASSERT (M_POWEROF2_P(v->size));                                         \
  } while (0)
#endif

/* Define the buffer type MPMC using atomics and its functions.
   - name: main prefix of the container
   - type: type of an element of the buffer
   - policy: the policy of the buffer
   - oplist: the oplist of the type of an element of the buffer
   - buffer_t: name of the type of the buffer
*/
#define M_QU3UE_MPMC_DEF_P3(name, type, policy, oplist, buffer_t)             \
  M_QU3UE_MPMC_DEF_TYPE(name, type, policy, oplist, buffer_t)                 \
  M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist)                            \
  M_QU3UE_MPMC_DEF_CORE(name, type, policy, oplist, buffer_t)                 \
  M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)

/* Define the type of a MPMC queue */
#define M_QU3UE_MPMC_DEF_TYPE(name, type, policy, oplist, buffer_t)           \
                                                                              \
  /* The sequence number of an element will be equal to either               \
     - 2* the index of the production which creates it,                      \
     - 1 + 2* the index of the consumption which consumes it.                \
     In case of wrapping, as there is no order comparison but only           \
     equality comparison, there is no special issue.                         \
     Each element is put in a separate cache line to avoid false             \
     sharing by multiple writing threads.                                    \
  */                                                                          \
  typedef struct M_F(name, _el_s) {                                           \
    atomic_uint seq;  /* Can only increase until wrapping */                  \
    type        x;                                                            \
    M_CACHELINE_ALIGN(align, atomic_uint, type);                              \
  } M_F(name, _el_ct);                                                        \
                                                                              \
  /* If there is only one producer and one consumer, then they won't         \
     typically use the same cache line, increasing performance. */           \
  typedef struct M_F(name, _s) {                                              \
    atomic_uint ProdIdx;  /* Can only increase until wrapping */              \
    M_CACHELINE_ALIGN(align1, atomic_uint);                                   \
    atomic_uint ConsoIdx; /* Can only increase until wrapping */              \
    M_CACHELINE_ALIGN(align2, atomic_uint);                                   \
    M_F(name, _el_ct) *Tab;                                                   \
    unsigned int size;                                                        \
  } buffer_t[1];                                                              \
                                                                              \
  typedef type M_F(name, _subtype_ct);                                        \
  typedef buffer_t M_F(name, _ct);                                            \

/* Define the core functionalities of a MPMC queue */
#define M_QU3UE_MPMC_DEF_CORE(name, type, policy, oplist, buffer_t)           \
  M_INLINE bool                                                               \
  M_F(name, _push)(buffer_t table, type const x)                              \
  {                                                                           \
    M_QU3UE_MPMC_CONTRACT(table);                                             \
    unsigned int idx = atomic_load_explicit(&table->ProdIdx,                  \
                                            memory_order_relaxed);            \
    const unsigned int i = idx & (table->size -1);                            \
    const unsigned int seq = atomic_load_explicit(&table->Tab[i].seq,         \
                                                  memory_order_acquire);      \
    if (M_UNLIKELY (2*(idx - table->size) + 1 != seq)) {                      \
      /* Buffer full (or unlikely preemption). Cannot push */                 \
      return false;                                                           \
    }                                                                         \
    if (M_UNLIKELY (!atomic_compare_exchange_strong_explicit(&table->ProdIdx, \
             &idx, idx+1, memory_order_relaxed, memory_order_relaxed))) {     \
      /* Thread has been preempted by another one. */                         \
      return false;                                                           \
    }                                                                         \
    /* If it is interrupted here, it may block all pop methods (not push),    \
       even if other threads have pushed data later in the queue, as all      \
       pop threads will try to dequeue this particular element but will       \
       always fail. They won't try to dequeue other elements.                 \
       As such, this queue is not strictly lock-free. */                      \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_CALL_SET(oplist, table->Tab[i].x, x);                                 \
    } else {                                                                  \
      M_CALL_INIT_SET(oplist, table->Tab[i].x, x);                            \
    }                                                                         \
    /* Finish the transaction */                                              \
    atomic_store_explicit(&table->Tab[i].seq, 2*idx, memory_order_release);   \
    M_QU3UE_MPMC_CONTRACT(table);                                             \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _pop)(type *ptr, buffer_t table)                                  \
  {                                                                           \
    M_QU3UE_MPMC_CONTRACT(table);                                             \
    M_ASSERT (ptr != NULL);                                                   \
    unsigned int iC = atomic_load_explicit(&table->ConsoIdx,                  \
                                           memory_order_relaxed);             \
    const unsigned int i = (iC & (table->size -1));                           \
    const unsigned int seq = atomic_load_explicit(&table->Tab[i].seq,         \
                                                  memory_order_acquire);      \
    if (seq != 2 * iC) {                                                      \
      /* Nothing in the buffer to consume (or unlikely preemption) */         \
      return false;                                                           \
    }                                                                         \
    if (M_UNLIKELY (!atomic_compare_exchange_strong_explicit(&table->ConsoIdx, \
             &iC, iC+1, memory_order_relaxed, memory_order_relaxed))) {       \
      /* Thread has been preempted by another one */                          \
      return false;                                                           \
    }                                                                         \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_CALL_SET(oplist, *ptr, table->Tab[i].x);                              \
    } else {                                                                  \
      M_DO_INIT_MOVE (oplist, *ptr, table->Tab[i].x);                         \
    }                                                                         \
    atomic_store_explicit(&table->Tab[i].seq, 2*iC + 1, memory_order_release); \
    M_QU3UE_MPMC_CONTRACT(table);                                             \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _init)(buffer_t buffer, size_t size)                              \
  {                                                                           \
    M_ASSERT (buffer != NULL);                                                \
    M_ASSERT (M_POWEROF2_P(size));                                            \
    M_ASSERT (0 < size && size <= UINT_MAX);                                  \
    M_ASSERT(((policy) & (M_BUFFER_STACK|M_BUFFER_THREAD_UNSAFE|M_BUFFER_PUSH_OVERWRITE)) == 0); \
    atomic_init(&buffer->ProdIdx, (unsigned int) size);                       \
    atomic_init(&buffer->ConsoIdx, (unsigned int) size);                      \
    buffer->size = (unsigned int) size;                                       \
    buffer->Tab = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, size);      \
    if (M_UNLIKELY_NOMEM (buffer->Tab == NULL)) {                             \
      M_MEMORY_FULL (size*sizeof(M_F(name, _el_ct) ));                        \
      return;                                                                 \
    }                                                                         \
    for(unsigned int j = 0; j < (unsigned int) size; j++) {                   \
      atomic_init(&buffer->Tab[j].seq, 2*j+1U);                               \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_INIT(oplist, buffer->Tab[j].x);                                \
      }                                                                       \
    }                                                                         \
    M_QU3UE_MPMC_CONTRACT(buffer);                                            \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _clear)(buffer_t buffer)                                          \
  {                                                                           \
    M_QU3UE_MPMC_CONTRACT(buffer);                                            \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(unsigned int j = 0; j < buffer->size; j++) {                        \
        M_CALL_CLEAR(oplist, buffer->Tab[j].x);                               \
      }                                                                       \
    } else {                                                                  \
      unsigned int iP = atomic_load_explicit(&buffer->ProdIdx, memory_order_relaxed); \
      unsigned int i = iP & (buffer->size -1);                                \
      unsigned int iC = atomic_load_explicit(&buffer->ConsoIdx, memory_order_relaxed); \
      unsigned int j = iC & (buffer->size -1);                                \
      while (i != j) {                                                        \
        M_CALL_CLEAR(oplist, buffer->Tab[j].x);                               \
        j++;                                                                  \
        if (j >= buffer->size)                                                \
          j = 0;                                                              \
      }                                                                       \
    }                                                                         \
    M_CALL_FREE(oplist, buffer->Tab);                                         \
    buffer->Tab = NULL; /* safer */                                           \
    buffer->size = 3;                                                         \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _size)(buffer_t table)                                            \
  {                                                                           \
    M_QU3UE_MPMC_CONTRACT(table);                                             \
    const unsigned int iC = atomic_load_explicit(&table->ConsoIdx, memory_order_relaxed); \
    const unsigned int iP = atomic_load_explicit(&table->ProdIdx, memory_order_acquire); \
    /* We return an approximation as we can't read both iC & iP atomically.   \
       As we read the producer index after the consumer index,                \
       and they are atomic variables without reordering,                      \
       the producer index is always greater than or equal to the consumer     \
       index (or an overflow occurs, in which case, as we compute with        \
       modulo arithmetic, the right result is computed).                      \
       We may return a result greater than the size of the queue              \
       if the function is interrupted for a long time between reading iC      \
       and iP. The function is not protected against this.                    \
    */                                                                        \
    return iP-iC;                                                             \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _capacity)(buffer_t v)                                            \
  {                                                                           \
    M_QU3UE_MPMC_CONTRACT(v);                                                 \
    return v->size;                                                           \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _empty_p)(buffer_t v)                                             \
  {                                                                           \
    return M_F(name, _size) (v) == 0;                                         \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _full_p)(buffer_t v)                                              \
  {                                                                           \
    return M_F(name, _size)(v) >= v->size;                                    \
  }                                                                           \

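/* Worked example (illustrative, derived from the code above): for a queue
   of size 4, the slot sequence numbers evolve as follows. After init, slot
   j holds seq = 2*j+1, i.e. {1,3,5,7}, and ProdIdx = ConsoIdx = 4. A push
   at index idx expects seq == 2*(idx - size) + 1: for idx = 4, slot 0 must
   hold seq 1 (it does), and the push publishes seq = 2*4 = 8. A pop at
   index iC = 4 expects seq == 2*iC = 8, and releases the slot with
   seq = 2*4+1 = 9, which is exactly what a push at idx = 8 (the next
   wrap-around over slot 0) will expect. */
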
/********************************** INTERNAL *********************************/

/* Definition of a QUEUE for Single Producer / Single Consumer
   for high bandwidth scenarios:
   * wait-free,
   * quite fast,
   * no blocking calls,
   * only a queue (no stack),
   * the size of the queue is always a power of 2,
   * no overwriting.
*/

/* Deferred evaluation for the definition,
   so that all arguments are evaluated before further expansion */
#define M_QU3UE_SPSC_DEF_P1(arg) M_ID( M_QU3UE_SPSC_DEF_P2 arg )

/* Validate the value oplist before going further */
#define M_QU3UE_SPSC_DEF_P2(name, type, policy, oplist, buffer_t)             \
  M_IF_OPLIST(oplist)(M_QU3UE_SPSC_DEF_P3, M_QU3UE_SPSC_DEF_FAILURE)(name, type, policy, oplist, buffer_t)

/* Stop processing with a compilation failure */
#define M_QU3UE_SPSC_DEF_FAILURE(name, type, policy, oplist, buffer_t)        \
  M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(QUEUE_SPSC_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))

#ifdef NDEBUG
#define M_QU3UE_SPSC_CONTRACT(table) do { } while (0)
#else
#define M_QU3UE_SPSC_CONTRACT(table) do {                                     \
    M_ASSERT (table != NULL);                                                 \
    unsigned int _r = atomic_load(&table->consoIdx);                          \
    unsigned int _w = atomic_load(&table->prodIdx);                           \
    /* Due to overflow we don't have M_ASSERT (_r <= _w); */                  \
    _r = atomic_load(&table->consoIdx);                                       \
    M_ASSERT (_r > _w || _w-_r <= table->size);                               \
    M_ASSERT (M_POWEROF2_P(table->size));                                     \
  } while (0)
#endif

/* Define the buffer type SPSC using atomics and its functions.
   - name: main prefix of the container
   - type: type of an element of the buffer
   - policy: the policy of the buffer
   - oplist: the oplist of the type of an element of the buffer
   - buffer_t: name of the type of the buffer
*/
#define M_QU3UE_SPSC_DEF_P3(name, type, policy, oplist, buffer_t)             \
  M_QU3UE_SPSC_DEF_TYPE(name, type, policy, oplist, buffer_t)                 \
  M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist)                            \
  M_QU3UE_SPSC_DEF_CORE(name, type, policy, oplist, buffer_t)                 \
  M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)

/* Define the type of a SPSC queue */
#define M_QU3UE_SPSC_DEF_TYPE(name, type, policy, oplist, buffer_t)           \
                                                                              \
  /* Single producer / Single consumer:                                      \
     only one thread will write in this table; the other thread              \
     will only read. As such, there is no concurrent write, and no           \
     need to align the structure for best performance. */                    \
  typedef struct M_F(name, _el_s) {                                           \
    type x;                                                                   \
  } M_F(name, _el_ct);                                                        \
                                                                              \
  typedef struct M_F(name, _s) {                                              \
    atomic_uint  consoIdx; /* Can only increase until overflow */             \
    unsigned int size;                                                        \
    M_F(name, _el_ct) *Tab;                                                   \
    M_CACHELINE_ALIGN(align, atomic_uint, size_t, M_F(name, _el_ct) *);       \
    atomic_uint prodIdx;   /* Can only increase until overflow */             \
  } buffer_t[1];                                                              \
                                                                              \
  typedef type M_F(name, _subtype_ct);                                        \
  typedef buffer_t M_F(name, _ct);                                            \

/* Define the core functionalities of a SPSC queue */
#define M_QU3UE_SPSC_DEF_CORE(name, type, policy, oplist, buffer_t)           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _push)(buffer_t table, type const x)                              \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_relaxed);              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_acquire);              \
    if (w-r >= table->size)                                                   \
      return false;                                                           \
    unsigned int i = w & (table->size -1);                                    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_CALL_SET(oplist, table->Tab[i].x, x);                                 \
    } else {                                                                  \
      M_CALL_INIT_SET(oplist, table->Tab[i].x, x);                            \
    }                                                                         \
    atomic_store_explicit(&table->prodIdx, w+1, memory_order_release);        \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _push_move)(buffer_t table, type *x)                              \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_relaxed);              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_acquire);              \
    if (w-r >= table->size)                                                   \
      return false;                                                           \
    unsigned int i = w & (table->size -1);                                    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_DO_MOVE(oplist, table->Tab[i].x, *x);                                 \
    } else {                                                                  \
      M_DO_INIT_MOVE(oplist, table->Tab[i].x, *x);                            \
    }                                                                         \
    atomic_store_explicit(&table->prodIdx, w+1, memory_order_release);        \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _pop)(type *ptr, buffer_t table)                                  \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    M_ASSERT (ptr != NULL);                                                   \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_relaxed);              \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_acquire);              \
    if (w-r == 0)                                                             \
      return false;                                                           \
    unsigned int i = r & (table->size -1);                                    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_CALL_SET(oplist, *ptr , table->Tab[i].x);                             \
    } else {                                                                  \
      M_DO_INIT_MOVE (oplist, *ptr, table->Tab[i].x);                         \
    }                                                                         \
    atomic_store_explicit(&table->consoIdx, r+1, memory_order_release);       \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    return true;                                                              \
  }                                                                           \
                                                                              \
  M_INLINE unsigned                                                           \
  M_F(name, _push_bulk)(buffer_t table, unsigned n, type const x[])           \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    M_ASSERT (x != NULL);                                                     \
    M_ASSERT (n <= table->size);                                              \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_relaxed);              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_acquire);              \
    unsigned int max = M_MIN(n, table->size - (w-r) );                        \
    if (max == 0)                                                             \
      return 0;                                                               \
    for(unsigned int k = 0; k < max; k++) {                                   \
      unsigned int i = (w+k) & (table->size -1);                              \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_SET(oplist, table->Tab[i].x, x[k]);                            \
      } else {                                                                \
        M_CALL_INIT_SET(oplist, table->Tab[i].x, x[k]);                       \
      }                                                                       \
    }                                                                         \
    atomic_store_explicit(&table->prodIdx, w+max, memory_order_release);      \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    return max;                                                               \
  }                                                                           \
                                                                              \
  M_INLINE unsigned                                                           \
  M_F(name, _pop_bulk)(unsigned int n, type ptr[], buffer_t table)            \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    M_ASSERT (ptr != NULL);                                                   \
    M_ASSERT (n <= table->size);                                              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_relaxed);              \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_acquire);              \
    if (w-r == 0)                                                             \
      return 0;                                                               \
    unsigned int max = M_MIN(w-r, n);                                         \
    for(unsigned int k = 0; k < max; k++) {                                   \
      unsigned int i = (r+k) & (table->size -1);                              \
      if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {        \
        M_CALL_SET(oplist, ptr[k], table->Tab[i].x);                          \
      } else {                                                                \
        M_DO_INIT_MOVE (oplist, ptr[k], table->Tab[i].x);                     \
      }                                                                       \
    }                                                                         \
    atomic_store_explicit(&table->consoIdx, r+max, memory_order_release);     \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    return max;                                                               \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _push_force)(buffer_t table, type const x)                        \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_relaxed);              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_acquire);              \
    /* If there is no room in the queue, try to skip the oldest element */    \
    while (w-r >= table->size) {                                              \
      bool b = atomic_compare_exchange_strong(&table->consoIdx, &r, r+1);     \
      r += b;                                                                 \
    }                                                                         \
    unsigned int i = w & (table->size -1);                                    \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      M_CALL_SET(oplist, table->Tab[i].x, x);                                 \
    } else {                                                                  \
      M_CALL_INIT_SET(oplist, table->Tab[i].x, x);                            \
    }                                                                         \
    atomic_store_explicit(&table->prodIdx, w+1, memory_order_release);        \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _size)(buffer_t table)                                            \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(table);                                             \
    unsigned int r = atomic_load_explicit(&table->consoIdx,                   \
                                          memory_order_relaxed);              \
    unsigned int w = atomic_load_explicit(&table->prodIdx,                    \
                                          memory_order_acquire);              \
    /* We return an approximation as we can't read both r & w atomically.     \
       As we read the producer index after the consumer index,                \
       and they are atomic variables without reordering,                      \
       the producer index is always greater than or equal to the consumer     \
       index (or an overflow occurs, in which case, as we compute with        \
       modulo arithmetic, the right result is computed).                      \
       We may return a result greater than the size of the queue              \
       if the function is interrupted for a long time between reading the     \
       indexes. The function is not protected against this.                   \
    */                                                                        \
    return w-r;                                                               \
  }                                                                           \
                                                                              \
  M_INLINE size_t                                                             \
  M_F(name, _capacity)(buffer_t v)                                            \
  {                                                                           \
    return v->size;                                                           \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _empty_p)(buffer_t v)                                             \
  {                                                                           \
    return M_F(name, _size) (v) == 0;                                         \
  }                                                                           \
                                                                              \
  M_INLINE bool                                                               \
  M_F(name, _full_p)(buffer_t v)                                              \
  {                                                                           \
    return M_F(name, _size)(v) >= v->size;                                    \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _init)(buffer_t buffer, size_t size)                              \
  {                                                                           \
    M_ASSERT (buffer != NULL);                                                \
    M_ASSERT (M_POWEROF2_P(size));                                            \
    M_ASSERT (0 < size && size <= UINT_MAX);                                  \
    M_ASSERT(((policy) & (M_BUFFER_STACK|M_BUFFER_THREAD_UNSAFE|M_BUFFER_PUSH_OVERWRITE)) == 0); \
    atomic_init(&buffer->prodIdx, (unsigned int) size);                       \
    atomic_init(&buffer->consoIdx, (unsigned int) size);                      \
    buffer->size = (unsigned int) size;                                       \
    buffer->Tab = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, size);      \
    if (M_UNLIKELY_NOMEM (buffer->Tab == NULL)) {                             \
      M_MEMORY_FULL (size*sizeof(M_F(name, _el_ct) ));                        \
      return;                                                                 \
    }                                                                         \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(unsigned int j = 0; j < (unsigned int) size; j++) {                 \
        M_CALL_INIT(oplist, buffer->Tab[j].x);                                \
      }                                                                       \
    }                                                                         \
    M_QU3UE_SPSC_CONTRACT(buffer);                                            \
  }                                                                           \
                                                                              \
  M_INLINE void                                                               \
  M_F(name, _clear)(buffer_t buffer)                                          \
  {                                                                           \
    M_QU3UE_SPSC_CONTRACT(buffer);                                            \
    if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) {          \
      for(unsigned int j = 0; j < buffer->size; j++) {                        \
        M_CALL_CLEAR(oplist, buffer->Tab[j].x);                               \
      }                                                                       \
    } else {                                                                  \
      unsigned int iP = atomic_load_explicit(&buffer->prodIdx, memory_order_relaxed); \
      unsigned int i = iP & (buffer->size -1);                                \
      unsigned int iC = atomic_load_explicit(&buffer->consoIdx, memory_order_relaxed); \
      unsigned int j = iC & (buffer->size -1);                                \
      while (i != j) {                                                        \
        M_CALL_CLEAR(oplist, buffer->Tab[j].x);                               \
        j++;                                                                  \
        if (j >= buffer->size)                                                \
          j = 0;                                                              \
      }                                                                       \
    }                                                                         \
    M_CALL_FREE(oplist, buffer->Tab);                                         \
    buffer->Tab = NULL; /* safer */                                           \
    buffer->size = 3;                                                         \
  }                                                                           \

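/* Example (a hypothetical usage sketch, not part of the library; it reuses
   the spsc_int queue sketched earlier): the bulk operations transfer up to
   n elements (n shall not exceed the queue capacity) and return how many
   were actually transferred, amortizing the atomic synchronization over
   several elements.

     int in[8], out[8];
     unsigned pushed = spsc_int_push_bulk(q, 8, in);  // 0 <= pushed <= 8
     unsigned popped = spsc_int_pop_bulk(8, out, q);  // 0 <= popped <= 8
*/
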
/********************************** INTERNAL *********************************/

/* Deferred evaluation for the definition,
   so that all arguments are evaluated before further expansion */
#define M_BUFF3R_OPLIST_P1(arg) M_BUFF3R_OPLIST_P2 arg

/* Validation of the given oplist */
#define M_BUFF3R_OPLIST_P2(name, oplist)                                      \
  M_IF_OPLIST(oplist)(M_BUFF3R_OPLIST_P3, M_BUFF3R_OPLIST_FAILURE)(name, oplist)

/* Prepare a clean compilation failure */
#define M_BUFF3R_OPLIST_FAILURE(name, oplist)                                 \
  ((M_LIB_ERROR(ARGUMENT_OF_BUFFER_OPLIST_IS_NOT_AN_OPLIST, name, oplist)))

/* OPLIST definition of a buffer */
#define M_BUFF3R_OPLIST_P3(name, oplist)                                      \
  (INIT(M_C3(m_buff3r_,name, _init))                                          \
  ,INIT_SET(M_F(name, _init_set))                                             \
  ,SET(M_F(name, _set))                                                       \
  ,CLEAR(M_F(name, _clear))                                                   \
  ,NAME(name)                                                                 \
  ,TYPE(M_F(name,_ct))                                                        \
  ,SUBTYPE(M_F(name, _subtype_ct))                                            \
  ,RESET(M_F(name,_reset))                                                    \
  ,PUSH(M_F(name,_push))                                                      \
  ,POP(M_F(name,_pop))                                                        \
  ,OPLIST(oplist)                                                             \
  ,EMPTY_P(M_F(name, _empty_p))                                               \
  ,GET_SIZE(M_F(name, _size))                                                 \
  )

/********************************** INTERNAL *********************************/

#if M_USE_SMALL_NAME

#define BUFFER_DEF M_BUFFER_DEF
#define BUFFER_DEF_AS M_BUFFER_DEF_AS
#define BUFFER_OPLIST M_BUFFER_OPLIST
#define QUEUE_MPMC_DEF M_QUEUE_MPMC_DEF
#define QUEUE_MPMC_DEF_AS M_QUEUE_MPMC_DEF_AS
#define QUEUE_SPSC_DEF M_QUEUE_SPSC_DEF
#define QUEUE_SPSC_DEF_AS M_QUEUE_SPSC_DEF_AS

#define buffer_policy_e m_buffer_policy_e
#define BUFFER_QUEUE M_BUFFER_QUEUE
#define BUFFER_STACK M_BUFFER_STACK
#define BUFFER_BLOCKING_PUSH M_BUFFER_BLOCKING_PUSH
#define BUFFER_UNBLOCKING_PUSH M_BUFFER_UNBLOCKING_PUSH
#define BUFFER_BLOCKING_POP M_BUFFER_BLOCKING_POP
#define BUFFER_UNBLOCKING_POP M_BUFFER_UNBLOCKING_POP
#define BUFFER_BLOCKING M_BUFFER_BLOCKING
#define BUFFER_UNBLOCKING M_BUFFER_UNBLOCKING
#define BUFFER_THREAD_SAFE M_BUFFER_THREAD_SAFE
#define BUFFER_THREAD_UNSAFE M_BUFFER_THREAD_UNSAFE
#define BUFFER_PUSH_INIT_POP_MOVE M_BUFFER_PUSH_INIT_POP_MOVE
#define BUFFER_PUSH_OVERWRITE M_BUFFER_PUSH_OVERWRITE
#define BUFFER_DEFERRED_POP M_BUFFER_DEFERRED_POP

#endif

#endif