Add thread-local storage: each FX worker thread has its own buffer of size width * (height / num_threads). Once processing is done, the thread-local buffers are copied back to the main output.

This commit is contained in:
veejay
2023-11-30 23:29:24 +01:00
parent 7c5bb76a72
commit 9cb41d12c1
6 changed files with 95 additions and 14 deletions

View File

@@ -51,6 +51,8 @@
//@ job description
static vj_task_arg_t *vj_task_args[MAX_WORKERS];
static pthread_key_t thread_buf_key;
static int thread_buf_size = 0;
//@ job structure
typedef struct {
@@ -68,6 +70,7 @@ typedef struct {
atomic_int stop_flag;
int num_submitted_tasks;
int num_completed_tasks;
uint8_t ***thread_local_bufs;
} thread_pool_t;
typedef struct {
@@ -154,6 +157,11 @@ void vj_task_set_to_frame( VJFrame *in, int i, int job )
{
vj_task_arg_t *first = vj_task_args[job];
in->local = first->local;
in->jobnum = job;
in->totaljobs = numThreads;
in->out_width = first->out_width;
in->out_height = first->out_height;
in->width = first->width;
in->height= first->height;
in->ssm = first->ssm;
@@ -234,6 +242,8 @@ void vj_task_set_from_frame( VJFrame *in )
v->format = in->format;
v->ssm = 0;
v->offset = i * v->strides[0];
v->out_width = in->width;
v->out_height = in->height;
}
}
else
@@ -253,6 +263,8 @@ void vj_task_set_from_frame( VJFrame *in )
v->shifth = in->shift_h;
v->format = in->format;
v->offset = i * v->strides[0];
v->out_width = in->width;
v->out_height = in->height;
if( v->ssm == 1 ) {
v->strides[1] = v->strides[0];
v->strides[2] = v->strides[1];
@@ -261,12 +273,37 @@ void vj_task_set_from_frame( VJFrame *in )
}
}
/*
 * Allocate the calling thread's private scratch planes and register them
 * under thread_buf_key.
 *
 * A single allocation of 4 * plane_size bytes is carved into four plane
 * pointers (tlbuf[0..3]), each plane_size bytes apart. Planes 0, 1 and 2 are
 * pre-filled with 255, 128 and 128 respectively.
 *
 * On any failure (either allocation, or pthread_setspecific) everything
 * allocated here is released again and no value is registered for this
 * thread, so a subsequent pthread_getspecific(thread_buf_key) returns NULL
 * rather than this function writing through a NULL pointer.
 *
 * plane_size: size in bytes of one plane.
 */
