refactor parallel tasks

veejay
2023-10-09 23:47:12 +02:00
parent 91d582cde1
commit bcfbbfc2f8
3 changed files with 56 additions and 120 deletions

View File

@@ -61,6 +61,7 @@ typedef struct VJFrame_t
double timecode;
int yuv_fmt;
int range;
int offset;
} VJFrame __attribute__((aligned(16)));
typedef struct VJFrameInfo_t
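
For illustration: the new offset field gives each parallel worker a place to record where its slice starts inside the full frame, while the struct keeps its 16-byte alignment for SIMD-friendly access. A minimal sketch of how such an offset gets applied; the cut-down struct and plane names are illustrative, not the real VJFrame layout:

#include <stdint.h>

/* Sketch only: a frame slice described by a byte offset into the full planes. */
typedef struct {
    uint8_t *data[3];   /* Y/U/V plane pointers for this slice */
    int      len;       /* pixels in this slice */
    int      offset;    /* byte offset of this slice into the full plane */
} MiniFrame __attribute__((aligned(16)));

static void mini_frame_seek(MiniFrame *f, uint8_t *base[3])
{
    f->data[0] = base[0] + f->offset;
    f->data[1] = base[1] + f->offset;   /* assumes non-subsampled chroma;
                                           4:2:0/4:2:2 would scale the offset */
    f->data[2] = base[2] + f->offset;
}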

View File

@@ -42,16 +42,12 @@
#include <pthread.h>
#include <sched.h>
#include <veejaycore/defs.h>
//#include <libsubsample/subsample.h>
//#include <veejay/vj-misc.h>
//#include <veejay/vj-lib.h>
#include <libvjmem/vjmem.h>
#include <libvjmsg/vj-msg.h>
//#include <libvje/internal.h>
#include <libavutil/pixfmt.h>
#include <veejaycore/vj-task.h>
#include <veejaycore/avcommon.h>
#include <stdatomic.h>
//@ job description
static vj_task_arg_t *vj_task_args[MAX_WORKERS];
@@ -61,7 +57,7 @@ struct task
int8_t task_id;
void *data;
performer_job_routine handler;
struct task *next;
char padding[64 - sizeof(int8_t) - sizeof(void*) - sizeof(performer_job_routine)];
};
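
Padding each task out to 64 bytes keeps every worker's slot in running_tasks[] on its own cache line, so concurrent writes by different workers do not invalidate each other's lines (false sharing). Note the subtraction assumes the three members pack with no internal padding; on a typical LP64 target the compiler inserts 7 bytes after task_id, so the struct ends up larger than 64 bytes. A C11 _Alignas states the intent directly and is immune to that arithmetic; a sketch with a stand-in handler type:

#include <stdint.h>

typedef void *(*job_fn)(void *);   /* stand-in for performer_job_routine */

/* _Alignas(64) forces 64-byte alignment and rounds sizeof up to 64,
 * so an array of these gives each worker a private cache line. */
struct padded_task {
    _Alignas(64) int8_t task_id;
    void   *data;
    job_fn  handler;
};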
//@ job structure
@@ -74,35 +70,28 @@ typedef struct {
static struct task running_tasks[MAX_WORKERS];
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tasks_completed;
static pthread_cond_t current_task;
struct task *tasks_ = NULL;
struct task *tail_task_= NULL;
static pthread_t p_threads[MAX_WORKERS];
static pthread_attr_t p_attr[MAX_WORKERS];
static pjob_t *job_list[MAX_WORKERS];
static uint8_t p_tasks[MAX_WORKERS];
static pjob_t *job_list[MAX_WORKERS];
static unsigned int n_cpu = 1;
static uint8_t numThreads = 0;
static uint8_t total_tasks = 0;
static uint8_t tasks_done[MAX_WORKERS];
static int exitFlag = 0;
static struct task running_tasks[MAX_WORKERS];
_Atomic static uint8_t total_tasks = 0;
_Atomic static uint8_t tasks_done[MAX_WORKERS];
_Atomic static int exitFlag = 0;
static struct sched_param param;
static int euid = 0;
static uint8_t task_get_workers();
#define __lock() pthread_mutex_lock(&queue_mutex)
#define __unlock() pthread_mutex_unlock(&queue_mutex)
static inline uint8_t task_is_work_done( )
{
const uint8_t n = task_get_workers();
uint8_t i;
uint8_t done = 0;
for( i = 0; i < n; i ++ ) {
done += tasks_done[i];
done += atomic_load(&tasks_done[i]);
}
return (done == n ? 1 : 0);
}
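
task_is_work_done() now sums the per-worker completion counters with atomic_load(), so it no longer depends on the caller holding queue_mutex for a consistent read; the C11 default memory order (sequentially consistent) is the conservative choice here. A hypothetical alternative, not what this commit does, is a single shared counter, which reduces the "all done" test to one load:

#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint8_t done_count = 0;   /* reset before each batch */

static void mark_done(void)
{
    atomic_fetch_add(&done_count, 1);    /* one bump per finished task */
}

static int all_done(uint8_t n_workers)
{
    return atomic_load(&done_count) == n_workers;
}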
@@ -111,7 +100,7 @@ static void task_allocate()
{
unsigned int i;
for( i = 0; i < MAX_WORKERS; i ++ ) {
job_list[i] = vj_malloc(sizeof(pjob_t));
job_list[i] = vj_calloc(sizeof(pjob_t));
vj_task_args[i] = vj_calloc(sizeof(vj_task_arg_t));
}
@@ -146,75 +135,60 @@ static void task_reset()
numThreads = 0;
}
static void task_add(uint8_t task_no, performer_job_routine fp , void *data)
{
struct task *qp_task = &(running_tasks[task_no]);
qp_task->task_id = task_no;
qp_task->handler = fp;
qp_task->data = data;
qp_task->next = NULL;
static void task_add(uint8_t task_no, performer_job_routine fp, void *data) {
struct task *qp_task = &(running_tasks[task_no]);
if( total_tasks == 0 ) {
tasks_ = qp_task;
tail_task_ = tasks_;
}
else {
tail_task_->next= qp_task;
tail_task_ = qp_task;
}
qp_task->task_id = task_no;
qp_task->handler = fp;
qp_task->data = data;
total_tasks ++;
atomic_fetch_add(&total_tasks, 1);
}
static struct task *task_get()
{
struct task *t = NULL;
if( total_tasks > 0 ) {
t = tasks_;
tasks_ = tasks_->next;
if( tasks_ == NULL ) {
tail_task_ = NULL;
}
total_tasks --;
if( atomic_load(&total_tasks) > 0 ) {
atomic_fetch_sub(&total_tasks,1);
return &(running_tasks[atomic_load(&total_tasks)]);
}
return t;
return NULL;
}
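
task_get() replaces the old linked-list pop with an index computed from total_tasks. As written it issues three separate atomic operations (load, fetch_sub, load), so if workers ever raced here without the queue mutex, two of them could pass the "> 0" test for the last remaining task. A single compare-and-swap closes that window; a self-contained sketch, where the cut-down struct and the pool size are assumptions:

#include <stdatomic.h>
#include <stdint.h>

enum { N_SLOTS = 16 };                   /* assumed pool size */

struct task {                            /* cut-down form of the file's struct */
    int8_t  task_id;
    void   *data;
    void *(*handler)(void *);
};

static struct task running[N_SLOTS];
static _Atomic uint8_t pending = 0;

/* Claim slot pending-1 only if pending still holds what we loaded; on
 * failure the CAS reloads 'n' and the loop retries. */
static struct task *task_get_cas(void)
{
    uint8_t n = atomic_load(&pending);
    while (n > 0) {
        if (atomic_compare_exchange_weak(&pending, &n, n - 1))
            return &running[n - 1];
    }
    return NULL;
}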
static void task_run( struct task *task, void *data, uint8_t id)
{
static void task_run(struct task *task, void *data, uint8_t id) {
(*task->handler)(data);
__lock();
tasks_done[id] ++;
if( task_is_work_done() ) {
pthread_cond_signal( &tasks_completed );
atomic_fetch_add(&tasks_done[id],1);
if (task_is_work_done()) {
pthread_mutex_lock(&queue_mutex);
pthread_cond_signal(&tasks_completed);
pthread_mutex_unlock(&queue_mutex);
}
__unlock();
}
static void *task_thread(void *data)
{
for( ;; )
{
struct task *t = NULL;
__lock();
while( total_tasks == 0 ) {
if( exitFlag ) {
__unlock();
pthread_exit(0);
return NULL;
}
pthread_cond_wait(&current_task, &queue_mutex );
while( atomic_load(&total_tasks) == 0 && !atomic_load(&exitFlag)) {
pthread_mutex_lock(&queue_mutex);
pthread_cond_wait(&tasks_completed, &queue_mutex );
pthread_mutex_unlock(&queue_mutex);
}
if(atomic_load(&exitFlag) && atomic_load(&total_tasks) == 0) {
pthread_exit(NULL);
}
t = task_get();
__unlock();
if( t ) {
if( t != NULL ) {
task_run( t, t->data, t->task_id );
}
}
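
In the new worker loop the predicate (total_tasks, exitFlag) is evaluated before the mutex is taken, and the mutex is dropped again around each iteration. A broadcast that lands between the atomic check and pthread_cond_wait() is lost, which can leave a worker blocked even though work is queued. The canonical shape holds the mutex across both the test and the wait; a sketch against the same variables:

#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

extern pthread_mutex_t queue_mutex;
extern pthread_cond_t  tasks_completed;
extern _Atomic uint8_t total_tasks;
extern _Atomic int     exitFlag;

static void wait_for_work(void)
{
    pthread_mutex_lock(&queue_mutex);
    /* Re-test under the lock on every wakeup: the producer broadcasts
     * while holding queue_mutex, so no signal can slip through between
     * the test and the wait. */
    while (atomic_load(&total_tasks) == 0 && !atomic_load(&exitFlag))
        pthread_cond_wait(&tasks_completed, &queue_mutex);
    pthread_mutex_unlock(&queue_mutex);
}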
@@ -252,43 +226,13 @@ int task_start(unsigned int max_workers)
{
uint8_t i;
if( max_workers >= MAX_WORKERS ) {
veejay_msg(0, "Maximum number of threads is %d", MAX_WORKERS );
return 0;
}
exitFlag = 0;
#ifdef HAVE_LINUX
cpu_set_t cpuset;
#endif
atomic_store(&exitFlag, 0);
pthread_cond_init( &tasks_completed, NULL );
pthread_cond_init( &current_task, NULL );
for( i = 0 ; i < max_workers; i ++ ) {
pthread_attr_init( &p_attr[i] );
if( euid == 0 ) {
pthread_attr_setinheritsched( &p_attr[i], PTHREAD_EXPLICIT_SCHED );
pthread_attr_setschedpolicy( &p_attr[i], SCHED_FIFO );
}
pthread_attr_setschedparam( &p_attr[i], &param );
#ifdef HAVE_LINUX
/* See the Affinity API release notes for OS X v10.5 (https://developer.apple.com/library/archive/releasenotes/Performance/RN-AffinityAPI/):
 * the OS X kernel manages all thread placement itself.
 * Setting n_cpu equal to the number of cores in a single CPU would let threads share the L2 cache, but that requires a larger rewrite and testing.
 * See also https://yyshen.github.io/2015/01/18/binding_threads_to_cores_osx.html
 * Note that only veejay-server currently uses this task manager.
 */
if( n_cpu > 1 ) {
unsigned int selected_cpu = ((i+1)%n_cpu);
CPU_ZERO(&cpuset);
CPU_SET( selected_cpu, &cpuset );
if(pthread_attr_setaffinity_np( &p_attr[i], sizeof(cpuset), &cpuset ) != 0 )
veejay_msg(VEEJAY_MSG_WARNING,"Unable to set CPU %d affinity to thread %d", ((i+1)%n_cpu),i);
}
#endif
if( pthread_create( &p_threads[i], (void*) &p_attr[i], task_thread, NULL ) )
{
veejay_msg(0, "%s: error starting thread %d/%d", __FUNCTION__,i,max_workers );
@@ -311,10 +255,10 @@ void task_stop(unsigned int max_workers)
{
unsigned int i;
__lock();
exitFlag = 1;
pthread_cond_broadcast( &current_task );
__unlock();
pthread_mutex_lock(&queue_mutex);
atomic_store(&exitFlag, 1);
pthread_cond_broadcast( &tasks_completed );
pthread_mutex_unlock(&queue_mutex);
for( i = 0; i < max_workers;i++ ) {
pthread_join( p_threads[i], (void*)&p_tasks[i] );
@@ -322,35 +266,28 @@ void task_stop(unsigned int max_workers)
}
pthread_cond_destroy( &tasks_completed );
pthread_cond_destroy( &current_task );
pthread_mutex_destroy( &queue_mutex );
task_reset();
}
void performer_job( uint8_t n )
{
uint8_t i;
__lock();
pthread_mutex_lock(&queue_mutex);
for( i = 0; i < n; i ++ ) {
pjob_t *slot = job_list[i];
tasks_done[i] = 0;
atomic_store(&tasks_done[i],0);
task_add( i, slot->job, slot->arg );
}
pthread_cond_broadcast( &current_task );
__unlock();
pthread_cond_broadcast( &tasks_completed );
pthread_mutex_unlock(&queue_mutex);
uint8_t stop = 0;
while(!stop) {
__lock();
if( !task_is_work_done() ) {
pthread_cond_wait( &tasks_completed, &queue_mutex );
}
if( task_is_work_done() ) {
stop = 1;
}
__unlock();
while(!task_is_work_done()) {
pthread_mutex_lock(&queue_mutex);
pthread_cond_wait( &tasks_completed, &queue_mutex );
pthread_mutex_unlock(&queue_mutex);
}
}
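
performer_job() now clears the per-worker done counters, queues one task per worker while holding queue_mutex, broadcasts, and then loops on task_is_work_done() around a condition wait. As in the worker loop, the predicate is tested outside the locked region, so the last worker's signal can arrive between the test and pthread_cond_wait(). Folding the test into the locked wait gives the usual shape; a sketch with a stand-in predicate:

#include <pthread.h>

extern pthread_mutex_t queue_mutex;
extern pthread_cond_t  tasks_completed;

int batch_done(void);   /* stand-in for the file's task_is_work_done() */

static void wait_for_batch(void)
{
    pthread_mutex_lock(&queue_mutex);
    while (!batch_done())                /* predicate tested under the lock */
        pthread_cond_wait(&tasks_completed, &queue_mutex);
    pthread_mutex_unlock(&queue_mutex);
}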
@@ -405,7 +342,7 @@ void vj_task_set_to_frame( VJFrame *in, int i, int job )
in->height= first->height;
in->ssm = first->ssm;
in->len = first->width * first->height;
in->offset = first->offset;
if( first->ssm ) {
in->uv_width = first->width;
in->uv_height= first->height;
@@ -480,6 +417,7 @@ void vj_task_set_from_frame( VJFrame *in )
v->shifth = 0;
v->format = in->format;
v->ssm = 0;
v->offset = i * v->strides[0];
}
}
else
@@ -498,6 +436,7 @@ void vj_task_set_from_frame( VJFrame *in )
v->shiftv = in->shift_v;
v->shifth = in->shift_h;
v->format = in->format;
v->offset = i * v->strides[0];
if( v->ssm == 1 ) {
v->strides[1] = v->strides[0];
v->strides[2] = v->strides[1];
@@ -555,11 +494,6 @@ int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int
}
performer_job( n );
/*
for( i = 0; i < n; i ++ ) {
veejay_memset( f[i], 0, sizeof(vj_task_arg_t));
}
*/
return 1;
}

View File

@@ -43,6 +43,7 @@ typedef struct
float fparam;
int iparam;
int iparams[32];
int offset;
} vj_task_arg_t;
int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func );
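
For context, a hedged sketch of how a caller drives vj_task_run(): each worker's job routine receives a vj_task_arg_t describing its band of the frame. The plane-pointer member name ('input') and the void *(void *) handler signature are assumptions; only strides and the new offset field appear in the hunks above.

#include <stdint.h>
#include <string.h>

/* Hypothetical job: blank this worker's band of plane 0. */
static void *clear_band_job(void *ptr)
{
    vj_task_arg_t *v = (vj_task_arg_t *) ptr;
    memset(v->input[0] /* assumed member */, 0, v->strides[0]);
    return NULL;
}

/* Caller side, with planes[] and strides[] describing the full frame:
 *   vj_task_run(planes, planes, NULL, strides, 3, clear_band_job);
 */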