simply threadpool

This commit is contained in:
veejay
2023-10-11 04:13:26 +02:00
parent b0a2aa1cd8
commit d0e6f9c67e
2 changed files with 154 additions and 243 deletions


@@ -48,263 +48,57 @@
#include <veejaycore/vj-task.h>
#include <veejaycore/avcommon.h>
#include <stdatomic.h>
//@ job description
static vj_task_arg_t *vj_task_args[MAX_WORKERS];
//@ task
struct task
{
int8_t task_id;
void *data;
performer_job_routine handler;
char padding[64 - sizeof(int8_t) - sizeof(void*) - sizeof(performer_job_routine)];
};
//@ job structure
typedef struct {
performer_job_routine job;
void *arg;
} pjob_t;
//@ no dynamic, static allocation here.
static struct task running_tasks[MAX_WORKERS];
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tasks_completed;
static pthread_cond_t current_task;
struct task *tasks_ = NULL;
struct task *tail_task_= NULL;
static pthread_t p_threads[MAX_WORKERS];
static pthread_attr_t p_attr[MAX_WORKERS];
static pjob_t *job_list[MAX_WORKERS];
static uint8_t p_tasks[MAX_WORKERS];
static unsigned int n_cpu = 1;
static uint8_t numThreads = 0;
_Atomic static uint8_t total_tasks = 0;
_Atomic static uint8_t tasks_done[MAX_WORKERS];
_Atomic static int exitFlag = 0;
static struct sched_param param;
static int euid = 0;
static uint8_t task_get_workers();
typedef struct {
pjob_t *queue;
int queue_size;
int front;
int rear;
pthread_mutex_t lock;
pthread_cond_t task_available;
pthread_cond_t task_completed;
pthread_t threads[MAX_WORKERS];
int num_threads;
int pending;
atomic_int stop_flag;
} thread_pool_t;
static inline uint8_t task_is_work_done( )
static unsigned int n_cpu = 0;
static uint8_t numThreads = 0;
static thread_pool_t *task_pool = NULL;
static uint8_t task_get_workers()
{
const uint8_t n = task_get_workers();
uint8_t i;
uint8_t done = 0;
for( i = 0; i < n; i ++ ) {
done += atomic_load(&tasks_done[i]);
}
return (done == n ? 1 : 0);
return numThreads;
}
static void task_allocate()
int vj_task_get_num_cpus()
{
unsigned int i;
for( i = 0; i < MAX_WORKERS; i ++ ) {
job_list[i] = vj_calloc(sizeof(pjob_t));
vj_task_args[i] = vj_calloc(sizeof(vj_task_arg_t));
}
if( n_cpu > 0 )
return n_cpu;
long ret = sysconf( _SC_NPROCESSORS_ONLN );
if( ret <= 0)
n_cpu = 1;
else
n_cpu = (unsigned int) ret;
return n_cpu;
}
void task_destroy()
{
unsigned int i;
for( i = 0; i < MAX_WORKERS; i ++ ) {
free(job_list[i]);
free(vj_task_args[i]);
}
}
static void task_reset()
{
unsigned int i;
veejay_memset( &p_threads,0,sizeof(p_threads));
veejay_memset( &p_tasks,0,sizeof(p_tasks));
for( i = 0; i < MAX_WORKERS; i ++ ) {
veejay_memset( job_list[i],0, sizeof(pjob_t));
veejay_memset( vj_task_args[i],0, sizeof(vj_task_arg_t));
veejay_memset( &(running_tasks[i]), 0, sizeof(struct task));
atomic_store_explicit( &(tasks_done[i]), 0, memory_order_relaxed );
}
if( n_cpu > MAX_WORKERS ) {
n_cpu = MAX_WORKERS;
}
atomic_store_explicit( &total_tasks, 0, memory_order_relaxed );
numThreads = 0;
}
static void task_add(uint8_t task_no, performer_job_routine fp, void *data) {
struct task *qp_task = &(running_tasks[task_no]);
qp_task->task_id = task_no;
qp_task->handler = fp;
qp_task->data = data;
atomic_fetch_add(&total_tasks, 1);
}
static struct task *task_get()
{
if( atomic_load(&total_tasks) > 0 ) {
atomic_fetch_sub(&total_tasks,1);
return &(running_tasks[atomic_load(&total_tasks)]);
}
return NULL;
}
static void task_run(struct task *task, void *data, uint8_t id) {
(*task->handler)(data);
atomic_fetch_add(&tasks_done[id],1);
if (task_is_work_done()) {
pthread_mutex_lock(&queue_mutex);
pthread_cond_signal(&tasks_completed);
pthread_mutex_unlock(&queue_mutex);
}
}
static void *task_thread(void *data)
{
for( ;; )
{
struct task *t = NULL;
while( atomic_load(&total_tasks) == 0) {
if(atomic_load(&exitFlag)) {
pthread_exit(NULL);
return NULL;
}
pthread_mutex_lock(&queue_mutex);
pthread_cond_wait(&current_task, &queue_mutex );
pthread_mutex_unlock(&queue_mutex);
}
t = task_get();
if( t != NULL ) {
task_run( t, t->data, t->task_id );
}
}
return NULL;
}
static uint8_t task_get_workers()
{
return numThreads;
}
void task_init()
{
task_allocate();
int max_p = sched_get_priority_max( SCHED_FIFO );
int min_p = sched_get_priority_min( SCHED_FIFO );
max_p = (int) ( ((float) max_p) * 0.95f );
if( max_p < min_p )
max_p = min_p;
veejay_memset( &param, 0, sizeof(param));
euid = geteuid();
if( euid == 0 ) {
param.sched_priority = max_p;
}
}
unsigned int task_num_cpus()
{
return n_cpu;
}
int task_start(unsigned int max_workers)
{
uint8_t i;
if( max_workers >= MAX_WORKERS ) {
return 0;
}
atomic_store(&exitFlag, 0);
pthread_cond_init( &tasks_completed, NULL );
pthread_cond_init( &current_task, NULL );
for( i = 0 ; i < max_workers; i ++ ) {
if( pthread_create( &p_threads[i], (void*) &p_attr[i], task_thread, NULL ) )
{
veejay_msg(0, "%s: error starting thread %d/%d", __FUNCTION__,i,max_workers );
memset( &p_threads[i], 0, sizeof(pthread_t) );
return -1;
}
}
numThreads = max_workers;
return numThreads;
}
uint8_t num_threaded_tasks()
{
return numThreads;
}
void task_stop(unsigned int max_workers)
{
unsigned int i;
atomic_store_explicit(&exitFlag, 1, memory_order_relaxed);
pthread_mutex_lock(&queue_mutex);
pthread_cond_broadcast( &tasks_completed );
pthread_mutex_unlock(&queue_mutex);
for( i = 0; i < max_workers;i++ ) {
pthread_join( p_threads[i], (void*)&p_tasks[i] );
pthread_attr_destroy( &p_attr[i] );
}
pthread_cond_destroy( &current_task );
pthread_cond_destroy( &tasks_completed );
task_reset();
}
void performer_job( uint8_t n )
{
uint8_t i;
for( i = 0; i < n; i ++ ) {
atomic_store(&tasks_done[i],0);
}
pthread_mutex_lock(&queue_mutex);
for( i = 0; i < n; i ++ ) {
pjob_t *slot = job_list[i];
task_add( i, slot->job, slot->arg );
}
pthread_cond_broadcast( &current_task );
pthread_mutex_unlock(&queue_mutex);
while(!task_is_work_done()) {
pthread_mutex_lock(&queue_mutex);
pthread_cond_wait( &tasks_completed, &queue_mutex );
pthread_mutex_unlock(&queue_mutex);
}
}
void vj_task_set_float( float f ){
uint8_t i;
uint8_t n = task_get_workers();
@@ -320,11 +114,6 @@ void vj_task_set_param( int val , int idx ){
vj_task_args[i]->iparams[idx] = val;
}
uint8_t vj_task_available()
{
return ( task_get_workers() > 1 ? 1 : 0);
}
void vj_task_set_ptr( void *ptr )
{
uint8_t i;
@@ -459,12 +248,105 @@ void vj_task_set_from_frame( VJFrame *in )
}
}
static void* task_worker(void *arg) {
thread_pool_t *pool = (thread_pool_t *)arg;
while (1) {
pthread_mutex_lock(&pool->lock);
while (pool->front == pool->rear && !atomic_load(&pool->stop_flag)) {
pthread_cond_wait(&pool->task_available, &pool->lock);
}
if (atomic_load(&pool->stop_flag)) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
pjob_t task = pool->queue[pool->front];
pool->front = (pool->front + 1) % pool->queue_size;
pool->pending++;
// wake a producer that may be blocked on a full queue
pthread_cond_signal(&pool->task_completed);
pthread_mutex_unlock(&pool->lock);
task.job(task.arg);
// only count the job as done once it has actually run
pthread_mutex_lock(&pool->lock);
pool->pending--;
pthread_cond_signal(&pool->task_completed);
pthread_mutex_unlock(&pool->lock);
}
return NULL;
}
static thread_pool_t* create_thread_pool(int num_threads, int queue_size) {
thread_pool_t *pool = (thread_pool_t *)malloc(sizeof(thread_pool_t));
pool->queue = (pjob_t *)malloc(sizeof(pjob_t) * queue_size);
pool->queue_size = queue_size;
pool->front = 0;
pool->rear = 0;
pool->pending = 0;
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->task_available, NULL);
pthread_cond_init(&pool->task_completed, NULL);
atomic_init(&pool->stop_flag, 0);
// never create more threads than the threads[] array can hold
if (num_threads > MAX_WORKERS)
num_threads = MAX_WORKERS;
pool->num_threads = num_threads;
for (int i = 0; i < num_threads; ++i) {
pthread_create(&(pool->threads[i]), NULL, task_worker, (void *)pool);
}
veejay_msg(VEEJAY_MSG_INFO, "Created thread pool with %d workers" , num_threads );
return pool;
}
static void submit_job(thread_pool_t *pool, performer_job_routine job, void *arg) {
pthread_mutex_lock(&pool->lock);
while ((pool->rear + 1) % pool->queue_size == pool->front) {
pthread_cond_wait(&pool->task_completed, &pool->lock);
}
pool->queue[pool->rear].job = job;
pool->queue[pool->rear].arg = arg;
pool->rear = (pool->rear + 1) % pool->queue_size;
pthread_cond_signal(&pool->task_available);
pthread_mutex_unlock(&pool->lock);
}
static void wait_all_tasks_completed(thread_pool_t *pool) {
pthread_mutex_lock(&pool->lock);
while (pool->front != pool->rear || pool->pending > 0) {
pthread_cond_wait(&pool->task_completed, &pool->lock);
}
pthread_mutex_unlock(&pool->lock);
}
static void destroy_thread_pool(thread_pool_t *pool) {
pthread_mutex_lock(&pool->lock);
atomic_store(&pool->stop_flag, 1);
pthread_cond_broadcast(&pool->task_available);
pthread_mutex_unlock(&pool->lock);
for (int i = 0; i < pool->num_threads; ++i) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->task_available);
pthread_cond_destroy(&pool->task_completed);
free(pool->queue);
free(pool);
}
int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func )
{
const uint8_t n = task_get_workers();
if( n <= 1 ) {
return 0;
}
return 0;
}
vj_task_arg_t **f = (vj_task_arg_t**) vj_task_args;
uint8_t i,j;
@@ -503,11 +385,41 @@ int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int
}
for( i = 0; i < n; i ++ ) {
job_list[i]->job = func;
job_list[i]->arg = f[i];
submit_job( task_pool, func, f[i] );
}
performer_job( n );
wait_all_tasks_completed(task_pool);
return 1;
}
void task_destroy()
{
destroy_thread_pool( task_pool );
}
void task_init()
{
vj_task_get_num_cpus();
numThreads = n_cpu/2;
char *task_cfg = getenv( "VEEJAY_MULTITHREAD_TASKS" );
int requested = numThreads;
if( task_cfg != NULL ) {
requested = atoi( task_cfg );
}
if( requested < 0 || requested > MAX_WORKERS ) {
numThreads = (n_cpu > MAX_WORKERS ? MAX_WORKERS : n_cpu);
veejay_msg(VEEJAY_MSG_ERROR, "Invalid value for VEEJAY_MULTITHREAD_TASKS, using default (%d)", numThreads);
}
else {
numThreads = (uint8_t) requested;
}
int i;
for( i = 0; i < MAX_WORKERS ; i ++ ) {
vj_task_args[i] = vj_calloc( sizeof(vj_task_arg_t) );
}
task_pool = create_thread_pool( n_cpu, numThreads );
}
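For readers who want to study the queueing discipline in isolation: below is a minimal, self-contained sketch of the same bounded-queue pattern the new code uses — one mutex, a task_available condition for the workers, a task_completed condition for the producer, a circular buffer, plus a pending counter so that "queue empty" is not mistaken for "all jobs finished". Everything named demo_* and the fixed sizes are illustrative only; this is not veejay code.

// Minimal standalone sketch of the bounded-queue pool pattern (build with -pthread).
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define DEMO_THREADS 4
#define DEMO_QUEUE   8

typedef struct {
    void (*job)(void *arg);
    void *arg;
} demo_job_t;

typedef struct {
    demo_job_t queue[DEMO_QUEUE];
    int front, rear, pending;
    pthread_mutex_t lock;
    pthread_cond_t task_available, task_completed;
    pthread_t threads[DEMO_THREADS];
    atomic_int stop_flag;
} demo_pool_t;

static void *demo_worker(void *p)
{
    demo_pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        // sleep until there is work or the pool is being torn down
        while (pool->front == pool->rear && !atomic_load(&pool->stop_flag))
            pthread_cond_wait(&pool->task_available, &pool->lock);
        if (atomic_load(&pool->stop_flag) && pool->front == pool->rear) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        demo_job_t job = pool->queue[pool->front];
        pool->front = (pool->front + 1) % DEMO_QUEUE;
        pool->pending++;
        pthread_cond_signal(&pool->task_completed);   // a queue slot freed up
        pthread_mutex_unlock(&pool->lock);

        job.job(job.arg);                             // run outside the lock

        pthread_mutex_lock(&pool->lock);
        pool->pending--;
        pthread_cond_signal(&pool->task_completed);   // the job really finished
        pthread_mutex_unlock(&pool->lock);
    }
}

static void demo_submit(demo_pool_t *pool, void (*job)(void *), void *arg)
{
    pthread_mutex_lock(&pool->lock);
    while ((pool->rear + 1) % DEMO_QUEUE == pool->front)   // queue full
        pthread_cond_wait(&pool->task_completed, &pool->lock);
    pool->queue[pool->rear] = (demo_job_t){ job, arg };
    pool->rear = (pool->rear + 1) % DEMO_QUEUE;
    pthread_cond_signal(&pool->task_available);
    pthread_mutex_unlock(&pool->lock);
}

static void demo_wait(demo_pool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    while (pool->front != pool->rear || pool->pending > 0)
        pthread_cond_wait(&pool->task_completed, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
}

static void demo_print(void *arg) { printf("job %d\n", *(int *)arg); }

int main(void)
{
    demo_pool_t pool = { .front = 0, .rear = 0, .pending = 0 };
    pthread_mutex_init(&pool.lock, NULL);
    pthread_cond_init(&pool.task_available, NULL);
    pthread_cond_init(&pool.task_completed, NULL);
    atomic_init(&pool.stop_flag, 0);
    for (int i = 0; i < DEMO_THREADS; i++)
        pthread_create(&pool.threads[i], NULL, demo_worker, &pool);

    int ids[16];
    for (int i = 0; i < 16; i++) { ids[i] = i; demo_submit(&pool, demo_print, &ids[i]); }
    demo_wait(&pool);                          // returns only after all 16 jobs ran

    atomic_store(&pool.stop_flag, 1);          // shut down: wake and join the workers
    pthread_mutex_lock(&pool.lock);
    pthread_cond_broadcast(&pool.task_available);
    pthread_mutex_unlock(&pool.lock);
    for (int i = 0; i < DEMO_THREADS; i++)
        pthread_join(pool.threads[i], NULL);
    return 0;
}

The pending counter is what lets the wait call cover jobs that have already been dequeued but are still running on a worker; signalling task_completed both when a slot frees up and when a job finishes lets one condition variable serve the full-queue producer and the final wait.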

View File


@@ -47,7 +47,6 @@ typedef struct
} vj_task_arg_t;
int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func );
uint8_t vj_task_available();
void vj_task_set_float( float f );
void vj_task_set_ptr( void *ptr );
void vj_task_set_to_frame( VJFrame *frame, int pos, int job );
@@ -58,7 +57,7 @@ int task_start(unsigned int max_workers);
void task_stop(unsigned int max_workers);
void task_init();
void task_destroy();
unsigned int task_num_cpus();
int vj_task_get_num_cpus();
void vj_task_set_overlap( int val );
void performer_job( uint8_t job_num );
uint8_t num_threaded_tasks();
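Assuming the public API stays as declared above, a hypothetical caller would drive the new pool roughly as follows. The standalone main(), the chosen value "4", and the printf are illustrative only; the program has to be linked against veejaycore, and any veejay headers needed for VJFrame are assumed to be on the include path.

// Hypothetical caller-side sketch of the configuration path added here (not part of this commit).
#include <stdio.h>
#include <stdlib.h>                      // setenv()
#include <veejaycore/vj-task.h>

int main(void)
{
    // Optional: override the default number of parallel tasks (n_cpu / 2).
    // task_init() reads the variable once, so it must be set beforehand.
    setenv("VEEJAY_MULTITHREAD_TASKS", "4", 1);

    task_init();                         // detect CPUs, spawn the thread pool
    printf("detected %d cpu(s)\n", vj_task_get_num_cpus());

    /* ... normal veejay operation; the performer calls vj_task_run() ... */

    task_destroy();                      // join the workers and free the pool
    return 0;
}

Because VEEJAY_MULTITHREAD_TASKS is only consulted inside task_init(), setting it after initialization has no effect.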