Ken Van Hoeylandt 5dc2599e55 implemented furi from flipper zero
added cmsis_core, furi, mlib and nanobake
implemented basic app structure from furi
implemented basic placeholder apps
2023-12-26 21:47:27 +01:00

1351 lines
91 KiB
C

/*
* M*LIB - Fixed size (Bounded) QUEUE & STACK interface
*
* Copyright (c) 2017-2023, Patrick Pelissier
* All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* + Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* + Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MSTARLIB_BUFFER_H
#define MSTARLIB_BUFFER_H
#include "m-core.h"
#include "m-thread.h"
#include "m-atomic.h"
/* Define the different kind of policy a lock-based buffer can have:
* - the buffer can be either a queue (policy is FIFO) or a stack (policy is FILO),
* - if the push method is by default blocking (waiting for the buffer to has some space) or not, *** deprecated ***
* - if the pop method is by default blocking (waiting for the buffer to has some data) or not, *** deprecated ***
* - if both methods are blocking, *** deprecated ***
* - if it shall be thread safe or not (i.e. remove the mutex lock and atomic costs),
* - if the buffer has to be init with empty elements, or if it shall init an element when it is pushed (and moved when popped),
* - if the buffer has to overwrite the last element if the buffer is full,
* - if the pop of an element is not complete until the call to pop_release (preventing push until this call).
*/
typedef enum {
M_BUFFER_QUEUE = 0, M_BUFFER_STACK = 1,
M_BUFFER_BLOCKING_PUSH = 0, M_BUFFER_UNBLOCKING_PUSH = 2,
M_BUFFER_BLOCKING_POP = 0, M_BUFFER_UNBLOCKING_POP = 4,
M_BUFFER_BLOCKING = 0, M_BUFFER_UNBLOCKING = 6,
M_BUFFER_THREAD_SAFE = 0, M_BUFFER_THREAD_UNSAFE = 8,
M_BUFFER_PUSH_INIT_POP_MOVE = 16,
M_BUFFER_PUSH_OVERWRITE = 32,
M_BUFFER_DEFERRED_POP = 64
} m_buffer_policy_e;
/* Define a lock based buffer.
If size is 0, then the size will only be defined at run-time when initializing the buffer,
otherwise the size will be a compile time constant.
USAGE: BUFFER_DEF(name, type, size_of_buffer_or_0, policy[, oplist]) */
#define M_BUFFER_DEF(name, type, m_size, ... ) \
M_BUFFER_DEF_AS(name, M_F(name, _t), type, m_size, __VA_ARGS__)
/* Define a lock based buffer
as the provided type name_t.
USAGE: BUFFER_DEF_AS(name, name_t, type, size_of_buffer_or_0, policy[, oplist of type]) */
#define M_BUFFER_DEF_AS(name, name_t, type, m_size, ... ) \
M_BEGIN_PROTECTED_CODE \
M_BUFF3R_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__) \
((name, type, m_size,__VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
(name, type, m_size,__VA_ARGS__, name_t ))) \
M_END_PROTECTED_CODE
/* Define the oplist of a lock based buffer given its name and its oplist.
USAGE: BUFFER_OPLIST(name[, oplist of the type]) */
#define M_BUFFER_OPLIST(...) \
M_BUFF3R_OPLIST_P1(M_IF_NARGS_EQ1(__VA_ARGS__) \
((__VA_ARGS__, M_BASIC_OPLIST), \
(__VA_ARGS__ )))
/* Define a nearly lock-free queue for Many Producer Many Consummer.
Much faster than queue of BUFFER_DEF in heavy communication scenario
but without any blocking features (this is let to the user).
Size of created queue shall be a power of 2 and is defined at run-time.
USAGE: QUEUE_MPMC_DEF(name, type, policy, [oplist of type])
*/
#define M_QUEUE_MPMC_DEF(name, type, ...) \
M_QUEUE_MPMC_DEF_AS(name, M_F(name,_t), type, __VA_ARGS__)
/* Define a nearly lock-free queue for Many Producer Many Consummer
as the provided type name_t.
Much faster than queue of BUFFER_DEF in heavy communication scenario
but without any blocking features (this is let to the user).
Size of created queue shall be a power of 2 and is defined at run-time.
USAGE: QUEUE_MPMC_DEF_AS(name, name_t, type, policy, [oplist of type])
*/
#define M_QUEUE_MPMC_DEF_AS(name, name_t, type, ...) \
M_BEGIN_PROTECTED_CODE \
M_QU3UE_MPMC_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__) \
((name, type, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
(name, type, __VA_ARGS__, name_t ))) \
M_END_PROTECTED_CODE
/* Define a wait-free queue for Single Producer Single Consummer
Much faster than queue of BUFFER_DEF or QUEUE_MPMC in heavy communication scenario
but without any blocking features (this is let to the user).
Size of created queue shall be a power of 2 and is defined at run-time.
USAGE: QUEUE_SPSC_DEF(name, type, policy, [oplist of type])
*/
#define M_QUEUE_SPSC_DEF(name, type, ...) \
M_QUEUE_SPSC_DEF_AS(name, M_F(name, _t), type, __VA_ARGS__)
/* Define a wait-free queue for Single Producer Single Consummer
as the provided type name_t.
Much faster than queue of BUFFER_DEF in heavy communication scenario
but without any blocking features (this is let to the user).
Size of created queue shall be a power of 2 and is defined at run-time.
USAGE: QUEUE_SPSC_DEF_AS(name, name_t, type, policy, [oplist of type])
*/
#define M_QUEUE_SPSC_DEF_AS(name, name_t, type, ...) \
M_BEGIN_PROTECTED_CODE \
M_QU3UE_SPSC_DEF_P1(M_IF_NARGS_EQ1(__VA_ARGS__) \
((name, type, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(type)(), name_t ), \
(name, type, __VA_ARGS__, name_t ))) \
M_END_PROTECTED_CODE
/*****************************************************************************/
/********************************** INTERNAL *********************************/
/*****************************************************************************/
/* Test if the given policy is true or not.
WARNING: The policy shall be a non zero value (i.e. not a default). */
#define M_BUFF3R_POLICY_P(policy, val) \
(((policy) & (val)) != 0)
/* Handle either atomic integer or normal integer in function of the policy
parameter of the buffer BUFFER_THREAD_UNSAFE (BUFFER_THREAD_SAFE is the
default). This enables avoiding to pay the cost of atomic operations if not
applicable.
*/
typedef union m_buff3r_number_s {
unsigned int u;
atomic_uint a;
#ifdef __cplusplus
// Not sure why, but C++ needs an explicit default constructor for this union.
m_buff3r_number_s() : u(0) {};
#endif
} m_buff3r_number_ct[1];
M_INLINE void
m_buff3r_number_init(m_buff3r_number_ct n, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
atomic_init(&n->a, 0U);
else
n->u = 0UL;
}
M_INLINE unsigned int
m_buff3r_number_load(m_buff3r_number_ct n, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
// Perform a memory acquire so that further usage of the buffer
// is synchronized.
return atomic_load_explicit(&n->a, memory_order_acquire);
else
return n->u;
}
M_INLINE void
m_buff3r_number_store(m_buff3r_number_ct n, unsigned int v, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
// This function is used in context where a relaxed access is sufficient.
atomic_store_explicit(&n->a, v, memory_order_relaxed);
else
n->u = v;
}
M_INLINE void
m_buff3r_number_set(m_buff3r_number_ct n, m_buff3r_number_ct v, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
// This function is used in context where a relaxed access is sufficient.
atomic_store_explicit(&n->a, atomic_load_explicit(&v->a, memory_order_relaxed), memory_order_relaxed);
else
n->u = v->u;
}
M_INLINE unsigned int
m_buff3r_number_inc(m_buff3r_number_ct n, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
return atomic_fetch_add(&n->a, 1U);
else
return n->u ++;
}
M_INLINE unsigned int
m_buff3r_number_dec(m_buff3r_number_ct n, unsigned int policy)
{
if (!M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE))
return atomic_fetch_sub(&n->a, 1U);
else
return n->u --;
}
/********************************** INTERNAL *********************************/
/* Test if the size is only run-time or build time */
#define M_BUFF3R_IF_CTE_SIZE(m_size) M_IF(M_BOOL(m_size))
/* Return the size (run time or build time).
NOTE: It assumed that the buffer variable name is 'v' */
#define M_BUFF3R_SIZE(m_size) \
M_BUFF3R_IF_CTE_SIZE(m_size) (m_size, v->capacity)
/* Contract of a buffer.
Nothing particular since we cannot test much without locking it.
*/
#define M_BUFF3R_CONTRACT(buffer, size) do { \
M_ASSERT (buffer != NULL); \
M_ASSERT (buffer->data != NULL); \
}while (0)
/* Contract of a buffer within a protected section */
#define M_BUFF3R_PROTECTED_CONTRACT(policy, buffer, size) do { \
M_ASSERT (m_buff3r_number_load(buffer->number[0], policy) <= M_BUFF3R_SIZE(size)); \
} while (0)
/* Deferred evaluation for the definition,
so that all arguments are evaluated before further expansion */
#define M_BUFF3R_DEF_P1(arg) M_ID( M_BUFF3R_DEF_P2 arg )
/* Validate the value oplist before going further */
#define M_BUFF3R_DEF_P2(name, type, m_size, policy, oplist, buffer_t) \
M_IF_OPLIST(oplist)(M_BUFF3R_DEF_P3, M_BUFF3R_DEF_FAILURE)(name, type, m_size, policy, oplist, buffer_t)
/* Stop processing with a compilation failure */
#define M_BUFF3R_DEF_FAILURE(name, type, m_size, policy, oplist, buffer_t) \
M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(M_BUFFER_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))
/* Define the buffer type using mutex lock and its functions.
- name: main prefix of the container
- type: type of an element of the buffer
- m_size: constant to 0 if variable runtime size, or else the fixed size of the buffer
- policy: the policy of the buffer
- oplist: the oplist of the type of an element of the buffer
- buffer_t: name of the buffer
*/
#define M_BUFF3R_DEF_P3(name, type, m_size, policy, oplist, buffer_t) \
M_BUFF3R_DEF_TYPE(name, type, m_size, policy, oplist, buffer_t) \
M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist) \
M_BUFF3R_DEF_CORE(name, type, m_size, policy, oplist, buffer_t) \
M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)
/* Define the type of a buffer */
#define M_BUFF3R_DEF_TYPE(name, type, m_size, policy, oplist, buffer_t) \
\
/* Put each data in a separate cache line to avoid false sharing \
by multiple writing threads. No need to align if there is no thread */ \
typedef union M_F(name, _el_s) { \
type x; \
char align[M_BUFF3R_POLICY_P(policy, M_BUFFER_THREAD_UNSAFE) ? 1 : M_ALIGN_FOR_CACHELINE_EXCLUSION]; \
} M_F(name, _el_ct); \
\
typedef struct M_F(name, _s) { \
/* Data for a producer */ \
m_mutex_t mutexPush; /* MUTEX used for pushing elements */ \
size_t idx_prod; /* Index of the production threads */ \
size_t overwrite; /* Number of overwritten values */ \
m_cond_t there_is_data; /* condition raised when there is data */ \
/* Read only Data */ \
M_BUFF3R_IF_CTE_SIZE(m_size)( ,size_t capacity;) /* Capacity of the buffer */ \
/* Data for a consummer */ \
m_cond_t there_is_room_for_data; /* Cond. raised when there is room */ \
m_mutex_t mutexPop; /* MUTEX used for popping elements */ \
size_t idx_cons; /* Index of the consumption threads */ \
/* number[0] := Number of elements in the buffer */ \
/* number[1] := [OPTION] Number of elements being deferred in the buffer */ \
m_buff3r_number_ct number[1 + M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)]; \
/* If fixed size, array of elements, otherwise pointer to element */ \
M_F(name, _el_ct) M_BUFF3R_IF_CTE_SIZE(m_size)(data[m_size], *data); \
} buffer_t[1]; \
\
typedef struct M_F(name, _s) *M_F(name, _ptr); \
typedef const struct M_F(name, _s) *M_F(name, _srcptr); \
/* Internal type used to unconst the buffer */ \
typedef union { M_F(name, _srcptr) cptr; M_F(name, _ptr) ptr; } M_F(name, _uptr_ct); \
/* Internal types used by the oplist */ \
typedef type M_F(name, _subtype_ct); \
typedef buffer_t M_F(name, _ct); \
/* Define the core functionnalities of a buffer */
#define M_BUFF3R_DEF_CORE(name, type, m_size, policy, oplist, buffer_t) \
\
M_INLINE void \
M_F(name, _init)(buffer_t v, size_t size) \
{ \
M_ASSERT(size <= UINT_MAX); \
M_BUFF3R_IF_CTE_SIZE(m_size)(M_ASSERT(size == m_size), v->capacity = size); \
v->idx_prod = v->idx_cons = v->overwrite = 0; \
m_buff3r_number_init (v->number[0], policy); \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
m_buff3r_number_init (v->number[1], policy); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_init(v->mutexPush); \
m_mutex_init(v->mutexPop); \
m_cond_init(v->there_is_data); \
m_cond_init(v->there_is_room_for_data); \
} else { \
M_ASSERT(M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING)); \
} \
\
M_BUFF3R_IF_CTE_SIZE(m_size)( /* Statically allocated */ , \
v->data = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, M_BUFF3R_SIZE(m_size)); \
if (M_UNLIKELY_NOMEM (v->data == NULL)) { \
M_MEMORY_FULL (M_BUFF3R_SIZE(m_size)*sizeof(M_F(name, _el_ct))); \
return; \
} \
) \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(size_t i = 0; i < size; i++) { \
M_CALL_INIT(oplist, v->data[i].x); \
} \
} \
M_BUFF3R_CONTRACT(v,m_size); \
} \
\
M_BUFF3R_IF_CTE_SIZE(m_size)( \
M_INLINE void \
M_C3(m_buff3r_,name,_init)(buffer_t v) \
{ \
M_F(name, _init)(v, m_size); \
} \
, ) \
\
M_INLINE void \
M_C3(m_buff3r_,name,_clear_obj)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) { \
M_CALL_CLEAR(oplist, v->data[i].x); \
} \
} else { \
size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
while (i != v->idx_prod) { \
M_CALL_CLEAR(oplist, v->data[i].x); \
i++; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
i = 0; \
} \
} \
v->idx_prod = v->idx_cons = 0; \
m_buff3r_number_store (v->number[0], 0U, policy); \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
m_buff3r_number_store(v->number[1], 0U, policy); \
M_BUFF3R_CONTRACT(v,m_size); \
} \
\
M_INLINE void \
M_F(name, _clear)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
M_C3(m_buff3r_,name,_clear_obj)(v); \
M_BUFF3R_IF_CTE_SIZE(m_size)( , \
M_CALL_FREE(oplist, v->data); \
v->data = NULL; \
) \
v->overwrite = 0; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_clear(v->mutexPush); \
m_mutex_clear(v->mutexPop); \
m_cond_clear(v->there_is_data); \
m_cond_clear(v->there_is_room_for_data); \
} \
} \
\
M_INLINE void \
M_F(name, _reset)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_lock(v->mutexPush); \
m_mutex_lock(v->mutexPop); \
} \
M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size); \
if (M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) \
M_C3(m_buff3r_,name,_clear_obj)(v); \
v->idx_prod = v->idx_cons = 0; \
m_buff3r_number_store (v->number[0], 0U, policy); \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
m_buff3r_number_store(v->number[1], 0U, policy); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_cond_broadcast(v->there_is_room_for_data); \
m_mutex_unlock(v->mutexPop); \
m_mutex_unlock(v->mutexPush); \
} \
M_BUFF3R_CONTRACT(v,m_size); \
} \
\
M_INLINE void M_ATTR_DEPRECATED \
M_F(name, _clean)(buffer_t v) \
{ \
M_F(name,_reset)(v); \
} \
\
M_INLINE void \
M_F(name, _init_set)(buffer_t dest, const buffer_t src) \
{ \
/* unconst 'src', so that we can lock it (semantically it is const) */ \
M_F(name, _uptr_ct) vu; \
vu.cptr = src; \
M_F(name, _ptr) v = vu.ptr; \
M_ASSERT (dest != v); \
M_BUFF3R_CONTRACT(v,m_size); \
M_F(name, _init)(dest, M_BUFF3R_SIZE(m_size)); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_lock(v->mutexPush); \
m_mutex_lock(v->mutexPop); \
} \
\
M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) { \
M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x); \
} \
} else { \
size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
while (i != v->idx_prod) { \
M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x); \
i++; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
i = 0; \
} \
} \
\
dest->idx_prod = v->idx_prod; \
dest->idx_cons = v->idx_cons; \
m_buff3r_number_set (dest->number[0], v->number[0], policy); \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
m_buff3r_number_set(dest->number[1], v->number[1], policy); \
\
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_unlock(v->mutexPop); \
m_mutex_unlock(v->mutexPush); \
} \
M_BUFF3R_CONTRACT(v,m_size); \
M_BUFF3R_CONTRACT(dest, m_size); \
} \
\
M_INLINE void \
M_F(name, _set)(buffer_t dest, const buffer_t src) \
{ \
/* unconst 'src', so that we can lock it (semantically it is const) */ \
M_F(name, _uptr_ct) vu; \
vu.cptr = src; \
M_F(name, _ptr) v = vu.ptr; \
M_BUFF3R_CONTRACT(dest,m_size); \
M_BUFF3R_CONTRACT(v,m_size); \
\
if (dest == v) return; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
/* Case of deadlock: A := B, B:=C, C:=A (all in //) \
Solution: order the lock by increasing memory */ \
if (dest < v) { \
m_mutex_lock(dest->mutexPush); \
m_mutex_lock(dest->mutexPop); \
m_mutex_lock(v->mutexPush); \
m_mutex_lock(v->mutexPop); \
} else { \
m_mutex_lock(v->mutexPush); \
m_mutex_lock(v->mutexPop); \
m_mutex_lock(dest->mutexPush); \
m_mutex_lock(dest->mutexPop); \
} \
} \
\
M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size); \
M_C3(m_buff3r_,name,_clear_obj)(dest); \
\
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(size_t i = 0; i < M_BUFF3R_SIZE(m_size); i++) { \
M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x); \
} \
} else { \
size_t i = M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) ? 0 : v->idx_cons; \
while (i != v->idx_prod) { \
M_CALL_INIT_SET(oplist, dest->data[i].x, v->data[i].x); \
i++; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK) && i >= M_BUFF3R_SIZE(m_size)) \
i = 0; \
} \
} \
\
dest->idx_prod = v->idx_prod; \
dest->idx_cons = v->idx_cons; \
m_buff3r_number_set (dest->number[0], v->number[0], policy); \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
m_buff3r_number_set(dest->number[1], v->number[1], policy); \
\
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
/* It may be false, but it is not wrong! */ \
m_cond_broadcast(v->there_is_room_for_data); \
m_cond_broadcast(v->there_is_data); \
if (dest < v) { \
m_mutex_unlock(v->mutexPop); \
m_mutex_unlock(v->mutexPush); \
m_mutex_unlock(dest->mutexPop); \
m_mutex_unlock(dest->mutexPush); \
} else { \
m_mutex_unlock(dest->mutexPop); \
m_mutex_unlock(dest->mutexPush); \
m_mutex_unlock(v->mutexPop); \
m_mutex_unlock(v->mutexPush); \
} \
} \
M_BUFF3R_CONTRACT(v,m_size); \
M_BUFF3R_CONTRACT(dest, m_size); \
} \
\
M_INLINE bool \
M_F(name, _empty_p)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
/* If the buffer has been configured with deferred pop \
we considered the queue as empty when the number of \
deferred pop has reached 0, not the number of items in the \
buffer is 0. */ \
if (M_BUFF3R_POLICY_P(policy, M_BUFFER_DEFERRED_POP)) \
return m_buff3r_number_load (v->number[1], policy) == 0; \
else \
return m_buff3r_number_load (v->number[0], policy) == 0; \
} \
\
M_INLINE bool \
M_F(name, _full_p)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
return m_buff3r_number_load (v->number[0], policy) \
== M_BUFF3R_SIZE(m_size); \
} \
\
M_INLINE size_t \
M_F(name, _size)(buffer_t v) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
return m_buff3r_number_load (v->number[0], policy); \
} \
\
M_INLINE bool \
M_F(name, _push_blocking)(buffer_t v, type const data, bool blocking) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
\
/* Producer Mutex lock (mutex lock performs an acquire memory barrier) */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_lock(v->mutexPush); \
while (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE) \
&& M_F(name, _full_p)(v)) { \
if (!blocking) { \
m_mutex_unlock(v->mutexPush); \
return false; \
} \
m_cond_wait(v->there_is_room_for_data, v->mutexPush); \
} \
} else if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE) \
&& M_F(name, _full_p)(v)) \
return false; \
M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size); \
\
size_t previousSize, idx = v->idx_prod; \
/* INDEX computation if we have to overwrite the last element */ \
if (M_UNLIKELY (M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_OVERWRITE) \
&& M_F(name, _full_p)(v))) { \
v->overwrite++; \
/* Let's overwrite the last element */ \
/* Compute the index of the last push element */ \
idx--; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) { \
idx = idx >= M_BUFF3R_SIZE(m_size) ? M_BUFF3R_SIZE(m_size)-1 : idx; \
} \
/* Update data in the buffer */ \
M_CALL_SET(oplist, v->data[idx].x, data); \
previousSize = M_BUFF3R_SIZE(m_size); \
} else { \
/* Add a new item in the buffer */ \
/* PUSH data in the buffer */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, v->data[idx].x, data); \
} else { \
M_CALL_INIT_SET(oplist, v->data[idx].x, data); \
} \
\
/* Increment production INDEX of the buffer */ \
idx++; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) { \
idx = (idx == M_BUFF3R_SIZE(m_size)) ? 0 : idx; \
} \
v->idx_prod = idx; \
\
/* number[] is the only variable which can be modified by both \
the consummer thread which has the pop lock and the producer \
thread which has the push lock. As such, it is an atomic variable \
that performs a release memory barrier. */ \
/* Increment number of elements of the buffer */ \
previousSize = m_buff3r_number_inc (v->number[0], policy); \
if (M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) { \
previousSize = m_buff3r_number_inc (v->number[1], policy); \
} \
/* From this point, consummer may read the data in the table */ \
} \
\
/* Producer unlock (mutex unlock performs a release memory barrier) */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_unlock(v->mutexPush); \
/* If the number of items in the buffer was 0, some consummer \
may be waiting. Signal to them the availibility of the data \
We cannot only signal one thread. */ \
if (previousSize == 0) { \
m_mutex_lock(v->mutexPop); \
m_cond_broadcast(v->there_is_data); \
m_mutex_unlock(v->mutexPop); \
} \
} \
\
M_BUFF3R_CONTRACT(v,m_size); \
return true; \
} \
\
M_INLINE bool \
M_F(name, _pop_blocking)(type *data, buffer_t v, bool blocking) \
{ \
M_BUFF3R_CONTRACT(v,m_size); \
M_ASSERT (data != NULL); \
\
/* Consummer lock (mutex lock performs an acquire memory barrier) */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_lock(v->mutexPop); \
while (M_F(name, _empty_p)(v)) { \
if (!blocking) { \
m_mutex_unlock(v->mutexPop); \
return false; \
} \
m_cond_wait(v->there_is_data, v->mutexPop); \
} \
} else if (M_F(name, _empty_p)(v)) \
return false; \
M_BUFF3R_PROTECTED_CONTRACT(policy, v, m_size); \
\
/* POP data from the buffer and update INDEX */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_STACK)) { \
/* FIFO queue */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, *data, v->data[v->idx_cons].x); \
} else { \
M_DO_INIT_MOVE (oplist, *data, v->data[v->idx_cons].x); \
} \
v->idx_cons = (v->idx_cons == M_BUFF3R_SIZE(m_size)-1) ? 0 : (v->idx_cons + 1); \
} else { \
/* STACK queue */ \
v->idx_prod --; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, *data, v->data[v->idx_prod].x); \
} else { \
M_DO_INIT_MOVE (oplist, *data, v->data[v->idx_prod].x); \
} \
} \
\
/* number[] is the only variable which can be modified by both \
the consummer thread which has the pop lock and the producer \
thread which has the push lock. As such, it is an atomic variable \
that performs a release memory barrier. */ \
/* Decrement number of elements in the buffer */ \
size_t previousSize; \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) { \
previousSize = m_buff3r_number_dec (v->number[0], policy); \
} else { \
m_buff3r_number_dec (v->number[1], policy); \
} \
/* Space may be reused by a producer thread from this point */ \
\
/* Consummer unlock (mutex unlock perfoms a release memory barrier) */ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_THREAD_UNSAFE)) { \
m_mutex_unlock(v->mutexPop); \
/* If the number of items in the buffer was the max, some producer \
may be waiting. Signal to them the availibility of the free room \
We cannot only signal one thread. */ \
if ((!M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) \
&& previousSize == M_BUFF3R_SIZE(m_size)) { \
m_mutex_lock(v->mutexPush); \
m_cond_broadcast(v->there_is_room_for_data); \
m_mutex_unlock(v->mutexPush); \
} \
} \
\
M_BUFF3R_CONTRACT(v,m_size); \
return true; \
} \
\
\
M_INLINE bool \
M_F(name, _push)(buffer_t v, type const data) \
{ \
return M_F(name, _push_blocking)(v, data, \
!M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING_PUSH)); \
} \
\
M_INLINE bool \
M_F(name, _pop)(type *data, buffer_t v) \
{ \
return M_F(name, _pop_blocking)(data, v, \
!M_BUFF3R_POLICY_P((policy), M_BUFFER_UNBLOCKING_POP)); \
} \
\
M_INLINE size_t \
M_F(name, _overwrite)(const buffer_t v) \
{ \
return v->overwrite; \
} \
\
M_INLINE size_t \
M_F(name, _capacity)(const buffer_t v) \
{ \
(void) v; /* may be unused */ \
return M_BUFF3R_SIZE(m_size); \
} \
\
M_INLINE void \
M_F(name, _pop_release)(buffer_t v) \
{ \
/* Decrement the effective number of elements in the buffer */ \
if (M_BUFF3R_POLICY_P((policy), M_BUFFER_DEFERRED_POP)) { \
size_t previousSize = m_buff3r_number_dec (v->number[0], policy); \
if (previousSize == M_BUFF3R_SIZE(m_size)) { \
m_mutex_lock(v->mutexPush); \
m_cond_broadcast(v->there_is_room_for_data); \
m_mutex_unlock(v->mutexPush); \
} \
} \
} \
/********************************** INTERNAL *********************************/
/* Definition of a a QUEUE for Many Produccer / Many Consummer
for high bandwidth scenario:
* nearly lock-free,
* quite fast
* no blocking calls.
* only queue (no stack)
* size of queue is always a power of 2
* no overwriting.
*/
/* Deferred evaluation for the definition,
so that all arguments are evaluated before further expansion */
#define M_QU3UE_MPMC_DEF_P1(arg) M_ID( M_QU3UE_MPMC_DEF_P2 arg )
/* Validate the value oplist before going further */
#define M_QU3UE_MPMC_DEF_P2(name, type, policy, oplist, buffer_t) \
M_IF_OPLIST(oplist)(M_QU3UE_MPMC_DEF_P3, M_QU3UE_MPMC_DEF_FAILURE)(name, type, policy, oplist, buffer_t)
/* Stop processing with a compilation failure */
#define M_QU3UE_MPMC_DEF_FAILURE(name, type, policy, oplist, buffer_t) \
M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(QUEUE_MPMC_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))
#ifdef NDEBUG
# define M_QU3UE_MPMC_CONTRACT(v) /* nothing */
#else
# define M_QU3UE_MPMC_CONTRACT(v) do { \
M_ASSERT (v != 0); \
M_ASSERT (v->Tab != NULL); \
unsigned int _r = atomic_load(&v->ConsoIdx); \
unsigned int _w = atomic_load(&v->ProdIdx); \
_r = atomic_load(&v->ConsoIdx); \
M_ASSERT (_r > _w || _w-_r <= v->size); \
M_ASSERT (M_POWEROF2_P(v->size)); \
} while (0)
#endif
/* Define the buffer type MPMC using atomics and its functions.
- name: main prefix of the container
- type: type of an element of the buffer
- policy: the policy of the buffer
- oplist: the oplist of the type of an element of the buffer
- buffer_t: name of the buffer
*/
#define M_QU3UE_MPMC_DEF_P3(name, type, policy, oplist, buffer_t) \
M_QU3UE_MPMC_DEF_TYPE(name, type, policy, oplist, buffer_t) \
M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist) \
M_QU3UE_MPMC_DEF_CORE(name, type, policy, oplist, buffer_t) \
M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)
/* Define the type of a MPMC queue */
#define M_QU3UE_MPMC_DEF_TYPE(name, type, policy, oplist, buffer_t) \
\
/* The sequence number of an element will be equal to either \
- 2* the index of the production which creates it, \
- 1 + 2* the index of the consumption which consummes it \
In case of wrapping, as there is no order comparison but only \
equal comparison, there is no special issue. \
Each element is put in a separate cache line to avoid false \
sharing by multiple writing threads. \
*/ \
typedef struct M_F(name, _el_s) { \
atomic_uint seq; /* Can only increase until wrapping */ \
type x; \
M_CACHELINE_ALIGN(align, atomic_uint, type); \
} M_F(name, _el_ct); \
\
/* If there is only one producer and one consummer, then they won't \
typically use the same cache line, increasing performance. */ \
typedef struct M_F(name, _s) { \
atomic_uint ProdIdx; /* Can only increase until wrapping */ \
M_CACHELINE_ALIGN(align1, atomic_uint); \
atomic_uint ConsoIdx; /* Can only increase until wrapping */ \
M_CACHELINE_ALIGN(align2, atomic_uint); \
M_F(name, _el_ct) *Tab; \
unsigned int size; \
} buffer_t[1]; \
\
typedef type M_F(name, _subtype_ct); \
typedef buffer_t M_F(name, _ct); \
/* Define the core functionnalities of a MPMC queue */
#define M_QU3UE_MPMC_DEF_CORE(name, type, policy, oplist, buffer_t) \
M_INLINE bool \
M_F(name, _push)(buffer_t table, type const x) \
{ \
M_QU3UE_MPMC_CONTRACT(table); \
unsigned int idx = atomic_load_explicit(&table->ProdIdx, \
memory_order_relaxed); \
const unsigned int i = idx & (table->size -1); \
const unsigned int seq = atomic_load_explicit(&table->Tab[i].seq, \
memory_order_acquire); \
if (M_UNLIKELY (2*(idx - table->size) + 1 != seq)) { \
/* Buffer full (or unlikely preemption). Can not push */ \
return false; \
} \
if (M_UNLIKELY (!atomic_compare_exchange_strong_explicit(&table->ProdIdx, \
&idx, idx+1, memory_order_relaxed, memory_order_relaxed))) { \
/* Thread has been preempted by another one. */ \
return false; \
} \
/* If it is interrupted here, it may block all pop methods (not push) \
even if there is other threads that have pushed data later in the \
queue as all pop threads will try to enqueue this particular element \
but always fail. The won't try to enqueue other elements. \
As such, this queue is not strictly lock-free.*/ \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, table->Tab[i].x, x); \
} else { \
M_CALL_INIT_SET(oplist, table->Tab[i].x, x); \
} \
/* Finish transaction */ \
atomic_store_explicit(&table->Tab[i].seq, 2*idx, memory_order_release); \
M_QU3UE_MPMC_CONTRACT(table); \
return true; \
} \
\
M_INLINE bool \
M_F(name, _pop)(type *ptr, buffer_t table) \
{ \
M_QU3UE_MPMC_CONTRACT(table); \
M_ASSERT (ptr != NULL); \
unsigned int iC = atomic_load_explicit(&table->ConsoIdx, \
memory_order_relaxed); \
const unsigned int i = (iC & (table->size -1)); \
const unsigned int seq = atomic_load_explicit(&table->Tab[i].seq, \
memory_order_acquire); \
if (seq != 2 * iC) { \
/* Nothing in buffer to consumme (or unlikely preemption) */ \
return false; \
} \
if (M_UNLIKELY (!atomic_compare_exchange_strong_explicit(&table->ConsoIdx, \
&iC, iC+1, memory_order_relaxed, memory_order_relaxed))) { \
/* Thread has been preempted by another one */ \
return false; \
} \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, *ptr, table->Tab[i].x); \
} else { \
M_DO_INIT_MOVE (oplist, *ptr, table->Tab[i].x); \
} \
atomic_store_explicit(&table->Tab[i].seq, 2*iC + 1, memory_order_release); \
M_QU3UE_MPMC_CONTRACT(table); \
return true; \
} \
\
M_INLINE void \
M_F(name, _init)(buffer_t buffer, size_t size) \
{ \
M_ASSERT (buffer != NULL); \
M_ASSERT( M_POWEROF2_P(size)); \
M_ASSERT (0 < size && size <= UINT_MAX); \
M_ASSERT(((policy) & (M_BUFFER_STACK|M_BUFFER_THREAD_UNSAFE|M_BUFFER_PUSH_OVERWRITE)) == 0); \
atomic_init(&buffer->ProdIdx, (unsigned int) size); \
atomic_init(&buffer->ConsoIdx, (unsigned int) size); \
buffer->size = (unsigned int) size; \
buffer->Tab = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, size); \
if (M_UNLIKELY_NOMEM (buffer->Tab == NULL)) { \
M_MEMORY_FULL (size*sizeof(M_F(name, _el_ct) )); \
return; \
} \
for(unsigned int j = 0; j < (unsigned int) size; j++) { \
atomic_init(&buffer->Tab[j].seq, 2*j+1U); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_INIT(oplist, buffer->Tab[j].x); \
} \
} \
M_QU3UE_MPMC_CONTRACT(buffer); \
} \
\
M_INLINE void \
M_F(name, _clear)(buffer_t buffer) \
{ \
M_QU3UE_MPMC_CONTRACT(buffer); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(unsigned int j = 0; j < buffer->size; j++) { \
M_CALL_CLEAR(oplist, buffer->Tab[j].x); \
} \
} else { \
unsigned int iP = atomic_load_explicit(&buffer->ProdIdx, memory_order_relaxed); \
unsigned int i = iP & (buffer->size -1); \
unsigned int iC = atomic_load_explicit(&buffer->ConsoIdx, memory_order_relaxed); \
unsigned int j = iC & (buffer->size -1); \
while (i != j) { \
M_CALL_CLEAR(oplist, buffer->Tab[j].x); \
j++; \
if (j >= buffer->size) \
j = 0; \
} \
} \
M_CALL_FREE(oplist, buffer->Tab); \
buffer->Tab = NULL; /* safer */ \
buffer->size = 3; \
} \
\
M_INLINE size_t \
M_F(name, _size)(buffer_t table) \
{ \
M_QU3UE_MPMC_CONTRACT(table); \
const unsigned int iC = atomic_load_explicit(&table->ConsoIdx, memory_order_relaxed); \
const unsigned int iP = atomic_load_explicit(&table->ProdIdx, memory_order_acquire); \
/* We return an approximation as we can't read both iC & iP atomically \
As we read producer index after consummer index, \
and they are atomic variables without reordering \
producer index is always greater or equal than consumer index \
(or on overflow occurs, in which case as we compute with modulo \
arithmetic, the right result is computed). \
We may return a result which is greater than the size of the queue \
if the function is interrupted a long time between reading iC & \
iP. the function is not protected against it. \
*/ \
return iP-iC; \
} \
\
M_INLINE size_t \
M_F(name, _capacity)(buffer_t v) \
{ \
M_QU3UE_MPMC_CONTRACT(v); \
return v->size; \
} \
\
M_INLINE bool \
M_F(name, _empty_p)(buffer_t v) \
{ \
return M_F(name, _size) (v) == 0; \
} \
\
M_INLINE bool \
M_F(name, _full_p)(buffer_t v) \
{ \
return M_F(name, _size)(v) >= v->size; \
} \
/********************************** INTERNAL *********************************/
/* Definition of a a QUEUE for Single Producer / Single Consummer
for high bandwidth scenario:
* wait-free,
* quite fast
* no blocking calls.
* only queue (no stack)
* size of queue is always a power of 2
* no overwriting.
*/
/* Deferred evaluation for the definition,
so that all arguments are evaluated before further expansion */
#define M_QU3UE_SPSC_DEF_P1(arg) M_ID( M_QU3UE_SPSC_DEF_P2 arg )
/* Validate the value oplist before going further */
#define M_QU3UE_SPSC_DEF_P2(name, type, policy, oplist, buffer_t) \
M_IF_OPLIST(oplist)(M_QU3UE_SPSC_DEF_P3, M_QU3UE_SPSC_DEF_FAILURE)(name, type, policy, oplist, buffer_t)
/* Stop processing with a compilation failure */
#define M_QU3UE_SPSC_DEF_FAILURE(name, type, policy, oplist, buffer_t) \
M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(QUEUE_SPSC_DEF): the given argument is not a valid oplist: " M_AS_STR(oplist))
#ifdef NDEBUG
#define M_QU3UE_SPSC_CONTRACT(table) do { } while (0)
#else
#define M_QU3UE_SPSC_CONTRACT(table) do { \
M_ASSERT (table != NULL); \
unsigned int _r = atomic_load(&table->consoIdx); \
unsigned int _w = atomic_load(&table->prodIdx); \
/* Due to overflow we don't have M_ASSERT (_r <= _w); */ \
_r = atomic_load(&table->consoIdx); \
M_ASSERT (_r > _w || _w-_r <= table->size); \
M_ASSERT (M_POWEROF2_P(table->size)); \
} while (0)
#endif
/* Define the buffer type SPSC using atomics and its functions.
- name: main prefix of the container
- type: type of an element of the buffer
- policy: the policy of the buffer
- oplist: the oplist of the type of an element of the buffer
- buffer_t: name of the buffer
*/
#define M_QU3UE_SPSC_DEF_P3(name, type, policy, oplist, buffer_t) \
M_QU3UE_SPSC_DEF_TYPE(name, type, policy, oplist, buffer_t) \
M_CHECK_COMPATIBLE_OPLIST(name, 1, type, oplist) \
M_QU3UE_SPSC_DEF_CORE(name, type, policy, oplist, buffer_t) \
M_EMPLACE_QUEUE_DEF(name, buffer_t, M_F(name, _emplace), oplist, M_EMPLACE_QUEUE_GENE)
/* Define the type of a SPSC queue */
#define M_QU3UE_SPSC_DEF_TYPE(name, type, policy, oplist, buffer_t) \
\
/* Single producer / Single consummer \
So, only one thread will write in this table. The other thread \
will only read. As such, there is no concurrent write, and no \
need to align the structure for best performance. */ \
typedef struct M_F(name, _el_s) { \
type x; \
} M_F(name, _el_ct); \
\
typedef struct M_F(name, _s) { \
atomic_uint consoIdx; /* Can only increase until overflow */ \
unsigned int size; \
M_F(name, _el_ct) *Tab; \
M_CACHELINE_ALIGN(align, atomic_uint, size_t, M_F(name, _el_ct) *); \
atomic_uint prodIdx; /* Can only increase until overflow */ \
} buffer_t[1]; \
\
typedef type M_F(name, _subtype_ct); \
typedef buffer_t M_F(name, _ct); \
/* Define the core functionnalities of a SPSC queue */
#define M_QU3UE_SPSC_DEF_CORE(name, type, policy, oplist, buffer_t) \
\
M_INLINE bool \
M_F(name, _push)(buffer_t table, type const x) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_relaxed); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_acquire); \
if (w-r >= table->size) \
return false; \
unsigned int i = w & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, table->Tab[i].x, x); \
} else { \
M_CALL_INIT_SET(oplist, table->Tab[i].x, x); \
} \
atomic_store_explicit(&table->prodIdx, w+1, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
return true; \
} \
\
M_INLINE bool \
M_F(name, _push_move)(buffer_t table, type *x) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_relaxed); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_acquire); \
if (w-r >= table->size) \
return false; \
unsigned int i = w & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_DO_MOVE(oplist, table->Tab[i].x, *x); \
} else { \
M_DO_INIT_MOVE(oplist, table->Tab[i].x, *x); \
} \
atomic_store_explicit(&table->prodIdx, w+1, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
return true; \
} \
\
M_INLINE bool \
M_F(name, _pop)(type *ptr, buffer_t table) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
M_ASSERT (ptr != NULL); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_relaxed); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_acquire); \
if (w-r == 0) \
return false; \
unsigned int i = r & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, *ptr , table->Tab[i].x); \
} else { \
M_DO_INIT_MOVE (oplist, *ptr, table->Tab[i].x); \
} \
atomic_store_explicit(&table->consoIdx, r+1, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
return true; \
} \
\
M_INLINE unsigned \
M_F(name, _push_bulk)(buffer_t table, unsigned n, type const x[]) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
M_ASSERT (x != NULL); \
M_ASSERT (n <= table->size); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_relaxed); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_acquire); \
unsigned int max = M_MIN(n, table->size - (w-r) ); \
if (max == 0) \
return 0; \
for(unsigned int k = 0; k < max; k++) { \
unsigned int i = (w+k) & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, table->Tab[i].x, x[k]); \
} else { \
M_CALL_INIT_SET(oplist, table->Tab[i].x, x[k]); \
} \
} \
atomic_store_explicit(&table->prodIdx, w+max, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
return max; \
} \
\
M_INLINE unsigned \
M_F(name, _pop_bulk)(unsigned int n, type ptr[], buffer_t table) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
M_ASSERT (ptr != NULL); \
M_ASSERT (n <= table->size); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_relaxed); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_acquire); \
if (w-r == 0) \
return 0; \
unsigned int max = M_MIN(w-r, n); \
for(unsigned int k = 0; k < max; k++) { \
unsigned int i = (r+k) & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, ptr[k], table->Tab[i].x); \
} else { \
M_DO_INIT_MOVE (oplist, ptr[k], table->Tab[i].x); \
} \
} \
atomic_store_explicit(&table->consoIdx, r+max, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
return max; \
} \
\
M_INLINE void \
M_F(name, _push_force)(buffer_t table, type const x) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_relaxed); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_acquire); \
/* If no place in queue, try to skip the last one */ \
while (w-r >= table->size) { \
bool b = atomic_compare_exchange_strong(&table->consoIdx, &r, r+1); \
r += b; \
} \
unsigned int i = w & (table->size -1); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
M_CALL_SET(oplist, table->Tab[i].x, x); \
} else { \
M_CALL_INIT_SET(oplist, table->Tab[i].x, x); \
} \
atomic_store_explicit(&table->prodIdx, w+1, memory_order_release); \
M_QU3UE_SPSC_CONTRACT(table); \
} \
\
M_INLINE size_t \
M_F(name, _size)(buffer_t table) \
{ \
M_QU3UE_SPSC_CONTRACT(table); \
unsigned int r = atomic_load_explicit(&table->consoIdx, \
memory_order_relaxed); \
unsigned int w = atomic_load_explicit(&table->prodIdx, \
memory_order_acquire); \
/* We return an approximation as we can't read both r & w atomically \
As we read producer index after consummer index, \
and they are atomic variables without reordering \
producer index is always greater or equal than consumer index \
(or on overflow occurs, in which case as we compute with modulo \
arithmetic, the right result is computed). \
We may return a result which is greater than the size of the queue \
if the function is interrupted a long time between reading the \
indexs. The function is not protected against it. \
*/ \
return w-r; \
} \
\
M_INLINE size_t \
M_F(name, _capacity)(buffer_t v) \
{ \
return v->size; \
} \
\
M_INLINE bool \
M_F(name, _empty_p)(buffer_t v) \
{ \
return M_F(name, _size) (v) == 0; \
} \
\
M_INLINE bool \
M_F(name, _full_p)(buffer_t v) \
{ \
return M_F(name, _size)(v) >= v->size; \
} \
\
M_INLINE void \
M_F(name, _init)(buffer_t buffer, size_t size) \
{ \
M_ASSERT (buffer != NULL); \
M_ASSERT( M_POWEROF2_P(size)); \
M_ASSERT (0 < size && size <= UINT_MAX); \
M_ASSERT(((policy) & (M_BUFFER_STACK|M_BUFFER_THREAD_UNSAFE|M_BUFFER_PUSH_OVERWRITE)) == 0); \
atomic_init(&buffer->prodIdx, (unsigned int) size); \
atomic_init(&buffer->consoIdx, (unsigned int) size); \
buffer->size = (unsigned int) size; \
buffer->Tab = M_CALL_REALLOC(oplist, M_F(name, _el_ct), NULL, size); \
if (M_UNLIKELY_NOMEM (buffer->Tab == NULL)) { \
M_MEMORY_FULL (size*sizeof(M_F(name, _el_ct) )); \
return; \
} \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(unsigned int j = 0; j < (unsigned int) size; j++) { \
M_CALL_INIT(oplist, buffer->Tab[j].x); \
} \
} \
M_QU3UE_SPSC_CONTRACT(buffer); \
} \
\
M_INLINE void \
M_F(name, _clear)(buffer_t buffer) \
{ \
M_QU3UE_SPSC_CONTRACT(buffer); \
if (!M_BUFF3R_POLICY_P((policy), M_BUFFER_PUSH_INIT_POP_MOVE)) { \
for(unsigned int j = 0; j < buffer->size; j++) { \
M_CALL_CLEAR(oplist, buffer->Tab[j].x); \
} \
} else { \
unsigned int iP = atomic_load_explicit(&buffer->prodIdx, memory_order_relaxed); \
unsigned int i = iP & (buffer->size -1); \
unsigned int iC = atomic_load_explicit(&buffer->consoIdx, memory_order_relaxed); \
unsigned int j = iC & (buffer->size -1); \
while (i != j) { \
M_CALL_CLEAR(oplist, buffer->Tab[j].x); \
j++; \
if (j >= buffer->size) \
j = 0; \
} \
} \
M_CALL_FREE(oplist, buffer->Tab); \
buffer->Tab = NULL; /* safer */ \
buffer->size = 3; \
} \
/********************************** INTERNAL *********************************/
/* Deferred evaluation for the definition,
so that all arguments are evaluated before further expansion */
#define M_BUFF3R_OPLIST_P1(arg) M_BUFF3R_OPLIST_P2 arg
/* Validation of the given oplist */
#define M_BUFF3R_OPLIST_P2(name, oplist) \
M_IF_OPLIST(oplist)(M_BUFF3R_OPLIST_P3, M_BUFF3R_OPLIST_FAILURE)(name, oplist)
/* Prepare a clean compilation failure */
#define M_BUFF3R_OPLIST_FAILURE(name, oplist) \
((M_LIB_ERROR(ARGUMENT_OF_BUFFER_OPLIST_IS_NOT_AN_OPLIST, name, oplist)))
/* OPLIST definition of a buffer */
#define M_BUFF3R_OPLIST_P3(name, oplist) \
(INIT(M_C3(m_buff3r_,name, _init)) \
,INIT_SET(M_F(name, _init_set)) \
,SET(M_F(name, _set)) \
,CLEAR(M_F(name, _clear)) \
,NAME(name) \
,TYPE(M_F(name,_ct)) \
,SUBTYPE(M_F(name, _subtype_ct)) \
,RESET(M_F(name,_reset)) \
,PUSH(M_F(name,_push)) \
,POP(M_F(name,_pop)) \
,OPLIST(oplist) \
,EMPTY_P(M_F(name, _empty_p)), \
,GET_SIZE(M_F(name, _size)) \
)
/********************************** INTERNAL *********************************/
#if M_USE_SMALL_NAME
#define BUFFER_DEF M_BUFFER_DEF
#define BUFFER_DEF_AS M_BUFFER_DEF_AS
#define BUFFER_OPLIST M_BUFFER_OPLIST
#define QUEUE_MPMC_DEF M_QUEUE_MPMC_DEF
#define QUEUE_MPMC_DEF_AS M_QUEUE_MPMC_DEF_AS
#define QUEUE_SPSC_DEF M_QUEUE_SPSC_DEF
#define QUEUE_SPSC_DEF_AS M_QUEUE_SPSC_DEF_AS
#define buffer_policy_e m_buffer_policy_e
#define BUFFER_QUEUE M_BUFFER_QUEUE
#define BUFFER_STACK M_BUFFER_STACK
#define BUFFER_BLOCKING_PUSH M_BUFFER_BLOCKING_PUSH
#define BUFFER_UNBLOCKING_PUSH M_BUFFER_UNBLOCKING_PUSH
#define BUFFER_BLOCKING_POP M_BUFFER_BLOCKING_POP
#define BUFFER_UNBLOCKING_POP M_BUFFER_UNBLOCKING_POP
#define BUFFER_BLOCKING M_BUFFER_BLOCKING
#define BUFFER_UNBLOCKING M_BUFFER_UNBLOCKING
#define BUFFER_THREAD_SAFE M_BUFFER_THREAD_SAFE
#define BUFFER_THREAD_UNSAFE M_BUFFER_THREAD_UNSAFE
#define BUFFER_PUSH_INIT_POP_MOVE M_BUFFER_PUSH_INIT_POP_MOVE
#define BUFFER_PUSH_OVERWRITE M_BUFFER_PUSH_OVERWRITE
#define BUFFER_DEFERRED_POP M_BUFFER_DEFERRED_POP
#endif
#endif