
/*
* M*LIB / WORKER - Extra worker interface
*
* Copyright (c) 2017-2023, Patrick Pelissier
* All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* + Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* + Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MSTARLIB_WORKER_H
#define MSTARLIB_WORKER_H
/* The User Code can define M_USE_WORKER to 0 to disable the use of workers.
The macros / functions are then defined to only use one core.
By default, the behavior is to use workers.
*/
#ifndef M_USE_WORKER
# define M_USE_WORKER 1
#endif
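/* Example (a usage sketch): disable the workers for this translation unit,
so that all spawn macros / functions run their work orders inline:
#define M_USE_WORKER 0
#include "m-worker.h"
*/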
#if M_USE_WORKER
#include "m-atomic.h"
#include "m-buffer.h"
#include "m-thread.h"
/* Include the system headers needed to detect how many cores are available in the system */
#if defined(_WIN32)
# include <sysinfoapi.h>
#elif (defined(__APPLE__) && defined(__MACH__)) \
|| defined(__DragonFly__) || defined(__FreeBSD__) \
|| defined(__NetBSD__) || defined(__OpenBSD__)
# include <sys/param.h>
# include <sys/sysctl.h>
# define M_USE_WORKER_SYSCTL 1
#else
# include <unistd.h>
#endif
/* Support for CLANG blocks, since CLANG doesn't support nested functions.
M-WORKER uses its 'blocks' extension instead, but blocks are not
compatible with plain function pointers.
So you need to compile with "-fblocks" and link with "-lBlocksRuntime"
if you use clang & want to use the MACRO version.
In C++, it will use lambda functions (and std::function) instead
(pre-C++11 compilers are not supported).
Otherwise it goes with nested functions (GCC) for the MACRO version.
This behavior can be overridden by User Code by defining the
following macros to 1 or 0:
* M_USE_WORKER_CPP_FUNCTION
* M_USE_WORKER_CLANG_BLOCK
*/
#if defined(__cplusplus) && !defined(M_USE_WORKER_CPP_FUNCTION)
# define M_USE_WORKER_CPP_FUNCTION 1
# include <functional>
# include <new>    /* for the placement new used by M_WORK3R_CPP_INIT */
#elif defined(__has_extension) && !defined(M_USE_WORKER_CLANG_BLOCK)
# if __has_extension(blocks)
# define M_USE_WORKER_CLANG_BLOCK 1
# endif
#endif
#ifndef M_USE_WORKER_CLANG_BLOCK
# define M_USE_WORKER_CLANG_BLOCK 0
#endif
#ifndef M_USE_WORKER_CPP_FUNCTION
# define M_USE_WORKER_CPP_FUNCTION 0
#endif
/* Check that both options are not selected at the same time.
Note: they are not really incompatible, but if we use C++ we shall go with
lambdas directly (there is no need to support blocks). */
#if M_USE_WORKER_CLANG_BLOCK && M_USE_WORKER_CPP_FUNCTION
# error M_USE_WORKER_CPP_FUNCTION and M_USE_WORKER_CLANG_BLOCK are both defined. This is not supported.
#endif
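/* Example (a usage sketch): force the Clang 'blocks' backend explicitly.
Compile with "-fblocks" and link with "-lBlocksRuntime":
#define M_USE_WORKER_CLANG_BLOCK 1
#include "m-worker.h"
*/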
M_BEGIN_PROTECTED_CODE
/* Definition of a work order */
typedef struct m_work3r_order_s {
struct m_worker_sync_s *block; // Reference to the shared Synchronization block
void * data; // The work order data
void (*func) (void *data); // The work order function (for GCC)
#if M_USE_WORKER_CLANG_BLOCK
void (^blockFunc)(void *data); // The work order function (block for clang)
#endif
#if M_USE_WORKER_CPP_FUNCTION
std::function<void(void*)> function; // The work order function (for C++)
#endif
} m_work3r_order_ct;
/* Define the macros needed to initialize an order:
* a MACRO used to send an empty order that stops a thread,
* a MACRO that completes the unused fields of an order.
*/
#if M_USE_WORKER_CLANG_BLOCK || M_USE_WORKER_CPP_FUNCTION
# define M_WORK3R_EMPTY_ORDER { NULL, NULL, NULL, NULL }
# define M_WORK3R_EXTRA_ORDER , NULL
#else
# define M_WORK3R_EMPTY_ORDER { NULL, NULL, NULL }
# define M_WORK3R_EXTRA_ORDER
#endif
/* As C++ uses std::function, M_POD_OPLIST
is not sufficient for the initialization of the structure.
So let's use the C++ constructor, destructor and copy constructor */
#if M_USE_WORKER_CPP_FUNCTION
# define M_WORK3R_CPP_INIT(x) (new (&(x)) m_work3r_order_ct())
# define M_WORK3R_CPP_INIT_SET(x, y) (new (&(x)) m_work3r_order_ct(y))
# define M_WORK3R_CPP_SET(x, y) ((x) = (y))
# define M_WORK3R_CPP_CLEAR(x) ((&(x))->~m_work3r_order_ct())
# define M_WORK3R_CPP_INIT_MOVE(x,y) (new (&(x)) m_work3r_order_ct(y), ((&(y))->~m_work3r_order_ct()))
# define M_WORK3R_OPLIST \
(INIT(M_WORK3R_CPP_INIT), INIT_SET(M_WORK3R_CPP_INIT_SET), \
SET(M_WORK3R_CPP_SET), CLEAR(M_WORK3R_CPP_CLEAR), INIT_MOVE(M_WORK3R_CPP_INIT_MOVE) )
#else
# define M_WORK3R_OPLIST M_POD_OPLIST
#endif
/* Definition of the identity of a worker thread */
typedef struct m_work3r_thread_s {
m_thread_t id;
} m_work3r_thread_ct;
/* Definition of the queue that will record the work orders */
BUFFER_DEF(m_work3r_queue, m_work3r_order_ct, 0,
BUFFER_QUEUE|BUFFER_UNBLOCKING_PUSH|BUFFER_BLOCKING_POP|BUFFER_THREAD_SAFE|BUFFER_DEFERRED_POP, M_WORK3R_OPLIST)
/* Definition of the global pool of workers */
typedef struct m_worker_s {
/* The work order queue */
m_work3r_queue_t queue_g;
/* The table of available workers */
m_work3r_thread_ct *worker;
/* Number of workers in the table */
unsigned int numWorker_g;
/* The global reset function */
void (*resetFunc_g)(void);
/* The global clear function */
void (*clearFunc_g)(void);
m_mutex_t lock;
m_cond_t a_thread_ends; // EVENT: A worker has ended
} m_worker_t[1];
/* Definition of the synchronization point for workers */
typedef struct m_worker_sync_s {
atomic_int num_spawn; // Number of workers spawned through this synchronization point
atomic_int num_terminated_spawn; // Number of terminated spawned workers
struct m_worker_s *worker; // Reference to the pool of workers
} m_worker_sync_t[1];
/* Extend m_worker_spawn by defining a specialized spawn function
with the given typed arguments.
Generate the needed encapsulation for the user.
USAGE: M_WORKER_SPAWN_DEF2(name, (field, type [, oplist]), ...) */
#define M_WORKER_SPAWN_DEF2(name, ...) \
M_BEGIN_PROTECTED_CODE \
M_WORK3R_SPAWN_EXTEND_P1( (name, M_MAP_C(M_WORK3R_SPAWN_EXTEND_P0, __VA_ARGS__) ) ) \
M_END_PROTECTED_CODE
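/* Example (a hypothetical sketch, assuming "m-string.h" is included so that
string_t registers its global oplist):
M_WORKER_SPAWN_DEF2(my_order, (name, string_t), (count, int, M_BASIC_OPLIST))
generates m_worker_spawn_my_order(block, callback, name, count), which
copies its arguments and runs 'callback' in a worker thread if one is free,
or inline otherwise. */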
/* Output a valid oplist with the given type.
input is (fieldname, type) or (fieldname, type, oplist)
Output shall be : M_OPEXTEND(M_GLOBAL_OPLIST_OR_DEF(type_or_oplist)(), TYPE(type)) / M_OPEXTEND(oplist, TYPE(type))
*/
#define M_WORK3R_SPAWN_EXTEND_P0(...) M_BY_NARGS(M_WORK3R_SPAWN_EXTEND_P0, M_ID __VA_ARGS__) __VA_ARGS__
#define M_WORK3R_SPAWN_EXTEND_P0__2(field, type) M_OPEXTEND(M_GLOBAL_OPLIST_OR_DEF(type)(), TYPE(type))
#define M_WORK3R_SPAWN_EXTEND_P0__3(field, type, oplist) M_IF_OPLIST(oplist)(M_WORK3R_SPAWN_EXTEND_P0__3_OK, M_WORK3R_SPAWN_EXTEND_P0__3_KO)(field, type, oplist)
#define M_WORK3R_SPAWN_EXTEND_P0__3_OK(field, type, oplist) M_OPEXTEND(oplist, TYPE(type))
#define M_WORK3R_SPAWN_EXTEND_P0__3_KO(field, type, oplist) \
M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, "(M_WORKER_SPAWN_EXTEND): the argument is not a valid oplist: " M_MAP(M_AS_STR, oplist))
/* Deferred evaluation for the definition,
so that all arguments are evaluated before further expansion */
#define M_WORK3R_SPAWN_EXTEND_P1(arg) M_ID( M_WORK3R_SPAWN_EXTEND_P2 arg )
/* Validate the oplist before going further */
#define M_WORK3R_SPAWN_EXTEND_P2(name, ...) \
M_IF(M_REDUCE(M_OPLIST_P, M_AND, __VA_ARGS__)) \
(M_WORK3R_SPAWN_EXTEND_P3, M_WORK3R_SPAWN_EXTEND_FAILURE)(name, __VA_ARGS__)
/* Stop processing with a compilation failure */
#define M_WORK3R_SPAWN_EXTEND_FAILURE(name, ...) \
M_STATIC_FAILURE(M_LIB_NOT_AN_OPLIST, \
"(M_WORKER_SPAWN_EXTEND): at least one of the given argument is not a valid oplist: " \
M_MAP(M_AS_STR, __VA_ARGS__))
/* Define the extension of spawn */
#define M_WORK3R_SPAWN_EXTEND_P3(name, ...) \
M_WORK3R_SPAWN_EXTEND_DEF_TYPE(name, __VA_ARGS__) \
M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK(name, __VA_ARGS__) \
M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE(name, __VA_ARGS__) \
/* Define the type */
#define M_WORK3R_SPAWN_EXTEND_DEF_TYPE(name, ...) \
typedef void (*M_C3(m_worker_,name, _callback_ct))(M_MAP_C(M_WORK3R_SPAWN_EXTEND_DEF_TYPE_TYPE, __VA_ARGS__)); \
struct M_C3(m_worker_, name, _s){ \
M_C3(m_worker_, name, _callback_ct) callback; \
M_MAP3(M_WORK3R_SPAWN_EXTEND_DEF_TYPE_FIELD, data, __VA_ARGS__) \
};
#define M_WORK3R_SPAWN_EXTEND_DEF_TYPE_FIELD(data, num, oplist) \
M_GET_TYPE oplist M_C(field, num);
#define M_WORK3R_SPAWN_EXTEND_DEF_TYPE_TYPE(oplist) \
M_GET_TYPE oplist
/* Define the callback */
#define M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK(name, ...) \
M_INLINE void \
M_C3(m_work3r_, name, _clear)(struct M_C3(m_worker_, name, _s) *p) \
{ \
M_MAP3(M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK_CLEAR, data, __VA_ARGS__) \
/* TODO: Overload */ \
M_MEMORY_DEL(p); \
} \
\
M_INLINE void \
M_C3(m_work3r_, name, _callback)(void *data) \
{ \
struct M_C3(m_worker_, name, _s) *p = (struct M_C3(m_worker_, name, _s) *) data; \
(*p->callback)( \
M_MAP3_C(M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK_FIELD, data, __VA_ARGS__) \
); \
M_C3(m_work3r_, name, _clear)(p); \
}
#define M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK_FIELD(data, num, oplist) \
p->M_C(field, num)
#define M_WORK3R_SPAWN_EXTEND_DEF_CALLBACK_CLEAR(data, num, oplist) \
M_CALL_CLEAR(oplist, p->M_C(field, num)) ;
/* Define the emplace-like spawn method */
#define M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE(name, ...) \
M_INLINE void \
M_C(m_worker_spawn_, name)(m_worker_sync_t block, M_C3(m_worker_, name, _callback_ct) callback, \
M_MAP3_C(M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD, data, __VA_ARGS__) \
) \
{ \
if (!m_work3r_queue_full_p(block->worker->queue_g)) { \
struct M_C3(m_worker_, name, _s) *p = M_MEMORY_ALLOC ( struct M_C3(m_worker_, name, _s)); \
if (M_UNLIKELY_NOMEM(p == NULL)) { \
M_MEMORY_FULL(sizeof (struct M_C3(m_worker_, name, _s))); \
} \
p->callback = callback; \
M_MAP3(M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD_COPY, data, __VA_ARGS__) \
const m_work3r_order_ct w = { block, p, M_C3(m_work3r_, name, _callback) M_WORK3R_EXTRA_ORDER }; \
if (m_work3r_queue_push (block->worker->queue_g, w) == true) { \
atomic_fetch_add (&block->num_spawn, 1); \
return; \
} \
/* No worker available now. We will call the function ourselves, */ \
/* but first clear the data allocated for the order */ \
M_C3(m_work3r_, name, _clear)(p); \
} \
/* No worker available. Call the function ourselves */ \
(*callback) ( \
M_MAP3_C(M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD_ALONE, data, __VA_ARGS__) \
); \
}
#define M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD(data, num, oplist) \
M_GET_TYPE oplist M_C(param, num)
#define M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD_COPY(data, num, oplist) \
M_CALL_INIT_SET(oplist, p-> M_C(field, num), M_C(param, num) );
#define M_WORK3R_SPAWN_EXTEND_DEF_EMPLACE_FIELD_ALONE(data, num, oplist) \
M_C(param, num)
/* Return the number of CPU cores available in the system.
Works for WINDOWS, MACOS, *BSD, LINUX.
*/
M_INLINE int
m_work3r_get_cpu_count(void)
{
#if defined(_WIN32)
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
M_ASSERT(sysinfo.dwNumberOfProcessors <= INT_MAX);
return (int) sysinfo.dwNumberOfProcessors;
#elif defined(M_USE_WORKER_SYSCTL)
int nm[2];
int count = 0;
size_t len = sizeof (count);
nm[0] = CTL_HW;
nm[1] = HW_NCPU;
sysctl(nm, 2, &count, &len, NULL, 0);
return M_MAX(1, count);
#elif defined (_SC_NPROCESSORS_ONLN)
return (int) sysconf(_SC_NPROCESSORS_ONLN);
#elif defined (_SC_NPROCESSORS_CONF)
return (int) sysconf(_SC_NPROCESSORS_CONF);
#else
return 1;
#endif
}
// (INTERNAL) Debug support for workers (change '#if 1' to '#if 0' to enable the traces)
#if 1
#define M_WORK3R_DEBUG(...) (void) 0
#else
#define M_WORK3R_DEBUG(...) printf(__VA_ARGS__)
#endif
/* Execute the registered work order **synchronously** */
M_INLINE void
m_work3r_exec(m_work3r_order_ct *w)
{
M_ASSERT (w!= NULL && w->block != NULL);
M_WORK3R_DEBUG ("Starting thread with data %p\n", w->data);
#if M_USE_WORKER_CLANG_BLOCK
M_WORK3R_DEBUG ("Running %s f=%p b=%p\n", (w->func == NULL) ? "Blocks" : "Function", w->func, w->blockFunc);
if (w->func == NULL)
w->blockFunc(w->data);
else
#endif
#if M_USE_WORKER_CPP_FUNCTION
M_WORK3R_DEBUG ("Running %s f=%p b=%p\n", (w->function == NULL) ? "Lambda" : "Function", w->func, w->blockFunc);
if (w->function)
w->function(w->data);
else
#endif
w->func(w->data);
/* Increment the number of terminated work orders for the synchronization point */
atomic_fetch_add (&w->block->num_terminated_spawn, 1);
}
/* The worker thread main loop */
M_INLINE void
m_work3r_thread(void *arg)
{
// Get back the given argument
struct m_worker_s *g = M_ASSIGN_CAST(struct m_worker_s *, arg);
while (true) {
m_work3r_order_ct w;
// If needed, reset the global state of the worker
if (g->resetFunc_g != NULL) {
g->resetFunc_g();
}
// Waiting for data
M_WORK3R_DEBUG ("Waiting for data (queue: %lu / %lu)\n", m_work3r_queue_size(g->queue_g), m_work3r_queue_capacity(g->queue_g));
m_work3r_queue_pop(&w, g->queue_g);
// We received a work order.
// Note that the work order is still present in the queue,
// preventing further work orders from being pushed until this one finishes
// If a stop request is received, terminate the thread
if (w.block == NULL) break;
// Execute the work order
m_work3r_exec(&w);
// Consume the work order fully from the queue
m_work3r_queue_pop_release(g->queue_g);
// Signal that a worker has finished.
m_mutex_lock(g->lock);
m_cond_broadcast(g->a_thread_ends);
m_mutex_unlock(g->lock);
}
// If needed, clear global state of the thread
if (g->clearFunc_g != NULL) {
g->clearFunc_g();
}
}
/* Initialization of the worker module (constructor)
Input:
@numWorker: number of workers to create (0=autodetect, -1=2*autodetect)
@extraQueue: number of extra work orders that can be queued if all workers are busy
@resetFunc: function to reset the state of a worker between work orders (or NULL if none)
@clearFunc: function to clear the state of a worker before terminating (or NULL if none)
*/
M_INLINE void
m_worker_init(m_worker_t g, int numWorker, unsigned int extraQueue, void (*resetFunc)(void), void (*clearFunc)(void))
{
M_ASSERT (numWorker >= -1);
// Auto compute number of workers if the argument is 0
if (numWorker <= 0)
numWorker = (1 + (numWorker == -1))*m_work3r_get_cpu_count()-1;
M_WORK3R_DEBUG ("Starting queue with: %d\n", numWorker + extraQueue);
// Initialization
// numWorker can still be 0 if it is a single core cpu (no worker available)
M_ASSERT(numWorker >= 0);
size_t numWorker_st = (size_t) numWorker;
g->worker = M_MEMORY_REALLOC(m_work3r_thread_ct, NULL, numWorker_st);
if (M_UNLIKELY_NOMEM (g->worker == NULL)) {
M_MEMORY_FULL(sizeof (m_work3r_thread_ct) * numWorker_st);
return;
}
m_work3r_queue_init(g->queue_g, numWorker_st + extraQueue);
g->numWorker_g = (unsigned int) numWorker_st;
g->resetFunc_g = resetFunc;
g->clearFunc_g = clearFunc;
m_mutex_init(g->lock);
m_cond_init(g->a_thread_ends);
// Create & start the workers
for(size_t i = 0; i < numWorker_st; i++) {
m_thread_create(g->worker[i].id, m_work3r_thread, M_ASSIGN_CAST(void*, g));
}
}
/* Initialization of the worker module (constructor)
Provide default values for the arguments.
Input:
@numWorker: number of workers to create (0=autodetect, -1=2*autodetect)
@extraQueue: number of extra work orders that can be queued if all workers are busy
@resetFunc: function to reset the state of a worker between work orders (optional)
@clearFunc: function to clear the state of a worker before terminating (optional)
*/
#define m_worker_init(...) m_worker_init(M_DEFAULT_ARGS(5, (0, 0, NULL, NULL), __VA_ARGS__))
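/* Example (a usage sketch):
m_worker_t pool;
m_worker_init(pool); // defaults: autodetect cores, no extra queue, no callbacks
// or explicitly: m_worker_init(pool, 4, 16, NULL, NULL);
...
m_worker_clear(pool); // join the worker threads & release the resources
*/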
/* Clear of the worker module (destructor) */
M_INLINE void
m_worker_clear(m_worker_t g)
{
M_ASSERT (m_work3r_queue_empty_p (g->queue_g));
// Push the terminate order on the queue
for(unsigned int i = 0; i < g->numWorker_g; i++) {
m_work3r_order_ct w = M_WORK3R_EMPTY_ORDER;
// Normally all worker threads shall be waiting at this
// stage, so the pushes won't block as the queue is empty.
// But for robustness, let's wait if needed.
m_work3r_queue_push_blocking (g->queue_g, w, true);
}
// Wait for the termination of the threads
for(unsigned int i = 0; i < g->numWorker_g; i++) {
m_thread_join(g->worker[i].id);
}
// Clear memory
M_MEMORY_FREE(g->worker);
m_mutex_clear(g->lock);
m_cond_clear(g->a_thread_ends);
m_work3r_queue_clear(g->queue_g);
}
/* Start a new collaboration between workers of pool 'g'
by defining the synchronization point 'block' */
M_INLINE void
m_worker_start(m_worker_sync_t block, m_worker_t g)
{
atomic_init (&block->num_spawn, 0);
atomic_init (&block->num_terminated_spawn, 0);
block->worker = g;
}
/* Spawn the given work order to a worker if possible,
or do it ourselves if no worker is available.
The synchronization point is defined by 'block'.
The work order is composed of the function 'func' and its 'data'.
*/
M_INLINE void
m_worker_spawn(m_worker_sync_t block, void (*func)(void *data), void *data)
{
const m_work3r_order_ct w = { block, data, func M_WORK3R_EXTRA_ORDER };
if (M_UNLIKELY (!m_work3r_queue_full_p(block->worker->queue_g))
&& m_work3r_queue_push (block->worker->queue_g, w) == true) {
M_WORK3R_DEBUG ("Sending data to thread: %p (block: %d / %d)\n", data, block->num_spawn, block->num_terminated_spawn);
atomic_fetch_add (&block->num_spawn, 1);
return;
}
M_WORK3R_DEBUG ("Running data ourself: %p\n", data);
/* No worker available. Call the function ourself */
(*func) (data);
}
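/* Example (a sketch assuming a user-provided 'do_job' function):
static void do_job(void *data) { ... }
...
m_worker_sync_t sync;
m_worker_start(sync, pool);
m_worker_spawn(sync, do_job, &job1);
m_worker_spawn(sync, do_job, &job2); // may run inline if the queue is full
m_worker_sync(sync); // blocks until both orders are done
*/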
#if M_USE_WORKER_CLANG_BLOCK
/* Spawn the given work order to a worker if possible,
or do it ourselves if no worker is available */
M_INLINE void
m_work3r_spawn_block(m_worker_sync_t block, void (^func)(void *data), void *data)
{
const m_work3r_order_ct w = { block, data, NULL, func };
if (M_UNLIKELY (!m_work3r_queue_full_p(block->worker->queue_g))
&& m_work3r_queue_push (block->worker->queue_g, w) == true) {
M_WORK3R_DEBUG ("Sending data to thread as block: %p (block: %d / %d)\n", data, block->num_spawn, block->num_terminated_spawn);
atomic_fetch_add (&block->num_spawn, 1);
return;
}
M_WORK3R_DEBUG ("Running data ourself as block: %p\n", data);
/* No worker available. Call the function ourself */
func (data);
}
#endif
#if M_USE_WORKER_CPP_FUNCTION
/* Spawn the given work order to a worker if possible,
or do it ourselves if no worker is available */
M_INLINE void
m_work3r_spawn_function(m_worker_sync_t block, std::function<void(void *data)> func, void *data)
{
const m_work3r_order_ct w = { block, data, NULL, func };
if (M_UNLIKELY (!m_work3r_queue_full_p(block->worker->queue_g))
&& m_work3r_queue_push (block->worker->queue_g, w) == true) {
M_WORK3R_DEBUG ("Sending data to thread as block: %p (block: %d / %d)\n", data, block->num_spawn, block->num_terminated_spawn);
atomic_fetch_add (&block->num_spawn, 1);
return;
}
M_WORK3R_DEBUG ("Running data ourself as block: %p\n", data);
/* No worker available. Call the function ourself */
func (data);
}
#endif
/* Test if all work orders of the given synchronization point are finished */
M_INLINE bool
m_worker_sync_p(m_worker_sync_t block)
{
/* If the number of spawns is greater than the number
of terminated spawns, some spawns are still working,
so the caller shall wait for their termination. */
return (atomic_load(&block->num_spawn) == atomic_load (&block->num_terminated_spawn));
}
/* Wait for all work orders of the given synchronization point to be finished */
M_INLINE void
m_worker_sync(m_worker_sync_t block)
{
M_WORK3R_DEBUG ("Waiting for thread terminasion.\n");
// Fast case: all workers have finished
if (m_worker_sync_p(block)) return;
// Slow case: perform a locked wait to put this thread to waiting state
m_mutex_lock(block->worker->lock);
while (!m_worker_sync_p(block)) {
m_cond_wait(block->worker->a_thread_ends, block->worker->lock);
}
m_mutex_unlock(block->worker->lock);
}
/* Flush the queue: execute ourselves any work orders that remain in it. */
M_INLINE void
m_worker_flush(m_worker_t g)
{
m_work3r_order_ct w;
while (m_work3r_queue_pop_blocking (&w, g->queue_g, false) == true) {
m_work3r_exec(&w);
m_work3r_queue_pop_release(g->queue_g);
}
}
/* Return the number of threads that can perform work orders
(the workers plus the calling thread) */
M_INLINE size_t
m_worker_count(m_worker_t g)
{
return g->numWorker_g + 1;
}
/* Spawn the 'core' block computation into another thread if
a worker thread is available. Compute it in the current thread otherwise.
'block' shall be the initialised synchronisation block shared by all threads.
'input' is the list of input variables of the 'core' block within "( )".
'output' is the list of output variables of the 'core' block within "( )".
Output variables are only available after a synchronisation point.
TODO: Support oplists for input & output parameters
*/
#if M_USE_WORKER_CLANG_BLOCK
#define M_WORKER_SPAWN(_block, _input, _core, _output) \
M_WORK3R_DEF_DATA(_input, _output) \
M_WORK3R_DEF_SUBBLOCK(_input, _output, _core) \
m_work3r_spawn_block ((_block), M_WORK3R_SPAWN_SUBFUNC_NAME, &M_WORK3R_SPAWN_DATA_NAME)
#elif M_USE_WORKER_CPP_FUNCTION
// TODO: Explicit pass all arguments by reference.
#define M_WORKER_SPAWN(_block, _input, _core, _output) \
m_work3r_spawn_function ((_block), [&](void *param) {(void)param ; _core } , NULL)
#else
#define M_WORKER_SPAWN(_block, _input, _core, _output) \
M_WORK3R_DEF_DATA(_input, _output) \
M_WORK3R_DEF_SUBFUNC(_input, _output, _core) \
m_worker_spawn ((_block), M_WORK3R_SPAWN_SUBFUNC_NAME, &M_WORK3R_SPAWN_DATA_NAME)
#endif
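/* Example (a usage sketch):
int a = 2, b = 3, out = 0;
M_WORKER_SPAWN(sync, (a, b), { out = a + b; }, (out));
m_worker_sync(sync); // 'out' shall only be read after this synchronization
*/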
#define M_WORK3R_SPAWN_STRUCT_NAME M_C(m_work3r_data_s_, __LINE__)
#define M_WORK3R_SPAWN_DATA_NAME M_C(m_work3r_data_, __LINE__)
#define M_WORK3R_SPAWN_SUBFUNC_NAME M_C(m_work3r_subfunc_, __LINE__)
#define M_WORK3R_DEF_DATA(_input, _output) \
struct M_WORK3R_SPAWN_STRUCT_NAME { \
M_WORK3R_DEF_DATA_INPUT _input \
M_IF_EMPTY _output ( , M_WORK3R_DEF_DATA_OUTPUT _output) \
} M_WORK3R_SPAWN_DATA_NAME = { \
M_WORK3R_INIT_DATA_INPUT _input \
M_IF_EMPTY _output (, M_WORK3R_INIT_DATA_OUTPUT _output) \
};
#define M_WORK3R_DEF_SINGLE_INPUT(var) __typeof__(var) var;
#define M_WORK3R_DEF_DATA_INPUT(...) \
M_MAP(M_WORK3R_DEF_SINGLE_INPUT, __VA_ARGS__)
#define M_WORK3R_DEF_SINGLE_OUTPUT(var) \
__typeof__(var) *M_C(var, _ptr);
#define M_WORK3R_DEF_DATA_OUTPUT(...) \
M_MAP(M_WORK3R_DEF_SINGLE_OUTPUT, __VA_ARGS__)
#define M_WORK3R_INIT_SINGLE_INPUT(var) \
.var = var,
#define M_WORK3R_INIT_DATA_INPUT(...) \
M_MAP(M_WORK3R_INIT_SINGLE_INPUT, __VA_ARGS__)
#define M_WORK3R_INIT_SINGLE_OUTPUT(var) \
.M_C(var, _ptr) = &var,
#define M_WORK3R_INIT_DATA_OUTPUT(...) \
M_MAP(M_WORK3R_INIT_SINGLE_OUTPUT, __VA_ARGS__)
#define M_WORK3R_DEF_SUBFUNC(_input, _output, _core) \
__extension__ auto void M_WORK3R_SPAWN_SUBFUNC_NAME(void *) ; \
__extension__ void M_WORK3R_SPAWN_SUBFUNC_NAME(void *_data) \
{ \
struct M_WORK3R_SPAWN_STRUCT_NAME *_s_data = _data ; \
M_WORK3R_INIT_LOCAL_INPUT _input \
M_IF_EMPTY _output ( , M_WORK3R_INIT_LOCAL_OUTPUT _output) \
do { _core } while (0); \
M_IF_EMPTY _output ( , M_WORK3R_PROPAGATE_LOCAL_OUTPUT _output) \
};
#define M_WORK3R_DEF_SUBBLOCK(_input, _output, _core) \
void (^M_WORK3R_SPAWN_SUBFUNC_NAME) (void *) = ^ void (void * _data) \
{ \
struct M_WORK3R_SPAWN_STRUCT_NAME *_s_data = _data ; \
M_WORK3R_INIT_LOCAL_INPUT _input \
M_IF_EMPTY _output ( , M_WORK3R_INIT_LOCAL_OUTPUT _output) \
do { _core } while (0); \
M_IF_EMPTY _output ( , M_WORK3R_PROPAGATE_LOCAL_OUTPUT _output) \
};
#define M_WORK3R_INIT_SINGLE_LOCAL_INPUT(var) \
__typeof__(var) var = _s_data->var;
#define M_WORK3R_INIT_LOCAL_INPUT(...) \
M_MAP(M_WORK3R_INIT_SINGLE_LOCAL_INPUT, __VA_ARGS__)
#define M_WORK3R_INIT_SINGLE_LOCAL_OUTPUT(var) \
__typeof__(var) var;
#define M_WORK3R_INIT_LOCAL_OUTPUT(...) \
M_MAP(M_WORK3R_INIT_SINGLE_LOCAL_OUTPUT, __VA_ARGS__)
#define M_WORK3R_PROPAGATE_SINGLE_OUTPUT(var) \
*(_s_data->M_C(var, _ptr)) = var;
#define M_WORK3R_PROPAGATE_LOCAL_OUTPUT(...) \
M_MAP(M_WORK3R_PROPAGATE_SINGLE_OUTPUT, __VA_ARGS__)
M_END_PROTECTED_CODE
#else /* M_USE_WORKER */
/* Define empty types and empty functions to not use any worker */
typedef struct m_worker_block_s {
int x;
} m_worker_sync_t[1];
typedef struct m_worker_s {
int x;
} m_worker_t[1];
#define m_worker_init(g, ...) do { (void) (g); } while (0)
#define m_worker_clear(g) do { (void) g; } while (0)
#define m_worker_start(b, w) do { (void) b; } while (0)
#define m_worker_spawn(b, f, d) do { f(d); } while (0)
#define m_worker_sync_p(b) true
#define m_worker_sync(b) do { (void) b; } while (0)
#define m_worker_count(w) 1
#define m_worker_flush(w) do { (void) w; } while (0)
#define M_WORKER_SPAWN(b, i, c, o) do { c } while (0)
#endif /* M_USE_WORKER */
#if M_USE_SMALL_NAME
#define worker_t m_worker_t
#define worker_sync_t m_worker_sync_t
#define worker_init m_worker_init
#define worker_clear m_worker_clear
#define worker_start m_worker_start
#define worker_spawn m_worker_spawn
#define worker_sync_p m_worker_sync_p
#define worker_sync m_worker_sync
#define worker_count m_worker_count
#define worker_flush m_worker_flush
#define WORKER_SPAWN M_WORKER_SPAWN
#endif
#endif