add some FIXMEs, refactor, fix av log level, fix garbage in video

This commit is contained in:
c0ntrol
2019-01-08 22:33:06 +01:00
parent 2db26e310f
commit 178c57fdf9
6 changed files with 211 additions and 175 deletions

View File

@@ -40,8 +40,8 @@ typedef struct
{
pthread_mutex_t mutex;
pthread_t thread;
int state;
int have_frame;
int state;
vj_client *v;
VJFrame *info;
VJFrame *dest; // there is no full range YUV + alpha in PIX_FMT family
@@ -50,6 +50,8 @@ typedef struct
#define STATE_INACTIVE 0
#define STATE_RUNNING 1
#define STATE_QUIT 2
#define STATE_ERROR 4
static int lock(threaded_t *t)
{
@@ -61,6 +63,69 @@ static int unlock(threaded_t *t)
return pthread_mutex_unlock( &(t->mutex ));
}
static int eval_state(threaded_t *t, vj_tag *tag)
{
int ret = 0;
lock(t);
if(t->state == STATE_ERROR || t->v == NULL) {
if(t->v == NULL) {
t->v = vj_client_alloc_stream(t->info);
}
veejay_msg(VEEJAY_MSG_INFO, " ... Waiting for network stream to become ready [%s]",tag->source_name);
int success = vj_client_connect_dat( t->v, tag->source_name,tag->video_channel );
if(success <= 0) {
t->state = STATE_ERROR;
if(t->v) {
vj_client_close(t->v);
vj_client_free(t->v);
t->v = NULL;
}
}
else {
veejay_msg(VEEJAY_MSG_INFO, "Connecton established with %s:%d",tag->source_name, tag->video_channel + 5);
t->state = STATE_RUNNING;
}
}
ret = t->state;
unlock(t);
return ret;
}
static int eval_state_mcast(threaded_t *t, vj_tag *tag) //FIXME: split this up in seperate source files, use modular structure
{
int ret = 0;
lock(t);
if(t->state == STATE_ERROR || t->v == NULL) {
if(t->v == NULL) {
t->v = vj_client_alloc_stream(t->info);
}
veejay_msg(VEEJAY_MSG_INFO, " ... Waiting for mcast stream to become ready [%s]",tag->source_name);
int success = vj_client_connect( t->v, NULL, tag->source_name, tag->video_channel );
if(success <= 0) {
t->state = STATE_ERROR;
if(t->v) {
vj_client_close(t->v);
vj_client_free(t->v);
t->v = NULL;
}
}
else {
veejay_msg(VEEJAY_MSG_INFO, "Connecton established with %s:%d",tag->source_name, tag->video_channel + 5);
t->state = STATE_RUNNING;
}
}
ret = t->state;
unlock(t);
return ret;
}
#define MS_TO_NANO(a) (a *= 1000000)
static void net_delay(long ms, long sec )
{
@@ -85,114 +150,88 @@ static void *reader_thread(void *data)
int retrieve = 0;
int success = 0;
t->v = vj_client_alloc_stream(t->info);
if(t->v == NULL) {
return NULL;
}
snprintf(buf,sizeof(buf), "%03d:%d;", VIMS_GET_FRAME, my_screen_id);
success = vj_client_connect_dat( t->v, tag->source_name,tag->video_channel );
if( success > 0 ) {
veejay_msg(VEEJAY_MSG_INFO, "Connecton established with %s:%d",tag->source_name, tag->video_channel + 5);
}
else
{
veejay_msg(0, "Unable to connect to %s: %d", tag->source_name, tag->video_channel+5);
goto NETTHREADEXIT;
}
lock(t);
t->state = STATE_RUNNING;
unlock(t);
t->state = STATE_ERROR;
for( ;; ) {
int error = 0;
int res = 0;
int ret = 0;
if( retrieve == 0 && t->have_frame == 0 ) {
ret = vj_client_send( t->v, V_CMD,(unsigned char*) buf );
if( ret <= 0 ) {
error = 1;
}
else {
retrieve = 1;
}
}
if(!error && retrieve == 1 ) {
res = vj_client_poll(t->v, V_CMD );
if( res ) {
if(vj_client_link_can_read( t->v, V_CMD ) ) {
retrieve = 2;
}
}
else if ( res < 0 ) {
error = 1;
} else if ( res == 0 ) {
int state_eval = eval_state(t, tag);
switch(state_eval) {
case STATE_INACTIVE:
net_delay(10,0);
continue;
}
}
if(!error && retrieve == 2) {
int frame_len = vj_client_read_frame_hdr( t->v );
if( frame_len <= 0 ) {
error = 1;
}
if( error == 0 ) {
lock(t);
ret = vj_client_read_frame_data( t->v, frame_len );
unlock(t);
}
if(ret) {
t->have_frame = 1;
retrieve = 0;
}
success = 0;
continue;
break;
case STATE_RUNNING:
if( retrieve == 0 ) {
ret = vj_client_send( t->v, V_CMD,(unsigned char*) buf );
if( ret <= 0 ) {
error = 1;
}
else {
retrieve = 1; /* queried a frame, try to fetch it */
}
}
if(!error && retrieve == 1 ) {
res = vj_client_poll(t->v, V_CMD );
if( res ) {
if(vj_client_link_can_read( t->v, V_CMD ) ) {
retrieve = 2; /* data waiting at socket */
}
}
else if ( res < 0 ) {
error = 1;
} else if ( res == 0 ) {
net_delay(10,0);
continue;
}
}
if(!error && retrieve == 2) {
int frame_len = vj_client_read_frame_hdr( t->v );
if( frame_len <= 0 ) {
error = 1;
}
if( ret <= 0 ) {
veejay_msg(VEEJAY_MSG_DEBUG,"Error reading video frame from %s:%d",tag->source_name,tag->video_channel );
error = 1;
}
}
NETTHREADRETRY:
if( error == 0 ) {
lock(t);
ret = vj_client_read_frame_data( t->v, frame_len );
if(ret) {
t->have_frame = 1;
}
unlock(t);
}
if( error )
{
vj_client_close(t->v);
if(ret) {
retrieve = 0;
}
veejay_msg(VEEJAY_MSG_INFO, " ZZzzzzz ... waiting for Link %s:%d to become ready", tag->source_name, tag->video_channel );
net_delay( 0, 5 );
if( ret <= 0 ) {
veejay_msg(VEEJAY_MSG_DEBUG,"Error reading video frame from %s:%d",tag->source_name,tag->video_channel );
error = 1;
}
}
if( error ) {
lock(t);
t->state = STATE_ERROR;
t->have_frame = 0;
unlock(t);
retrieve = 0;
success = vj_client_connect_dat( t->v, tag->source_name,tag->video_channel );
if( t->state == 0 )
{
veejay_msg(VEEJAY_MSG_INFO, "Network thread with %s: %d was told to exit",tag->source_name,tag->video_channel+5);
goto NETTHREADEXIT;
}
}
if( success <= 0 )
{
goto NETTHREADRETRY;
}
else
{
veejay_msg(VEEJAY_MSG_INFO, "Connecton re-established with %s:%d",tag->source_name,tag->video_channel + 5);
}
retrieve = 0;
break;
case STATE_QUIT:
{
goto NETTHREADEXIT;
}
}
if( t->state == 0 )
{
veejay_msg(VEEJAY_MSG_INFO, "Network thread with %s: %d was told to exit",tag->source_name,tag->video_channel+5);
goto NETTHREADEXIT;
}
}
NETTHREADEXIT:
@@ -205,6 +244,8 @@ NETTHREADEXIT:
veejay_msg(VEEJAY_MSG_INFO, "Network thread with %s: %d has exited",tag->source_name,tag->video_channel+5);
pthread_exit(NULL);
return NULL;
}
@@ -214,72 +255,35 @@ static void *mcast_reader_thread(void *data)
vj_tag *tag = (vj_tag*) data;
threaded_t *t = tag->priv;
int success = 0;
t->v = vj_client_alloc_stream(t->info);
if(t->v == NULL) {
return NULL;
}
success = vj_client_connect( t->v, NULL, tag->source_name, tag->video_channel );
if( success > 0 ) {
veejay_msg(VEEJAY_MSG_INFO, "Multicast connecton established with %s:%d",tag->source_name, tag->video_channel + 5);
}
else
{
veejay_msg(0, "Unable to connect to %s: %d", tag->source_name, tag->video_channel+5);
goto NETTHREADEXIT;
}
lock(t);
t->state = STATE_RUNNING;
unlock(t);
const int padded = 256;
int max_len = padded + RUP8( 1920 * 1080 * 3 );
for( ;; ) {
int error = 0;
if( vj_client_read_mcast_data( t->v, max_len ) < 0 ) {
error = 1;
}
if(error == 0) {
t->have_frame = 1;
}
int state_eval = eval_state_mcast(t, tag);
NETTHREADRETRY:
switch(state_eval) {
case STATE_INACTIVE:
net_delay(10,0);
continue;
break;
case STATE_RUNNING:
lock(t);
if( vj_client_read_mcast_data( t->v, max_len ) < 0 ) {
error = 1;
}
t->have_frame = 1;
if( error ) {
t->state = STATE_ERROR;
t->have_frame = 0;
}
unlock(t);
if( error )
{
vj_client_close(t->v);
veejay_msg(VEEJAY_MSG_INFO, " ZZzzzzz ... waiting for multicast server %s:%d to become ready", tag->source_name, tag->video_channel );
net_delay( 0, 5 );
success = vj_client_connect( t->v,NULL,tag->source_name,tag->video_channel );
if( t->state == 0 )
{
veejay_msg(VEEJAY_MSG_INFO, "Multicast receiver %s: %d was told to stop",tag->source_name,tag->video_channel+5);
break;
case STATE_QUIT:
goto NETTHREADEXIT;
}
if( success <= 0 )
{
goto NETTHREADRETRY;
}
else
{
veejay_msg(VEEJAY_MSG_INFO, "Multicast receiver has re-established with %s:%d",tag->source_name,tag->video_channel + 5);
}
}
if( t->state == 0 )
{
veejay_msg(VEEJAY_MSG_INFO, "Multicast receiver %s: %d was told to stop",tag->source_name,tag->video_channel+5);
goto NETTHREADEXIT;
break;
}
}
@@ -306,16 +310,12 @@ int net_thread_get_frame( vj_tag *tag, VJFrame *dst )
{
threaded_t *t = (threaded_t*) tag->priv;
int state = 0;
/* frame ready ? */
lock(t);
state = t->state;
if( state == 0 || t->have_frame == 0 ) {
if(!(t->state == STATE_RUNNING) || t->have_frame == 0) {
unlock(t);
return 1; // not active or no frame
}
return 1;
}
VJFrame *src = avhelper_get_decoded_video(t->v->decoder);
if(t->scaler == NULL) {
@@ -326,9 +326,6 @@ int net_thread_get_frame( vj_tag *tag, VJFrame *dst )
}
yuv_convert_and_scale( t->scaler, src,dst );
t->have_frame = 0; // frame is consumed
unlock(t);
return 1;
@@ -346,7 +343,6 @@ int net_thread_start(vj_tag *tag, VJFrame *info)
pthread_mutex_init( &(t->mutex), NULL );
t->have_frame = 0;
t->info = info;
@@ -358,7 +354,8 @@ int net_thread_start(vj_tag *tag, VJFrame *info)
}
if( p_err ==0) {
veejay_msg(VEEJAY_MSG_INFO, "Created new %s threaded stream to veejay host %s port %d",
veejay_msg(VEEJAY_MSG_INFO, "Created new input stream [%d] (%s) to veejay host %s port %d",
tag->id,
tag->source_type == VJ_TAG_TYPE_MCAST ?
"multicast" : "unicast", tag->source_name,tag->video_channel);
}