diff --git a/veejay-current/veejay-core/veejaycore/vj-task.c b/veejay-current/veejay-core/veejaycore/vj-task.c
index 9234300c..cb8f5201 100644
--- a/veejay-current/veejay-core/veejaycore/vj-task.c
+++ b/veejay-current/veejay-core/veejaycore/vj-task.c
@@ -48,263 +48,57 @@
 #include
 #include
 #include
+
 //@ job description
 static vj_task_arg_t *vj_task_args[MAX_WORKERS];
-//@ task
-struct task
-{
-    int8_t task_id;
-    void *data;
-    performer_job_routine handler;
-    char padding[64 - sizeof(int8_t) - sizeof(void*) - sizeof(performer_job_routine)];
-};
-
 //@ job structure
 typedef struct
 {
     performer_job_routine job;
     void *arg;
 } pjob_t;
 
-//@ no dynamic, static allocation here.
-static struct task running_tasks[MAX_WORKERS];
-static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t tasks_completed;
-static pthread_cond_t current_task;
 struct task *tasks_ = NULL;
 struct task *tail_task_= NULL;
-static pthread_t p_threads[MAX_WORKERS];
-static pthread_attr_t p_attr[MAX_WORKERS];
-static pjob_t *job_list[MAX_WORKERS];
-static uint8_t p_tasks[MAX_WORKERS];
-static unsigned int n_cpu = 1;
-static uint8_t numThreads = 0;
 _Atomic static uint8_t total_tasks = 0;
 _Atomic static uint8_t tasks_done[MAX_WORKERS];
 _Atomic static int exitFlag = 0;
-static struct sched_param param;
-static int euid = 0;
-static uint8_t task_get_workers();
+typedef struct {
+    pjob_t *queue;
+    int queue_size;
+    int front;
+    int rear;
+    pthread_mutex_t lock;
+    pthread_cond_t task_available;
+    pthread_cond_t task_completed;
+    pthread_t threads[MAX_WORKERS];
+    atomic_int stop_flag;
+} thread_pool_t;
 
-static inline uint8_t task_is_work_done( )
+
+static unsigned int n_cpu = 0;
+static uint8_t numThreads = 0;
+static thread_pool_t *task_pool = NULL;
+
+static uint8_t task_get_workers()
 {
-    const uint8_t n = task_get_workers();
-    uint8_t i;
-    uint8_t done = 0;
-    for( i = 0; i < n; i ++ ) {
-        done += atomic_load(&tasks_done[i]);
-    }
-    return (done == n ? 1 : 0);
+    return numThreads;
 }
 
-static void task_allocate()
+
+int vj_task_get_num_cpus()
 {
-    unsigned int i;
-    for( i = 0; i < MAX_WORKERS; i ++ ) {
-        job_list[i] = vj_calloc(sizeof(pjob_t));
-        vj_task_args[i] = vj_calloc(sizeof(vj_task_arg_t));
-    }
+    if( n_cpu > 0 )
+        return n_cpu;
     long ret = sysconf( _SC_NPROCESSORS_ONLN );
     if( ret <= 0)
         n_cpu = 1;
     else
         n_cpu = (unsigned int) ret;
-}
-void task_destroy()
-{
-    unsigned int i;
-    for( i = 0; i < MAX_WORKERS; i ++ ) {
-        free(job_list[i]);
-        free(vj_task_args[i]);
-    }
-}
-
-static void task_reset()
-{
-    unsigned int i;
-
-    veejay_memset( &p_threads,0,sizeof(p_threads));
-    veejay_memset( &p_tasks,0,sizeof(p_tasks));
-    for( i = 0; i < MAX_WORKERS; i ++ ) {
-        veejay_memset( job_list[i],0, sizeof(pjob_t));
-        veejay_memset( vj_task_args[i],0, sizeof(vj_task_arg_t));
-        veejay_memset( &(running_tasks[i]), 0, sizeof(struct task));
-
-        atomic_store_explicit( &(tasks_done[i]), 0, memory_order_relaxed );
+    if( n_cpu > MAX_WORKERS ) {
+        n_cpu = MAX_WORKERS;
     }
-    atomic_store_explicit( &total_tasks, 0, memory_order_relaxed );
-
-    numThreads = 0;
-}
-
-
-static void task_add(uint8_t task_no, performer_job_routine fp, void *data) {
-    struct task *qp_task = &(running_tasks[task_no]);
-
-    qp_task->task_id = task_no;
-    qp_task->handler = fp;
-    qp_task->data = data;
-
-    atomic_fetch_add(&total_tasks, 1);
-}
-
-static struct task *task_get()
-{
-    if( atomic_load(&total_tasks) > 0 ) {
-        atomic_fetch_sub(&total_tasks,1);
-        return &(running_tasks[atomic_load(&total_tasks)]);
-    }
-    return NULL;
-}
-
-static void task_run(struct task *task, void *data, uint8_t id) {
-
-    (*task->handler)(data);
-
-    atomic_fetch_add(&tasks_done[id],1);
-
-    if (task_is_work_done()) {
-        pthread_mutex_lock(&queue_mutex);
-        pthread_cond_signal(&tasks_completed);
-        pthread_mutex_unlock(&queue_mutex);
-    }
-}
-
-
-
-static void *task_thread(void *data)
-{
-    for( ;; )
-    {
-        struct task *t = NULL;
-
-        while( atomic_load(&total_tasks) == 0) {
-            if(atomic_load(&exitFlag)) {
-                pthread_exit(NULL);
-                return NULL;
-            }
-
-            pthread_mutex_lock(&queue_mutex);
-            pthread_cond_wait(&current_task, &queue_mutex );
-            pthread_mutex_unlock(&queue_mutex);
-
-        }
-
-        t = task_get();
-
-        if( t != NULL ) {
-            task_run( t, t->data, t->task_id );
-        }
-    }
-
-    return NULL;
-}
-
-static uint8_t task_get_workers()
-{
-    return numThreads;
-}
-
-void task_init()
-{
-    task_allocate();
-
-    int max_p = sched_get_priority_max( SCHED_FIFO );
-    int min_p = sched_get_priority_min( SCHED_FIFO );
-
-    max_p = (int) ( ((float) max_p) * 0.95f );
-    if( max_p < min_p )
-        max_p = min_p;
-
-    veejay_memset( &param, 0, sizeof(param));
-    euid = geteuid();
-    if( euid == 0 ) {
-        param.sched_priority = max_p;
-    }
-}
-
-unsigned int task_num_cpus()
-{
     return n_cpu;
 }
-int task_start(unsigned int max_workers)
-{
-    uint8_t i;
-    if( max_workers >= MAX_WORKERS ) {
-        return 0;
-    }
-
-    atomic_store(&exitFlag, 0);
-
-    pthread_cond_init( &tasks_completed, NULL );
-    pthread_cond_init( &current_task, NULL );
-
-    for( i = 0 ; i < max_workers; i ++ ) {
-        if( pthread_create( &p_threads[i], (void*) &p_attr[i], task_thread, NULL ) )
-        {
-            veejay_msg(0, "%s: error starting thread %d/%d", __FUNCTION__,i,max_workers );
-            memset( &p_threads[i], 0, sizeof(pthread_t) );
-            return -1;
-        }
-    }
-
-    numThreads = max_workers;
-
-    return numThreads;
-}
-
-uint8_t num_threaded_tasks()
-{
-    return numThreads;
-}
-
-void task_stop(unsigned int max_workers)
-{
-    unsigned int i;
-
-    atomic_store_explicit(&exitFlag, 1, memory_order_relaxed);
-
-    pthread_mutex_lock(&queue_mutex);
-    pthread_cond_broadcast( &tasks_completed );
-    pthread_mutex_unlock(&queue_mutex);
-
-    for( i = 0; i < max_workers;i++ ) {
-        pthread_join( p_threads[i], (void*)&p_tasks[i] );
-        pthread_attr_destroy( &p_attr[i] );
-    }
-
-    pthread_cond_destroy( &current_task );
-    pthread_cond_destroy( &tasks_completed );
-
-    task_reset();
-}
-
-void performer_job( uint8_t n )
-{
-    uint8_t i;
-    for( i = 0; i < n; i ++ ) {
-        atomic_store(&tasks_done[i],0);
-    }
-
-    pthread_mutex_lock(&queue_mutex);
-    for( i = 0; i < n; i ++ ) {
-        pjob_t *slot = job_list[i];
-        task_add( i, slot->job, slot->arg );
-    }
-    pthread_cond_broadcast( &current_task );
-    pthread_mutex_unlock(&queue_mutex);
-
-    while(!task_is_work_done()) {
-        pthread_mutex_lock(&queue_mutex);
-        pthread_cond_wait( &tasks_completed, &queue_mutex );
-        pthread_mutex_unlock(&queue_mutex);
-    }
-}
-
 void vj_task_set_float( float f ){
     uint8_t i;
     uint8_t n = task_get_workers();
@@ -320,11 +114,6 @@ void vj_task_set_param( int val , int idx ){
         vj_task_args[i]->iparams[idx] = val;
 }
 
-uint8_t vj_task_available()
-{
-    return ( task_get_workers() > 1 ? 1 : 0);
-}
-
 void vj_task_set_ptr( void *ptr )
 {
     uint8_t i;
@@ -459,12 +248,105 @@ void vj_task_set_from_frame( VJFrame *in )
     }
 }
 
+
+static void* task_worker(void *arg) {
+    thread_pool_t *pool = (thread_pool_t *)arg;
+
+    while (1) {
+        pthread_mutex_lock(&pool->lock);
+
+        while (pool->front == pool->rear && !atomic_load(&pool->stop_flag)) {
+            pthread_cond_wait(&pool->task_available, &pool->lock);
+        }
+
+        if (atomic_load(&pool->stop_flag)) {
+            pthread_mutex_unlock(&pool->lock);
+            pthread_exit(NULL);
+        }
+
+        pjob_t task = pool->queue[pool->front];
+        pool->front = (pool->front + 1) % pool->queue_size;
+        pthread_cond_signal(&pool->task_completed);
+
+        pthread_mutex_unlock(&pool->lock);
+
+        task.job(task.arg);
+
+        task.job = NULL;
+        task.arg = NULL;
+    }
+
+    return NULL;
+}
+
+static thread_pool_t* create_thread_pool(int num_threads, int queue_size) {
+    thread_pool_t *pool = (thread_pool_t *)malloc(sizeof(thread_pool_t));
+    pool->queue = (pjob_t *)malloc(sizeof(pjob_t) * queue_size);
+    pool->queue_size = queue_size;
+    pool->front = 0;
+    pool->rear = 0;
+    pthread_mutex_init(&pool->lock, NULL);
+    pthread_cond_init(&pool->task_available, NULL);
+    pthread_cond_init(&pool->task_completed, NULL);
+    atomic_init(&pool->stop_flag, 0);
+
+    for (int i = 0; i < num_threads; ++i) {
+        pthread_create(&(pool->threads[i]), NULL, task_worker, (void *)pool);
+    }
+
+    veejay_msg(VEEJAY_MSG_INFO, "Created thread pool with %d workers", num_threads );
+
+    return pool;
+}
+
+static void submit_job(thread_pool_t *pool, performer_job_routine job, void *arg) {
+    pthread_mutex_lock(&pool->lock);
+
+    while ((pool->rear + 1) % pool->queue_size == pool->front) {
+        pthread_cond_wait(&pool->task_completed, &pool->lock);
+    }
+
+    pool->queue[pool->rear].job = job;
+    pool->queue[pool->rear].arg = arg;
+    pool->rear = (pool->rear + 1) % pool->queue_size;
+
+    pthread_cond_signal(&pool->task_available);
+    pthread_mutex_unlock(&pool->lock);
+}
+
+static void wait_all_tasks_completed(thread_pool_t *pool) {
+    pthread_mutex_lock(&pool->lock);
+    while (pool->front != pool->rear) {
+        pthread_cond_wait(&pool->task_completed, &pool->lock);
+    }
+    pthread_mutex_unlock(&pool->lock);
+}
+
+static void destroy_thread_pool(thread_pool_t *pool) {
+    pthread_mutex_lock(&pool->lock);
+    atomic_store(&pool->stop_flag, 1);
+    pthread_cond_broadcast(&pool->task_available);
+    pthread_mutex_unlock(&pool->lock);
+
+    for (unsigned int i = 0; i < n_cpu; ++i) {
+        pthread_join(pool->threads[i], NULL);
+    }
+
+    pthread_mutex_destroy(&pool->lock);
+    pthread_cond_destroy(&pool->task_available);
+    pthread_cond_destroy(&pool->task_completed);
+
+    free(pool->queue);
+    free(pool);
+}
+
+
 int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func )
 {
     const uint8_t n = task_get_workers();
     if( n <= 1 ) {
-        return 0;
-    }
+        return 0;
+    }
 
     vj_task_arg_t **f = (vj_task_arg_t**) vj_task_args;
     uint8_t i,j;
@@ -503,11 +385,41 @@ int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int
     }
 
     for( i = 0; i < n; i ++ ) {
-        job_list[i]->job = func;
-        job_list[i]->arg = f[i];
+        submit_job( task_pool, func, f[i] );
     }
 
-    performer_job( n );
+    wait_all_tasks_completed(task_pool);
+
+    return 1;
 }
+
+
+void task_destroy()
+{
+    destroy_thread_pool( task_pool );
+}
+
+void task_init()
+{
+    vj_task_get_num_cpus();
+
+    numThreads = n_cpu/2;
+
+    char *task_cfg = getenv( "VEEJAY_MULTITHREAD_TASKS" );
+    if( task_cfg != NULL ) {
+        numThreads = atoi( task_cfg );
+    }
+
+    if( numThreads < 0 || numThreads > MAX_WORKERS ) {
+        numThreads = n_cpu;
+        veejay_msg(VEEJAY_MSG_ERROR, "Invalid value for VEEJAY_MULTITHREAD_TASKS, using default (%d)",numThreads);
+    }
+
+    int i;
+    for( i = 0; i < MAX_WORKERS ; i ++ ) {
+        vj_task_args[i] = vj_calloc( sizeof(vj_task_arg_t) );
+    }
+
+    task_pool = create_thread_pool( n_cpu, numThreads );
+}
diff --git a/veejay-current/veejay-core/veejaycore/vj-task.h b/veejay-current/veejay-core/veejaycore/vj-task.h
index a9c02e24..7239eb11 100644
--- a/veejay-current/veejay-core/veejaycore/vj-task.h
+++ b/veejay-current/veejay-core/veejaycore/vj-task.h
@@ -47,7 +47,6 @@ typedef struct
 } vj_task_arg_t;
 
 int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func );
-uint8_t vj_task_available();
 void vj_task_set_float( float f );
 void vj_task_set_ptr( void *ptr );
 void vj_task_set_to_frame( VJFrame *frame, int pos, int job );
@@ -58,7 +57,7 @@ int task_start(unsigned int max_workers);
 void task_stop(unsigned int max_workers);
 void task_init();
 void task_destroy();
-unsigned int task_num_cpus();
+int vj_task_get_num_cpus();
 void vj_task_set_overlap( int val );
 void performer_job( uint8_t job_num );
 uint8_t num_threaded_tasks();
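Below is a minimal, self-contained sketch of the bounded circular-buffer job queue pattern this patch introduces: the queue is empty when front == rear and full when (rear + 1) % size == front, submitters signal one condition variable and workers signal a second one on every dequeue. All names here (demo_pool_t, demo_submit, demo_drain, DEMO_WORKERS, DEMO_QUEUE) are hypothetical and exist only for illustration; they are not part of veejay. As with the patch's wait_all_tasks_completed(), the drain step only waits for the queue to become empty, not for jobs already handed to a worker to return. Build with: cc -std=c11 -pthread demo.c

/* Hypothetical demo of the circular-buffer job queue pattern used above;
 * names are illustrative only and not taken from the veejay code base. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define DEMO_WORKERS 4
#define DEMO_QUEUE   8   /* usable capacity is DEMO_QUEUE - 1 slots */

typedef struct {
    void (*job)(void *);
    void *arg;
} demo_job_t;

typedef struct {
    demo_job_t queue[DEMO_QUEUE];
    int front, rear;                /* queue is empty when front == rear */
    pthread_mutex_t lock;
    pthread_cond_t task_available;  /* signalled by demo_submit */
    pthread_cond_t task_taken;      /* signalled by workers on dequeue */
    pthread_t threads[DEMO_WORKERS];
    atomic_int stop;
} demo_pool_t;

static void *demo_worker(void *p)
{
    demo_pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->front == pool->rear && !atomic_load(&pool->stop))
            pthread_cond_wait(&pool->task_available, &pool->lock);
        if (atomic_load(&pool->stop)) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        demo_job_t job = pool->queue[pool->front];
        pool->front = (pool->front + 1) % DEMO_QUEUE;
        pthread_cond_signal(&pool->task_taken);
        pthread_mutex_unlock(&pool->lock);
        job.job(job.arg);           /* run the job outside the lock */
    }
}

static void demo_submit(demo_pool_t *pool, void (*job)(void *), void *arg)
{
    pthread_mutex_lock(&pool->lock);
    while ((pool->rear + 1) % DEMO_QUEUE == pool->front)   /* queue full */
        pthread_cond_wait(&pool->task_taken, &pool->lock);
    pool->queue[pool->rear].job = job;
    pool->queue[pool->rear].arg = arg;
    pool->rear = (pool->rear + 1) % DEMO_QUEUE;
    pthread_cond_signal(&pool->task_available);
    pthread_mutex_unlock(&pool->lock);
}

static void demo_drain(demo_pool_t *pool)
{
    /* Waits until the queue is empty; in-flight jobs may still be running. */
    pthread_mutex_lock(&pool->lock);
    while (pool->front != pool->rear)
        pthread_cond_wait(&pool->task_taken, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
}

static void demo_print(void *arg)
{
    printf("job %d\n", *(int *)arg);
}

int main(void)
{
    demo_pool_t pool = { .front = 0, .rear = 0 };
    pthread_mutex_init(&pool.lock, NULL);
    pthread_cond_init(&pool.task_available, NULL);
    pthread_cond_init(&pool.task_taken, NULL);
    atomic_init(&pool.stop, 0);
    for (int i = 0; i < DEMO_WORKERS; ++i)
        pthread_create(&pool.threads[i], NULL, demo_worker, &pool);

    int ids[6] = { 0, 1, 2, 3, 4, 5 };
    for (int i = 0; i < 6; ++i)
        demo_submit(&pool, demo_print, &ids[i]);
    demo_drain(&pool);

    /* shutdown: raise the stop flag, wake every worker, then join them */
    pthread_mutex_lock(&pool.lock);
    atomic_store(&pool.stop, 1);
    pthread_cond_broadcast(&pool.task_available);
    pthread_mutex_unlock(&pool.lock);
    for (int i = 0; i < DEMO_WORKERS; ++i)
        pthread_join(pool.threads[i], NULL);
    return 0;
}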