static void init_thread_local_bufs(size_t plane_size) {
    uint8_t **tlbuf = (uint8_t**) vj_malloc( sizeof(uint8_t*) * 4 );
    if( tlbuf == NULL )
        return;

    /* one backing store for all four planes; freed through tlbuf[0] */
    tlbuf[0] = (uint8_t*) vj_malloc( plane_size * 4 );
    if( tlbuf[0] == NULL ) {
        free( tlbuf );
        return;
    }
    tlbuf[1] = tlbuf[0] + plane_size;
    tlbuf[2] = tlbuf[1] + plane_size;
    tlbuf[3] = tlbuf[2] + plane_size;

    /* NOTE(review): presumably full-scale luma and neutral chroma - confirm */
    veejay_memset( tlbuf[0], 255, plane_size );
    veejay_memset( tlbuf[1], 128, plane_size );
    veejay_memset( tlbuf[2], 128, plane_size );
    /* NOTE(review): plane 3 is not initialised here; confirm every reader
     * writes it before reading it. */

    if( pthread_setspecific( thread_buf_key, tlbuf ) != 0 ) {
        /* not registered, so the key destructor will never see it */
        free( tlbuf[0] );
        free( tlbuf );
    }
}
static void* task_worker(void *arg) {
task_thread_args_t *ptr = (task_thread_args_t*) arg;
thread_pool_t *pool = (thread_pool_t *)ptr->pool;
int job_num = ptr->job_num;
init_thread_local_bufs( thread_buf_size );
uint8_t **tlbuf = (uint8_t**) pthread_getspecific( thread_buf_key );
pthread_mutex_lock( &(pool->lock) );
pool->thread_local_bufs[ job_num ] = tlbuf;
pthread_mutex_unlock( &(pool->lock) );
while (1) {
pthread_mutex_lock(&pool->lock);
while( pool->queue[ job_num ].job == NULL) {
@@ -283,7 +320,6 @@ static void* task_worker(void *arg) {
}
pjob_t task = pool->queue[ job_num ];
task.job(task.arg);
@@ -303,6 +339,15 @@ static void* task_worker(void *arg) {
return NULL;
}
/*
 * Release one thread's scratch planes.
 *
 * buf is the plane-pointer array; element 0 points at the backing store for
 * the planes, so freeing element 0 and then the array releases everything.
 * A NULL buf is accepted and ignored so the function is safe to call for a
 * thread that has no buffers.
 *
 * buf: plane-pointer array (uint8_t **) passed as void *, or NULL.
 */
static void free_thread_local_bufs(void *buf) {
    uint8_t **tlbuf = (uint8_t**) buf;
    if( tlbuf == NULL )
        return;
    free( tlbuf[0] );
    free( tlbuf );
}
static thread_pool_t* create_thread_pool(int num_threads) {
thread_pool_t *pool = (thread_pool_t *)vj_calloc(sizeof(thread_pool_t));
pool->queue = (pjob_t *)vj_calloc(sizeof(pjob_t) * num_threads);
@@ -312,6 +357,10 @@ static thread_pool_t* create_thread_pool(int num_threads) {
pthread_cond_init(&pool->start_signal, NULL);
atomic_init(&pool->stop_flag, 0);
pthread_key_create( &thread_buf_key, free_thread_local_bufs );
pool->thread_local_bufs = (uint8_t***) vj_malloc( sizeof(uint8_t**) * num_threads );
for (int i = 0; i < num_threads; i++) {
task_thread_args_t *args = thread_args[i];
@@ -366,7 +415,9 @@ static void destroy_thread_pool(thread_pool_t *pool) {
}
int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func )
int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int n_planes, performer_job_routine func, int use_thread_local )
{
const uint8_t n = vj_task_get_workers();
if( n <= 1 ) {
@@ -404,14 +455,15 @@ int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int
if( buf3 != NULL )
f[j]->temp[i] = buf3[i] + (f[j]->strides[i]* j);
}
}
vj_task_lock();
for( i = 0; i < n; i ++ ) {
f[i]->jobnum = i;
}
vj_task_lock();
f[i]->local = task_pool->thread_local_bufs[i];
}
for( i = 0; i < n; i ++ ) {
submit_job( task_pool, func, f[i] );
@@ -422,18 +474,35 @@ int vj_task_run(uint8_t **buf1, uint8_t **buf2, uint8_t **buf3, int *strides,int
wait_all_tasks_completed(task_pool);
if( use_thread_local ) {
for( i = 0; i < n; i ++ ) {
for( j = 0; j < n_planes; j ++ ) {
veejay_memcpy( f[i]->input[j], task_pool->thread_local_bufs[i][j], f[i]->strides[j] );
}
}
}
return 1;
}
/*
 * Tear down the task pool and the thread-local buffer bookkeeping.
 *
 * Order matters:
 *  1. The pool is destroyed first. NOTE(review): this assumes
 *     destroy_thread_pool() terminates the worker threads - confirm. The key
 *     must still exist while they exit, because once a key is deleted its
 *     destructor is no longer invoked at thread exit and the per-thread
 *     buffers would leak.
 *  2. Only then is the key deleted.
 *  3. The per-thread pointer table is freed last. Its address is read before
 *     step 1 so the pool structure is never touched after it has been
 *     handed to destroy_thread_pool().
 */
void task_destroy()
{
    uint8_t ***bufs = task_pool->thread_local_bufs;

    destroy_thread_pool( task_pool );
    pthread_key_delete( thread_buf_key );
    free( bufs );
}
void task_init()
void task_init(int w, int h)
{
thread_buf_size = w * h;
vj_task_get_num_cpus();
numThreads = n_cpu/2;