move multithreaded code to vj-task.c, add tasks for sub- and supersampling, refactor the multithreaded memcpy methods

This commit is contained in:
niels
2012-11-22 23:00:40 +01:00
parent 701870e911
commit 59db8c5106
6 changed files with 231 additions and 630 deletions
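In outline, the refactor replaces the hand-rolled fcpy_info job plumbing with a generic task API: callers hand vj_task_run() their plane pointers and strides together with a job routine, and the worker pool splits the planes across threads. A minimal sketch of the new call pattern, pieced together from the diffs below — how vj_task_run() slices the strides is an assumption (vj-task.c itself is not shown on this page), and my_copy_job/my_frame_copy are illustrative names:

#include <stdint.h>

/* vj_task_arg_t, vj_task_available(), vj_task_run() and veejay_memcpy()
   come from veejay/vj-task.h and libvjmem, as used in the diffs below. */
static void my_copy_job( void *arg )
{
    /* each worker gets a vj_task_arg_t describing its slice; the strides
       are assumed to be pre-divided across the pool, as the removed
       fcpy_info code did by hand */
    int i;
    vj_task_arg_t *t = (vj_task_arg_t*) arg;
    for( i = 0; i < 4; i ++ ) {
        if( t->strides[i] > 0 )
            veejay_memcpy( t->output[i], t->input[i], t->strides[i] );
    }
}

static void my_frame_copy( uint8_t **in, uint8_t **out, int *strides, int planes )
{
    int i;
    if( vj_task_available() ) { /* thread pool was started at init */
        vj_task_run( in, out, NULL, strides, planes,
                     (performer_job_routine) &my_copy_job );
    } else {                    /* serial fallback, as in vj_frame_simple_copy() */
        for( i = 0; i < planes; i ++ )
            veejay_memcpy( out[i], in[i], strides[i] );
    }
}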

View File

@@ -136,6 +136,7 @@
#include <libvjmsg/vj-msg.h>
#include <aclib/ac.h>
#include <libyuv/mmx.h>
#include <veejay/vj-task.h>
#ifdef STRICT_CHECKING
#include <assert.h>
#endif
@@ -1062,18 +1063,9 @@ void find_best_memset()
free( buf2 );
}
typedef struct {
uint8_t *input[4];
uint8_t *output[4];
int strides[4];
uint8_t *temp[4];
float fparam;
} fcpy_info;
static fcpy_info *fcpy_info_arr[16];
static void vj_frame_copy_job( fcpy_info *info ) {
static void vj_frame_copy_job( void *arg ) {
int i;
vj_task_arg_t *info = (vj_task_arg_t*) arg;
#ifdef STRICT_CHECKING
assert( task_get_workers() > 0 );
#endif
@@ -1083,91 +1075,30 @@ static void vj_frame_copy_job( fcpy_info *info ) {
}
}
//@ cleanup FIXME: all below -> move.
static void vj_frame_copy2( uint8_t **input, uint8_t **output, int *strides, int planes )
{
fcpy_info **f = fcpy_info_arr;
memset(f[0],0,sizeof(fcpy_info));
memset(f[1],0,sizeof(fcpy_info));
int i;
int j;
/* calculate new stride lengths */
for( j = 0; j < 2; j ++ ) {
for( i = 0; i < planes; i ++ ) {
f[j]->strides[i] = strides[i] >> 1;
}
}
/* assign planes for job 0 */
for( i = 0; i < planes; i ++ ) {
f[0]->input[i] = input[i];
f[0]->output[i] = output[i];
}
/* assign pointers for job 1 */
for( i = 0; i < planes; i ++ ) {
f[1]->input[i] = input[i] + (strides[i] >> 1);
f[1]->output[i] = output[i] + (strides[i] >> 1);
}
/* launch the two jobs in parallel */
performer_new_job( 2 );
performer_set_job( 0, &vj_frame_copy_job, f[0]);
performer_set_job( 1, &vj_frame_copy_job, f[1]);
performer_job( 2 );
}
static void vj_frame_clear_job( void *arg ) {
int i;
vj_task_arg_t *info = (vj_task_arg_t*) arg;
#ifdef STRICT_CHECKING
assert( task_get_workers() > 0 );
#endif
for( i = 0; i < 4; i ++ ) {
if( info->strides[i] > 0 )
veejay_memset( info->input[i], info->iparam, info->strides[i] );
}
}
static void vj_frame_copyN( uint8_t **input, uint8_t **output, int *strides, int planes, int n )
{
int i,j;
for( i = 0; i < n ; i ++ )
memset( fcpy_info_arr[i],0,sizeof(fcpy_info));
fcpy_info **f = fcpy_info_arr;
/* calculate new stride lengths */
for( j = 0; j < n; j ++ ) {
for( i = 0; i < planes; i ++ ) {
f[j]->strides[i] = strides[i] / n;
}
}
/* assign planes for job 0 */
for( i = 0; i < planes; i ++ ) {
f[0]->input[i] = input[i];
f[0]->output[i] = output[i];
}
/* assign pointers for other jobs */
for( j = 1; j < n; j ++ ) {
for( i = 0; i < planes; i ++ ) {
f[j]->input[i] = f[(j-1)]->input[i] + f[(j-1)]->strides[i];
f[j]->output[i] = f[(j-1)]->output[i] + f[(j-1)]->strides[i];
//f[j].input[i] = input[i] + ((strides[i] >> 2) * j);
//f[j].output[i] = output[i] + ((strides[i] >> 2) * j);
}
}
/* launch the four jobs in parallel */
performer_new_job( n );
for( i = 0; i < n; i ++ ) {
performer_set_job( i, &vj_frame_copy_job, f[i]);
}
performer_job( n );
}
static void vj_frame_copyN( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_task_run( input, output, NULL, strides,planes, (performer_job_routine) &vj_frame_copy_job );
}
static void vj_frame_clearN( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_task_run( input, output, NULL, strides,planes, (performer_job_routine) &vj_frame_clear_job );
}
/*test pattern:
int len = job->strides[0];
for( i = 0; i < len; i ++ )
img[0][i] = 255 - ( 15 * job->id );
@@ -1179,8 +1110,9 @@ test pattern:
}
*/
void vj_frame_slow_job( fcpy_info *job )
static void vj_frame_slow_job( void *arg )
{
vj_task_arg_t *job = (vj_task_arg_t*) arg;
int i,j;
uint8_t **img = job->output;
uint8_t **p0_buffer = job->input;
@@ -1198,11 +1130,13 @@ void vj_frame_slow_job( fcpy_info *job )
void vj_frame_slow_threaded( uint8_t **p0_buffer, uint8_t **p1_buffer, uint8_t **img, int len, int uv_len,const float frac )
{
int N = task_get_workers();
int strides[4] = { len, uv_len, uv_len,0 };
int i;
if( N == 0 ) {
if( vj_task_available() ) {
int strides[4] = { len, uv_len, uv_len, 0 };
vj_task_set_float( frac );
vj_task_run( p0_buffer, img, p1_buffer,strides, 4,(performer_job_routine) &vj_frame_slow_job );
} else {
int i,j;
if( uv_len != len ) {
for( i = 0; i < len ; i ++ ) {
img[0][i] = p0_buffer[0][i] + ( frac * (p1_buffer[0][i] - p0_buffer[0][i]));
@@ -1219,56 +1153,14 @@ void vj_frame_slow_threaded( uint8_t **p0_buffer, uint8_t **p1_buffer, uint8_t *
}
}
}
else {
int i,j;
fcpy_info **f = fcpy_info_arr;
for( j = 0; j < N; j ++ ) {
memset( f[j], 0, sizeof( fcpy_info ));
f[j]->strides[0] = len / N;
f[j]->strides[1] = uv_len / N;
f[j]->strides[2] = uv_len / N;
f[j]->strides[3] = 0;
f[j]->temp[3] = NULL;
f[j]->input[3] = NULL;
f[j]->output[3] = NULL;
f[j]->fparam = frac;
}
for( i = 0; i < 3; i ++ ) {
f[0]->input[i] = p0_buffer[i];
f[0]->output[i] = img[i];
f[0]->temp[i] = p1_buffer[i];
}
for( j = 1; j < N; j ++ ) {
for( i = 0; i < 3; i ++ ) {
f[j]->input[i] = p0_buffer[i] + (f[0]->strides[i] * j);// ( f[(j-1)].input[i] + f[(j-1)].strides[i];
f[j]->output[i] = img[i] + (f[0]->strides[i] * j);// f[(j-1)].output[i] + f[(j-1)].strides[i];
f[j]->temp[i] = p1_buffer[i] + (f[0]->strides[i]* j); //f[(j-1)].temp[i] + f[(j-1)].strides[i];
}
}
performer_new_job( N );
for( i = 0; i < N; i ++ ) {
performer_set_job( i, &vj_frame_slow_job, f[i]);
}
performer_job( N );
}
}
static void vj_frame_copy4( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,4 );
}
static void vj_frame_copy6( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,6 );
}
static void vj_frame_copy8( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,8 );
}
void vj_frame_simple_clear( uint8_t **input, int *strides, int v )
{
int i;
for( i = 0; i < 4; i ++ )
if( strides[i] > 0 )
veejay_memset( input[i], v,strides[i] );
}
@@ -1276,55 +1168,14 @@ void vj_frame_simple_copy( uint8_t **input, uint8_t **output, int *strides, int
{
int i;
for( i = 0; i < planes; i ++ )
veejay_memcpy( output[i],input[i], strides[i] );
}
static int num_tasks_ = 0;
int num_threaded_tasks()
{
return num_tasks_;
}
static void vj_frame_user1( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,num_tasks_ );
}
static void vj_frame_copy12( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,12 );
}
static void vj_frame_copy16( uint8_t **input, uint8_t **output, int *strides, int planes )
{
vj_frame_copyN( input,output,strides,planes,16 );
}
static struct {
char *name;
void (*function)(uint8_t **input, uint8_t **output, int *strides, int planes );
unsigned long long time;
int tasks;
} multithreaded_copy_method[] =
{
{ NULL, NULL, 0,0 },
{ "Inside main (classic)", vj_frame_simple_copy, 0,0 },
{ "2 threads", vj_frame_copy2, 0,2 },
{ "4 threads", vj_frame_copy4, 0,4 },
{ "6 threads", vj_frame_copy6, 0,6 },
{ "8 threads", vj_frame_copy8, 0,8 },
{ "12 threads", vj_frame_copy12,0,12 },
{ "16 threads", vj_frame_copy16,0,16 },
{ "user defined", vj_frame_user1, 0, 0 },
{ NULL, NULL, 0,0 },
};
void *(* vj_frame_copy)( uint8_t **input, uint8_t **output, int *strides, int n_planes ) = 0;
void *(* vj_frame_clear)( uint8_t **input, int *strides, int val ) = 0;
void vj_frame_copy1( uint8_t *input, uint8_t *output, int size )
{
uint8_t *in[4] = { input, NULL,NULL,NULL };
uint8_t *ou[4] = { output,NULL,NULL,NULL };
@@ -1332,7 +1183,6 @@ void vj_frame_copy1( uint8_t *input, uint8_t *output, int size )
vj_frame_copy( in, ou, strides, 1 );
}
int find_best_threaded_memcpy(int w, int h)
{
uint8_t *src = (uint8_t*) vj_malloc(sizeof(uint8_t) * w * h * 4 );
@@ -1350,17 +1200,9 @@ int find_best_threaded_memcpy(int w, int h)
long k;
int j;
int i,best=0;
unsigned long long stats[c];
unsigned long long best_avg[c];
//@ fire up the threadpool
for ( i = 0; i < MAX_WORKERS ; i++ ) {
fcpy_info_arr[i] = (fcpy_info*) vj_malloc(sizeof(fcpy_info));
memset(fcpy_info_arr[i],0,sizeof(fcpy_info));
}
task_init();
int cpus = task_num_cpus();
@@ -1370,27 +1212,14 @@ int find_best_threaded_memcpy(int w, int h)
if( w <= 720 && h <= 480 )
{
preferred_tasks = 1; //@ classic, run in 1/4 PAL, low res, fast, keep it outside threadpooling.
best = 1;
}
else {
preferred_tasks = (cpus * 2 );
warn_user = 1;
}
for ( i = 1; multithreaded_copy_method[i].name; i ++ )
if( preferred_tasks == multithreaded_copy_method[i].tasks )
best = i;
unsigned long long best_time = 0;
char *str2 = getenv( "VEEJAY_MULTITHREAD_TASKS" );
int num_tasks = preferred_tasks;
if( best == 0 ) {
best = 8; //@ user defined
}
else {
num_tasks = multithreaded_copy_method[best].tasks;
}
if( str2 != NULL ) {
num_tasks = atoi( str2 );
@@ -1400,11 +1229,6 @@ int find_best_threaded_memcpy(int w, int h)
return -1;
}
if(num_tasks_ <= 1 )
best = 1;
else
best = 8;
veejay_msg(VEEJAY_MSG_DEBUG, "Testing your settings ...");
if( num_tasks > 1 )
{
@@ -1412,51 +1236,49 @@ int find_best_threaded_memcpy(int w, int h)
veejay_msg(0,"Failed to launch %d threads ?!", num_tasks);
return -1;
}
}
for( k = 0; k < c; k ++ )
{
unsigned long long t = rdtsc();
multithreaded_copy_method[best].function( source,dest,planes,4 );
t = rdtsc() - t;
stats[k] = t;
}
for( k = 0; k < c; k ++ )
{
unsigned long long t = rdtsc();
vj_frame_copyN( source,dest,planes,4 );
t = rdtsc() - t;
stats[k] = t;
}
int sum = 0;
for( k = 0; k < c ;k ++ )
sum += stats[k];
unsigned long long best_time = (sum / c );
veejay_msg(VEEJAY_MSG_DEBUG, "Timing results for copying %2.2f MB data with %d thread(s): %lld",
(c * (w*h) *4) /1048576.0f, num_tasks, best_time);
if( num_tasks > 1 ) {
task_stop( num_tasks );
veejay_msg( VEEJAY_MSG_INFO, "Threadpool is %d threads.", num_tasks );
veejay_msg( VEEJAY_MSG_INFO, "Settings will be used for Slow Motion, Feedback, Cache and FX Cache");
}
else {
veejay_msg( VEEJAY_MSG_WARNING, "Not multithreading pixel operations.");
}
}
if( warn_user ){
if( warn_user && num_tasks > 1){
veejay_msg(VEEJAY_MSG_WARNING, "(Experimental) Enabling multicore support!");
}
if( num_tasks > 1 ) {
veejay_msg( VEEJAY_MSG_INFO, "Using %d threads scheduled over %d cpus in performer.", num_tasks, cpus );
veejay_msg( VEEJAY_MSG_DEBUG,"Use envvar VEEJAY_MULTITHREAD_TASKS=<num threads> to customize.");
vj_frame_copy = vj_frame_copyN;
vj_frame_clear= vj_frame_clearN;
}
else {
vj_frame_copy = vj_frame_simple_copy;
vj_frame_clear = vj_frame_simple_clear;
}
vj_frame_copy = multithreaded_copy_method[best].function;
free(src);
free(dst);
num_tasks_ = num_tasks;
return num_tasks;
}
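Aside: the benchmark above boils down to an rdtsc-averaging loop. A sketch of the same measurement for reference — note that the diff accumulates the 64-bit cycle counts into a plain int, which can overflow once the summed deltas pass 2^31, so this sketch uses a 64-bit accumulator (a suggested fix, not what the commit does):

/* rdtsc() and vj_frame_copyN() as in the diff above; source/dest are the
   benchmark frames, planes[] the stride table, c the iteration count */
static unsigned long long time_copy( uint8_t **source, uint8_t **dest,
                                     int *planes, long c )
{
    unsigned long long sum = 0; /* 64-bit: an int accumulator can overflow */
    long k;
    for( k = 0; k < c; k ++ ) {
        unsigned long long t = rdtsc();
        vj_frame_copyN( source, dest, planes, 4 );
        sum += rdtsc() - t;
    }
    return sum / c; /* average cycles per frame copy */
}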

View File

@@ -37,7 +37,7 @@
#include <libvjmsg/vj-msg.h>
#include <libvje/vje.h>
#include <libyuv/yuvconv.h>
#include <veejay/vj-task.h>
const char *ssm_id[SSM_COUNT] = {
"unknown",
@@ -61,7 +61,7 @@ const char *ssm_description[SSM_COUNT] = {
#endif
};
#define RUP8(num)(((num)+8)&~8)
// forward decl
@@ -754,22 +754,122 @@ static void ss_444_to_420mpeg2(uint8_t *buffer, int width, int height)
}
}
static void chroma_subsample_cp_task( void *ptr )
{
vj_task_arg_t *f = (vj_task_arg_t*) ptr;
switch (f->iparam) {
case SSM_420_JPEG_BOX:
case SSM_420_JPEG_TR:
ss_444_to_420jpeg_cp(f->input[1],f->output[1], f->width, f->height);
ss_444_to_420jpeg_cp(f->input[2],f->output[2], f->width, f->height);
break;
case SSM_420_MPEG2:
break;
case SSM_422_444:
ss_444_to_422_cp(f->priv,f->input[1],f->output[1],f->width,f->height);
ss_444_to_422_cp(f->priv,f->input[2],f->output[2],f->width,f->height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
}
}
static void chroma_subsample_task( void *ptr )
{
vj_task_arg_t *f = (vj_task_arg_t*) ptr;
switch (f->iparam) {
case SSM_420_JPEG_BOX:
case SSM_420_JPEG_TR:
ss_444_to_420jpeg(f->input[1], f->width, f->height);
ss_444_to_420jpeg(f->input[2], f->width, f->height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_MPEG2:
ss_444_to_420mpeg2(f->input[1], f->width, f->height);
ss_444_to_420mpeg2(f->input[2], f->width, f->height);
break;
case SSM_422_444:
ss_444_to_422(f->priv,f->input[1],f->width,f->height);
ss_444_to_422(f->priv,f->input[2],f->width,f->height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_422:
ss_422_to_420(f->input[1],f->width,f->height);
ss_422_to_420(f->input[2],f->width,f->height);
break;
default:
break;
}
}
static void chroma_supersample_task( void *ptr )
{
vj_task_arg_t *f = (vj_task_arg_t*) ptr;
switch (f->iparam) {
case SSM_420_JPEG_BOX:
ss_420jpeg_to_444(f->input[1], f->width, f->height);
ss_420jpeg_to_444(f->input[2], f->width, f->height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_JPEG_TR:
tr_420jpeg_to_444(f->priv,f->input[1], f->width, f->height);
tr_420jpeg_to_444(f->priv,f->input[2], f->width, f->height);
break;
case SSM_422_444:
tr_422_to_444(f->priv,f->input[1],f->width,f->height);
tr_422_to_444(f->priv,f->input[2],f->width,f->height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_422:
ss_420_to_422( f->input[1], f->width, f->height );
ss_420_to_422( f->input[2], f->width, f->height );
break;
case SSM_420_MPEG2:
// ss_420mpeg2_to_444(ycbcr[1], width, height);
// ss_420mpeg2_to_444(ycbcr[2], width, height);
break;
default:
break;
}
}
void chroma_subsample_cp(subsample_mode_t mode, void *data, uint8_t *ycbcr[], uint8_t *dcbcr[],
int width, int height)
{
if( vj_task_available() ) {
vj_task_set_wid( width );
vj_task_set_hei( height );
vj_task_set_int( mode );
vj_task_alloc_internal_buf( width * 2);
vj_task_run( ycbcr, dcbcr, NULL, NULL,3, (performer_job_routine) &chroma_subsample_cp_task );
return;
}
switch (mode) {
case SSM_420_JPEG_BOX:
case SSM_420_JPEG_TR:
ss_444_to_420jpeg_cp(ycbcr[1],dcbcr[1], width, height);
ss_444_to_420jpeg_cp(ycbcr[2],dcbcr[2], width, height);
break;
case SSM_420_MPEG2:
break;
case SSM_422_444:
ss_444_to_422_cp(data,ycbcr[1],dcbcr[1],width,height);
ss_444_to_422_cp(data,ycbcr[2],dcbcr[2],width,height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
@@ -781,13 +881,22 @@ void chroma_subsample_cp(subsample_mode_t mode, void *data, uint8_t *ycbcr[], ui
}
}
void chroma_subsample(subsample_mode_t mode, void *data, uint8_t *ycbcr[],
int width, int height)
{
if( vj_task_available() ) {
vj_task_set_shift(0,0);
vj_task_set_wid( width );
vj_task_set_hei( height );
vj_task_set_int( mode );
vj_task_alloc_internal_buf( width * 2); //@ FIXME, redundant malloc of *data (unused in multithreaded context)
vj_task_run( ycbcr, ycbcr, NULL, NULL, 3, (performer_job_routine ) &chroma_subsample_task );
return;
}
switch (mode) {
case SSM_420_JPEG_BOX:
case SSM_420_JPEG_TR:
@@ -821,35 +930,56 @@ void chroma_subsample(subsample_mode_t mode, void *data, uint8_t *ycbcr[],
void chroma_supersample(subsample_mode_t mode,void *data, uint8_t *ycbcr[],
int width, int height)
{
if( vj_task_available() ) {
switch( mode ) {
case SSM_420_JPEG_BOX:
case SSM_420_JPEG_TR:
case SSM_420_422:
vj_task_set_shift(1,1);
break;
default:
vj_task_set_shift(0,1 );
break;
}
vj_task_set_wid( width );
vj_task_set_hei( height );
vj_task_set_int( mode );
vj_task_alloc_internal_buf( width * 2 );
vj_task_run( ycbcr, ycbcr, NULL, NULL,3, (performer_job_routine) &chroma_supersample_task );
return;
}
switch (mode) {
case SSM_420_JPEG_BOX:
ss_420jpeg_to_444(ycbcr[1], width, height);
ss_420jpeg_to_444(ycbcr[2], width, height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_JPEG_TR:
tr_420jpeg_to_444(data,ycbcr[1], width, height);
tr_420jpeg_to_444(data,ycbcr[2], width, height);
break;
case SSM_422_444:
tr_422_to_444(data,ycbcr[1],width,height);
tr_422_to_444(data,ycbcr[2],width,height);
#ifdef HAVE_ASM_MMX
__asm__ __volatile__ ( _EMMS:::"memory");
#endif
break;
case SSM_420_422:
ss_420_to_422( ycbcr[1], width, height );
ss_420_to_422( ycbcr[2], width, height );
break;
case SSM_420_MPEG2:
// ss_420mpeg2_to_444(ycbcr[1], width, height);
// ss_420mpeg2_to_444(ycbcr[2], width, height);
break;
default:
break;
}
}
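For context on the work these jobs split up: the SSM_420_JPEG_BOX path reduces a full-resolution chroma plane to quarter size by averaging each 2x2 block into one sample. An illustrative sketch of that box filter — not the project's ss_444_to_420jpeg(), which also carries MMX paths; it assumes a packed 8-bit plane with even width and height:

#include <stdint.h>

/* in-place 4:4:4 -> 4:2:0 box subsampling of one chroma plane: each
   output sample is the rounded mean of a 2x2 input block; writing in
   place is safe because the write cursor never overtakes the read rows */
static void box_subsample_420( uint8_t *plane, int width, int height )
{
    int x, y;
    uint8_t *out = plane;
    for( y = 0; y < height; y += 2 ) {
        const uint8_t *row0 = plane + y * width;
        const uint8_t *row1 = row0 + width;
        for( x = 0; x < width; x += 2 )
            *(out++) = (uint8_t) (( row0[x] + row0[x+1] +
                                    row1[x] + row1[x+1] + 2 ) >> 2);
    }
}

chroma_subsample_task() hands this kind of per-plane work to the pool; judging by the vj_task_set_shift() setup above, each worker appears to receive a band of the chroma planes scaled by the subsampling shift, though the exact slicing lives in vj-task.c, which is not shown on this page.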

View File

@@ -29,7 +29,7 @@ VEEJAY_LIB_FILE = libveejay.la
lib_LTLIBRARIES = $(VEEJAY_LIB_FILE)
libveejay_la_SOURCES = jpegutils.c vj-misc.c \
libveejay_la_SOURCES = jpegutils.c vj-misc.c vj-task.c \
vj-osc.c vjkf.c vj-event.c vj-eventman.c vj-perform.c \
x11misc.c vj-shm.c vj-sdl.c vj-dfb.c vj-viewport.c vj-composite.c \
vj-font.c vj-pjack.c vj-splitdisplay.c vj-share.c liblavplayvj.c
@@ -75,12 +75,12 @@ veejay_headers = vims.h jpegutils.h vevo.h vj-composite.h vj-jack.h \
vj-OSC.h vj-viewport.h lav_common.h vims.h vj-dfb.h \
vjkf.h vj-perform.h vj-viewport-xml.h libveejay.h \
vj-audio.h vj-event.h vj-lib.h vj-plug.h x11misc.h \
veejay.h vj-bjack.h vj-font.h vj-misc.h vj-sdl.h
veejay.h vj-bjack.h vj-font.h vj-misc.h vj-sdl.h vj-task.h
veejay_SOURCES = veejay.c ${veejay_headers}
veejay_LDADD = libveejay.la @LIBGETOPT_LIB@
vevotest_SOURCES = vevotest.c ${veejay_headres}
vevotest_LDADD = libveejay.la @LIBGETOPT_LIB@
#vevotest_SOURCES = vevotest.c ${veejay_headres}
#vevotest_LDADD = libveejay.la @LIBGETOPT_LIB@

View File

@@ -660,345 +660,3 @@ int veejay_sprintf( char *s, size_t size, const char *format, ... )
}
#endif
/* performer: run a function in parallel on multi core cpu
* move this to a new file.
*
*/
/*
#define THREAD_STACK 1024*1024
void performer_init()
{
mlockall( MCL_CURRENT | MCL_FUTURE );
stack = malloc(THREAD_STACK);
pthread_attr_setstack( &attr, stack, THREAD_STACK );
}
*/
struct task
{
int task_id;
void *data;
void *(*handler)(void *arg);
// performer_job_routine handler;
struct task *next;
};
typedef struct {
performer_job_routine job;
void *arg;
} pjob_t;
static int total_tasks_ = 0;
static int tasks_done[MAX_WORKERS];
static int tasks_todo = 0;
static int exitFlag = 0;
static int taskLock = 0;
static pthread_mutex_t queue_mutex;// = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; //PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
//pthread_mutex_t sync_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static pthread_cond_t tasks_completed;
static pthread_cond_t current_task;// = PTHREAD_COND_INITIALIZER;
static int numThreads = 0;
struct task *tasks_ = NULL;
struct task *tail_task_ = NULL;
static pthread_t p_threads[MAX_WORKERS];
static pthread_attr_t p_attr[MAX_WORKERS];
static int p_tasks[MAX_WORKERS];
static int thr_id[MAX_WORKERS];
static pjob_t *job_list[MAX_WORKERS];
int task_get_workers()
{
return numThreads;
}
void *task_add(int task_no, void *(fp)(void *data), void *data)
{
struct task *enqueue_task = (struct task*) malloc( sizeof(struct task));
if(!enqueue_task) {
return NULL;
}
// int err = pthread_mutex_lock( &queue_mutex );
enqueue_task->task_id = task_no;
enqueue_task->handler = fp;
enqueue_task->data = data;
enqueue_task->next = NULL;
if( total_tasks_ == 0 ) {
tasks_ = enqueue_task;
tail_task_ = tasks_;
}
else {
tail_task_->next = enqueue_task;
tail_task_ = enqueue_task;
}
total_tasks_ ++;
int err = pthread_cond_signal( &current_task );
// err = pthread_mutex_unlock( &queue_mutex );
}
struct task *task_get( pthread_mutex_t *mutex )
{
int err = pthread_mutex_lock(mutex);
struct task *t = NULL;
if( total_tasks_ > 0 ) {
t = tasks_;
tasks_ = tasks_->next;
if( tasks_ == NULL ) {
tail_task_ = NULL;
}
total_tasks_ --;
}
err = pthread_mutex_unlock(mutex);
return t;
}
void task_run( struct task *task, void *data, int id)
{
if( task )
{
(*task->handler)(data);
pthread_mutex_lock(&queue_mutex);
tasks_done[id] ++; //@ inc, it is possible same thread tasks both tasks
pthread_cond_signal( &tasks_completed );
pthread_mutex_unlock( &queue_mutex );
}
}
void *task_thread(void *data)
{
const unsigned int id = (int) (int*) data;
for( ;; )
{
pthread_mutex_lock( &queue_mutex );
while( total_tasks_ == 0 ) {
if( exitFlag ) {
pthread_mutex_unlock(&queue_mutex);
pthread_exit(0);
return NULL;
}
pthread_cond_wait(&current_task, &queue_mutex );
}
pthread_mutex_unlock( &queue_mutex );
struct task *t = task_get( &queue_mutex );
if( t ) {
task_run( t, t->data, id );
free(t);
t = NULL;
}
}
}
int n_cpu = 1;
void task_free()
{
int i;
for ( i = 0; i < MAX_WORKERS; i ++ ) {
free(job_list[i]);
}
}
void task_init()
{
int i;
memset( &thr_id, 0,sizeof(thr_id));
memset( &p_threads,0,sizeof(p_threads));
memset( &p_tasks, 0,sizeof(p_tasks));
memset( job_list, 0,sizeof(pjob_t*) * MAX_WORKERS );
for( i = 0; i < MAX_WORKERS; i ++ ) {
job_list[i] = vj_malloc(sizeof(pjob_t));
memset( job_list[i], 0, sizeof(pjob_t));
}
n_cpu = sysconf( _SC_NPROCESSORS_ONLN );
if( n_cpu <= 0 )
n_cpu = 1;
numThreads = 0;
}
int task_num_cpus()
{
return n_cpu;
}
int task_start(int max_workers)
{
int i;
if( max_workers >= MAX_WORKERS ) {
veejay_msg(0, "Maximum number of threads is %d", MAX_WORKERS );
return 0;
}
exitFlag = 0;
/*int max_p = sched_get_priority_max( SCHED_FIFO );
int min_p = sched_get_priority_min( SCHED_FIFO );
max_p = (int) ( ((float) max_p) * 0.8f );
if( max_p < min_p )
max_p = min_p;
*/
struct sched_param param;
cpu_set_t cpuset;
pthread_cond_init( &tasks_completed, NULL );
pthread_cond_init( &current_task, NULL );
pthread_mutex_init( &queue_mutex , NULL);
pthread_mutex_lock( &queue_mutex );
for( i = 0 ; i < max_workers; i ++ ) {
thr_id[i] = i;
pthread_attr_init( &p_attr[i] );
// pthread_attr_setstacksize( &p_attr[i], 4096 );
// pthread_attr_setschedpolicy( &p_attr[i], SCHED_FIFO );
// pthread_attr_setschedparam( &p_attr[i], &param );
if( n_cpu > 1 ) {
CPU_ZERO(&cpuset);
CPU_SET( ((i+1) % n_cpu ), &cpuset );
if(pthread_attr_setaffinity_np( &p_attr[i], sizeof(cpuset), &cpuset ) != 0 )
veejay_msg(0,"Unable to set CPU %d affinity to thread %d", ((i+1)%n_cpu),i);
}
if( pthread_create( &p_threads[i], (void*) &p_attr[i], task_thread, i ) )
{
veejay_msg(0, "%s: error starting thread %d/%d", __FUNCTION__,i,max_workers );
memset( &p_threads[i], 0, sizeof(pthread_t) );
return -1;
}
}
numThreads = max_workers;
pthread_mutex_unlock( &queue_mutex );
return numThreads;
}
void task_stop(int max_workers)
{
int i;
pthread_mutex_lock(&queue_mutex);
exitFlag = 1;
pthread_cond_broadcast( &current_task );
pthread_mutex_unlock(&queue_mutex);
for( i = 0; i < max_workers;i++ ) {
pthread_join( p_threads[i], (void*)&p_tasks[i] );
pthread_attr_destroy( &p_attr[i] );
}
pthread_mutex_destroy( &queue_mutex );
pthread_cond_destroy( &tasks_completed );
pthread_cond_destroy( &current_task );
task_init();
}
void task_wait_all()
{
}
void performer_job( int n )
{
int i;
pthread_mutex_lock(&queue_mutex);
// taskLock = 1;
// pthread_mutex_unlock(&queue_mutex);
tasks_todo = n;
veejay_memset( tasks_done, 0, sizeof(tasks_done));
for( i = 0; i < n; i ++ ) {
pjob_t *slot = job_list[i];
task_add( i, slot->job, slot->arg );
}
//task_ready(n);
pthread_mutex_unlock( &queue_mutex );
/* for( i = 0; i < n; i ++ ) {
void *exit_code = NULL;
pjob_t *slot = &(arr[i]);
pthread_join( slot->thread, NULL );
if( exit_code != 0 ) {
}
} */
int stop = 0;
int c = 0;
while(!stop) {
pthread_mutex_lock( &queue_mutex );
int done = 0;
for( i = 0 ; i < tasks_todo; i ++ ) {
done += tasks_done[i];
}
if( done < tasks_todo ) {
pthread_cond_wait( &tasks_completed, &queue_mutex );
done = 0;
for( i = 0 ; i < tasks_todo; i ++ ) {
done += tasks_done[i];
}
}
if( done == tasks_todo )
{
stop = 1;
}
pthread_mutex_unlock(&queue_mutex);
}
}
void performer_set_job( int num, performer_job_routine job , void *arg )
{
#ifdef STRICT_CHECKING
assert( num >= 0 && num < MAX_WORKERS );
#endif
job_list[ num ]->job = job;
job_list[ num ]->arg = arg;
}
void performer_new_job( int n_jobs )
{
int i;
// for( i = 0; i < n_jobs; i ++ )
// veejay_memset( job_list[i], 0, sizeof(pjob_t) );
}

View File

@@ -30,7 +30,6 @@ typedef struct
int max_files;
} filelist_t;
typedef void *(*performer_job_routine)(void *);
#define VEEJAY_FILE_LIMIT (1048576 * 2000)
int available_diskspace(void);
@@ -63,13 +62,4 @@ void vj_get_yuv444_template(VJFrame *src, int w, int h);
int verify_working_dir();
void performer_job( int job_num );
void performer_new_job( int n_jobs );
void performer_set_job( int num, performer_job_routine job , void *arg );
int task_start(int max_workers);
void task_stop(int max_workers);
void task_init();
int task_num_cpus();
#endif

View File

@@ -41,6 +41,7 @@
#include <libsamplerec/samplerecord.h>
#include <libel/pixbuf.h>
#include <veejay/vj-misc.h>
#include <veejay/vj-task.h>
#include <liblzo/lzo.h>
#include <veejay/vj-viewport.h>
#include <veejay/vj-composite.h>