add net io logger (only enabled if compiled STRICT_CHECKING and env var VEEJAY_LOG_NET_IO="on" )

fixed corrupted header in tag type network for unicast/mcast
fixed segfault on listing v4l2 devices when there is none
This commit is contained in:
niels
2011-08-03 19:49:22 +02:00
parent cf1c63e170
commit 8f43336b80
9 changed files with 230 additions and 105 deletions

View File

@@ -1777,8 +1777,8 @@ char **v4l2_get_device_list()
const char prefix[] = "/sys/class/video4linux/";
const char v4lprefix[] = "/dev/";
if( (dir = opendir( prefix )) == NULL ) {
veejay_msg(VEEJAY_MSG_WARNING,"Failed to open '%s':%d, %s", prefix, errno,strerror(errno));
return v4l2_dummy_list();
veejay_msg(VEEJAY_MSG_WARNING,"Failed to open '%s':%s", prefix, strerror(errno));
return NULL;
}
char *list[255];

View File

@@ -248,7 +248,7 @@ char *vj_tag_scan_devices( void )
char **device_list = v4l2_get_device_list();
#endif
#endif
if(!device_list)
if(device_list==NULL)
return strdup(default_str);
for( i = 0; device_list[i] != NULL ;i++ )

View File

@@ -282,7 +282,7 @@ int sock_t_send_fd( int fd, int send_size, unsigned char *buf, int len )
}
if( n == 0 ) {
veejay_msg(VEEJAY_MSG_DEBUG, "Remote closed connection.");
return -1;
return 0;
}
buf += n;
length -= n;

View File

@@ -295,7 +295,7 @@ static uint32_t getint(uint8_t *in, int len ) {
int vj_client_read_i( vj_client *v, uint8_t *dst, int len )
{
uint8_t line[48];
uint8_t line[128];
uint32_t p[4] = {0, 0,0,0 };
uint32_t strides[4] = { 0,0,0,0 };
int n = 0;
@@ -309,10 +309,7 @@ int vj_client_read_i( vj_client *v, uint8_t *dst, int len )
uint8_t *in = mcast_recv_frame( v->c[0]->r, &p[0],&p[1], &p[2], &plen );
if( in == NULL )
return 0;
#ifdef STRICT_CHECKING
assert( p[0] > 0 );
assert( p[1] > 0 );
#endif
v->in_width = p[0];
v->in_height = p[1];
v->in_fmt = p[2];
@@ -321,29 +318,33 @@ int vj_client_read_i( vj_client *v, uint8_t *dst, int len )
y_len = p[0] * p[1];
switch(v->in_fmt )
{
case FMT_420F:
case FMT_422:
case FMT_422F:
uv_len = y_len / 2;
break;
case FMT_420:
case FMT_420F:
uv_len = y_len / 4;break;
default:
uv_len = y_len/2;break;
veejay_msg(VEEJAY_MSG_ERROR, "Unknown data format: %02x", v->in_fmt);
break;
}
if( p[0] != v->cur_width || p[1] != v->cur_height || p[2] != v->cur_fmt )
return 2;
strides[0] = getint( in + 9 + 8, 8 );
strides[1] = getint( in + 9 + 16, 8 );
strides[2] = getint( in + 9 + 24, 8 );
//vj_client_decompress( v,in, dst,plen,y_len,uv_len ,16);
strides[0] = getint( in + 12 + 8, 8 );
strides[1] = getint( in + 12 + 16, 8 );
strides[2] = getint( in + 12 + 24, 8 );
vj_client_decompress( v,in, dst, p[3], y_len, uv_len , plen, strides[0],strides[1],strides[2]);
return 1;
} else if ( v->c[0]->type == VSOCK_C )
}
else if ( v->c[0]->type == VSOCK_C )
{
veejay_memset( line,0, sizeof(line));
plen = sock_t_recv( v->c[0]->fd, line, 41 );
plen = sock_t_recv( v->c[0]->fd, line, 44 );
if( plen == 0 ) {
veejay_msg(VEEJAY_MSG_DEBUG, "Remote closed connection.");
@@ -357,17 +358,15 @@ int vj_client_read_i( vj_client *v, uint8_t *dst, int len )
}
//vj_client_stdout_dump_recv( v->c[0]->fd, plen, line );
#ifdef STRICT_CHECKING
assert( plen == 41 );
#endif
p[0] = getint( line , 4 );
p[1] = getint( line + 4, 4 );
p[2] = getint( line + 8, 1 );
p[3] = getint( line + 9, 8 );
p[2] = getint( line + 8, 4 );
p[3] = getint( line + 12, 8 );
strides[0] = getint( line + 9 + 8, 8 );
strides[1] = getint( line + 9 + 16, 8 );
strides[2] = getint( line + 9 + 24, 8 );
strides[0] = getint( line + 12 + 8, 8 );
strides[1] = getint( line + 12 + 16, 8 );
strides[2] = getint( line + 12 + 24, 8 );
if( v->cur_width != p[0] || v->cur_height != p[1] || v->cur_fmt != p[2]) {
veejay_msg(VEEJAY_MSG_ERROR, "Unexpected video frame format, %dx%d (%d) , received %dx%d(%d)",
@@ -382,26 +381,31 @@ int vj_client_read_i( vj_client *v, uint8_t *dst, int len )
y_len = p[0] * p[1];
switch(v->in_fmt )
{
case FMT_420F:
case FMT_422:
case FMT_422F:
uv_len = y_len / 2;
break;
case FMT_420:
case FMT_420F:
uv_len = y_len / 4;break;
default:
uv_len = y_len/2;break;
veejay_msg(VEEJAY_MSG_ERROR, "Unknown data format: %02x", v->in_fmt);
break;
}
int n = sock_t_recv( v->c[0]->fd,v->space,p[3] );
if( n <= 0 ) {
veejay_msg(VEEJAY_MSG_DEBUG,"Remote closed connection.");
if( n == -1 ) {
veejay_msg(VEEJAY_MSG_ERROR, "Error '%s' while reading socket", strerror(errno));
} else {
veejay_msg(VEEJAY_MSG_DEBUG,"Remote closed connection");
}
return -1;
}
if( n != p[3] )
if( n != p[3] && n > 0 )
{
if( n < 0 ) {
veejay_msg(VEEJAY_MSG_ERROR, "Network I/O Error: %s", strerror(errno));
} else {
veejay_msg(VEEJAY_MSG_ERROR, "Broken video packet , got %d out of %d bytes",
n, p[3] );
}
veejay_msg(VEEJAY_MSG_ERROR, "Broken video packet , got %d out of %d bytes",n, p[3] );
return -1;
}

View File

@@ -65,7 +65,27 @@ typedef struct
#define VJ_MAX_PENDING_MSG 128
#define RECV_SIZE (32000)
#ifdef STRICT_CHECKING
static void printbuf( FILE *f, uint8_t *buf , int len )
{
int i;
for( i = 0; i < len ; i ++ ) {
if( (i%32) == 0 ) {
fprintf(f, "\n");
}
fprintf(f, "%02x ", buf[i]);
}
fprintf(f, "\ntext:\n");
for( i = 0; i < len ; i ++ ) {
if( (i%32) == 0 ) {
fprintf(f, "\n");
}
fprintf(f, "%c ", buf[i]);
}
fprintf(f, "\n");
}
#endif
int _vj_server_free_slot(vj_server *vje);
int _vj_server_new_client(vj_server *vje, int socket_fd);
int _vj_server_parse_msg(vj_server *vje,int link_id, char *buf, int buf_len, int priority );
@@ -186,11 +206,13 @@ static int _vj_server_classic(vj_server *vjs, int port_offset)
veejay_msg(VEEJAY_MSG_ERROR, "Unable to create a socket: %s", strerror(errno));
return 0;
}
if (setsockopt( vjs->handle, SOL_SOCKET, SO_REUSEADDR, (const char*) &on, sizeof(on) )== -1)
{
veejay_msg(VEEJAY_MSG_ERROR, "%s", strerror(errno));
return 0;
}
vjs->myself.sin_family = AF_INET;
vjs->myself.sin_addr.s_addr = INADDR_ANY;
@@ -212,6 +234,11 @@ static int _vj_server_classic(vj_server *vjs, int port_offset)
return 0;
}
#ifdef STRICT_CHECKING
if(vjs->logfd) {
fprintf( vjs->logfd, "selected port %d, maximum connections is %d", port_num, VJ_MAX_CONNECTIONS );
}
#endif
int send_size = 1024 * 1024;
if( setsockopt( vjs->handle, SOL_SOCKET, SO_SNDBUF, (const char*) &send_size, sizeof(send_size) ) == - 1)
@@ -224,6 +251,12 @@ static int _vj_server_classic(vj_server *vjs, int port_offset)
veejay_msg(0, "Cannot read socket buffer size: %s", strerror(errno));
return 0;
}
#ifdef STRICT_CHECKING
if(vjs->logfd) {
fprintf( vjs->logfd, "socket send buffer size is %d bytes", vjs->send_size );
}
#endif
if( setsockopt( vjs->handle, SOL_SOCKET, SO_RCVBUF, (const char*) &send_size, sizeof(send_size)) == 1 )
{
veejay_msg(0, "Cannot set recv buffer sze:%s", strerror(errno));
@@ -234,6 +267,11 @@ static int _vj_server_classic(vj_server *vjs, int port_offset)
veejay_msg(0, "Cannot read socket buffer receive size %s" , strerror(errno));
return 0;
}
#ifdef STRICT_CHECKING
if(vjs->logfd) {
fprintf( vjs->logfd, "socket recv buffer size is %d bytes", vjs->recv_size );
}
#endif
int flag = 1;
if( setsockopt( vjs->handle, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(int)) == -1 )
@@ -269,6 +307,11 @@ static int _vj_server_classic(vj_server *vjs, int port_offset)
}
vjs->link = (void**) link;
vjs->nr_of_connections = vjs->handle;
#ifdef STRICT_CHECKING
if(vjs->logfd) {
fprintf( vjs->logfd, "allocated queue for max %d connctions", VJ_MAX_CONNECTIONS );
}
#endif
switch(vjs->server_type )
{
@@ -305,6 +348,25 @@ vj_server *vj_server_alloc(int port_offset, char *mcast_group_name, int type)
vjs->server_type = type;
char *netlog = getenv("VEEJAY_LOG_NET_IO" );
#ifdef STRICT_CHECKING
if( netlog != NULL && strncasecmp("ON",netlog, 2) == 0 ) {
char logpath[1024];
sprintf( logpath, "%s.%d", VEEJAY_SERVER_LOG, port_offset );
vjs->logfd = fopen( logpath, "w" );
if(!vjs->logfd) {
veejay_msg(VEEJAY_MSG_WARNING, "Unable to open %s for logging Network I/O\n", VEEJAY_SERVER_LOG );
vjs->logfd=0;
}
else {
fprintf( vjs->logfd, "Server setup: port %d, name %s type %d\n", port_offset,mcast_group_name,type);
fprintf( vjs->logfd, "receive buffer size: %d bytes\n", RECV_SIZE);
}
}
#endif
/* setup peer to peer socket */
if( mcast_group_name == NULL )
{
@@ -338,18 +400,31 @@ int vj_server_send( vj_server *vje, int link_id, uint8_t *buf, int len )
#endif
vj_link **Link = (vj_link**) vje->link;
if( !Link[link_id]->in_use )
if( !Link[link_id]->in_use ) {
return 0;
}
if( !vj_server_link_can_write( vje,link_id ) ) {
veejay_msg(0,"Not ready for sending.");
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "failed to send buf of len %d to link_id %d, not ready for writing!\n", len, link_id );
printbuf(vje->logfd,buf,len);
}
#endif
return -1;
}
if( !vje->use_mcast)
{
total = sock_t_send_fd( Link[link_id]->handle, vje->send_size, buf, len);
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "sent %d of %d bytes to handle %d (link %d) %s\n", total,len, Link[link_id]->handle,link_id,(char*)(inet_ntoa(vje->remote.sin_addr)) );
printbuf( vje->logfd, buf, len );
}
#endif
if( total <= 0 )
{
veejay_msg(0,"Unable to send buffer to %s:%s ",
@@ -362,9 +437,10 @@ int vj_server_send( vj_server *vje, int link_id, uint8_t *buf, int len )
else
{
vj_proto **proto = (vj_proto**) vje->protocol;
if( vje->server_type == V_CMD )
if( vje->server_type == V_CMD ) {
return mcast_send( proto[0]->s, buf, bytes_left, vje->ports[0] );
}
}
return total;
}
@@ -412,6 +488,13 @@ static int vj_server_send_frame_now( vj_server *vje, int link_id, uint8_t *buf,
if( total != len )
veejay_msg(VEEJAY_MSG_ERROR, "Only sent %d out of %d bytes", total,len);
#endif
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "sent frame %d of %d bytes to handle %d (link %d) %s\n", total,len, Link[link_id]->handle,link_id,(char*)(inet_ntoa(vje->remote.sin_addr)) );
// printbuf( vje->logfd, buf, len );
}
#endif
if( total <= 0 )
{
veejay_msg(0,"Unable to send buffer to %s: %s",
@@ -471,6 +554,11 @@ int _vj_server_new_client(vj_server *vje, int socket_fd)
FD_SET( socket_fd, &(vje->fds) );
FD_SET( socket_fd, &(vje->wds) );
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "new socket %d (link %d)\n", socket_fd,entry );
}
#endif
return entry;
}
@@ -488,6 +576,12 @@ int _vj_server_del_client(vj_server * vje, int link_id)
FD_CLR( Link[link_id]->handle, &(vje->fds) );
FD_CLR( Link[link_id]->handle, &(vje->wds) );
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "closing link %d\n",link_id );
}
#endif
}
Link[link_id]->handle = 0;
Link[link_id]->promote = 0;
@@ -516,9 +610,11 @@ void vj_server_client_promote( vj_server *vje, int link_id)
#endif
Link[link_id]->promote = 1;
#ifdef STRICT_CHECKING
veejay_msg(VEEJAY_MSG_DEBUG, "Knock knock %x",
Link[link_id]->handle );
if( vje->logfd ) {
fprintf(vje->logfd, "promote link %d\n", link_id );
}
#endif
}
@@ -864,6 +960,13 @@ int vj_server_new_connection(vj_server *vje)
close(fd);
return 0;
}
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "new connection, socket=%d, max connections=%d\n",
fd, vje->nr_of_connections );
}
#endif
return 1;
}
return 0;
@@ -874,11 +977,12 @@ int vj_server_update( vj_server *vje, int id )
{
int sock_fd = vje->handle;
int n = 0;
vj_link **Link = (vj_link**) vje->link;
_vj_server_empty_queue(vje, id);
if(!vje->use_mcast)
{
vj_link **Link = (vj_link**) vje->link;
sock_fd = Link[id]->handle;
if(!FD_ISSET( sock_fd, &(vje->fds)) )
@@ -888,6 +992,14 @@ int vj_server_update( vj_server *vje, int id )
int bytes_received = 0;
if(!vje->use_mcast) {
bytes_received = recv( sock_fd, vje->recv_buf, RECV_SIZE, 0 );
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "received %d of %d bytes from handle %d (link %d)\n",
bytes_received,RECV_SIZE, Link[id]->handle,id );
printbuf( vje->logfd, vje->recv_buf, bytes_received );
}
#endif
}
else
{
@@ -914,6 +1026,13 @@ int vj_server_update( vj_server *vje, int id )
if(n_msg == 0 && bytes_received > 0)
{
veejay_msg(VEEJAY_MSG_ERROR, "Invalid instruction '%s'", msg_buf );
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fprintf(vje->logfd, "no valid messages in buffer!\n" );
printbuf( vje->logfd, msg_buf, bytes_left );
}
#endif
return -1; //@ close client now
}
@@ -940,6 +1059,12 @@ void vj_server_shutdown(vj_server *vje)
vj_link **Link = (vj_link**) vje->link;
int k = VJ_MAX_CONNECTIONS;
#ifdef STRICT_CHECKING
if( vje->logfd ) {
fclose( vje->logfd );
}
#endif
if(vje->use_mcast) k = 1;
for(i=0; i < k; i++)

View File

@@ -25,6 +25,8 @@
#define VJ_PORT 3490
#define VJ_MAX_CONNECTIONS 16
#define VEEJAY_SERVER_LOG "/tmp/veejay.net.log"
typedef struct vj_server_t {
struct sockaddr_in myself;
struct sockaddr_in remote;
@@ -42,6 +44,7 @@ typedef struct vj_server_t {
int send_size;
int recv_size;
int mcast_gray;
FILE *logfd;
} vj_server;
vj_server *vj_server_alloc(int port, char *mcast_group_name, int type);

View File

@@ -1524,10 +1524,10 @@ static void veejay_handle_callbacks(veejay_t *info) {
int i;
for( i = 0; i < VJ_MAX_CONNECTIONS ; i ++ ) {
int res = vj_server_send( info->vjs[VEEJAY_PORT_STA], i, info->status_line, status_line_len);
if( res < 0 ) {
/* if( res < 0 ) {
_vj_server_del_client( info->vjs[VEEJAY_PORT_CMD], i );
_vj_server_del_client( info->vjs[VEEJAY_PORT_STA], i );
}
}*/
}
}

View File

@@ -1711,7 +1711,8 @@ void vj_event_update_remote(void *ptr)
n++;
}
}
if( res == -1 )
if( res <= 0 )
// if( res == -1 )
{
_vj_server_del_client( v->vjs[VEEJAY_PORT_CMD], i );
_vj_server_del_client( v->vjs[VEEJAY_PORT_STA], i );
@@ -1726,10 +1727,7 @@ void vj_event_update_remote(void *ptr)
{
if( vj_server_link_can_read( v->vjs[VEEJAY_PORT_DAT], i ) )
{
int res = 1;
while( res != 0 )
{
res = vj_server_update( v->vjs[VEEJAY_PORT_DAT], i );
int res = vj_server_update( v->vjs[VEEJAY_PORT_DAT], i );
if(res>0)
{
v->uc->current_link = i;
@@ -1742,7 +1740,8 @@ void vj_event_update_remote(void *ptr)
n++;
}
}
if( res == -1 )
if( res <= 0 )
//if( res == -1 )
{
_vj_server_del_client( v->vjs[VEEJAY_PORT_DAT], i );
_vj_server_del_client( v->vjs[VEEJAY_PORT_CMD], i );
@@ -1750,7 +1749,6 @@ void vj_event_update_remote(void *ptr)
}
}
}
}
//@ repeat macros
if(macro_status_ == 2 && macro_port_ != NULL)

View File

@@ -108,6 +108,7 @@ static uint8_t *audio_render_buffer = NULL;
static uint8_t *down_sample_buffer = NULL;
static uint8_t *temp_buffer[4];
static uint8_t *feedback_buffer[4];
static uint8_t *socket_buffer = NULL;
static ycbcr_frame *record_buffer = NULL; // needed for recording invisible streams
static VJFrame *helper_frame = NULL;
static int vj_perform_record_buffer_init();
@@ -661,6 +662,11 @@ int vj_perform_init(veejay_t * info)
frame_buffer[c] = (ycbcr_frame *) vj_calloc(sizeof(ycbcr_frame));
if(!frame_buffer[c]) return 0;
}
/* pre allocate space for streaming */
socket_buffer = (uint8_t*) vj_malloc(sizeof(uint8_t) * RUP8(frame_len+16));
// clear the cache information
vj_perform_clear_cache();
veejay_memset( frame_info[0],0,SAMPLE_MAX_EFFECTS);
@@ -809,6 +815,9 @@ void vj_perform_free(veejay_t * info)
sample_record_free();
if( socket_buffer )
free( socket_buffer );
if(info->edit_list->has_audio)
vj_perform_close_audio();
@@ -1106,59 +1115,47 @@ static int vj_perform_compress_frame( veejay_t *info, uint8_t *dst, uint32_t *p1
int vj_perform_send_primary_frame_s2(veejay_t *info, int mcast, int to_link_id)
{
int hlen =0;
unsigned char info_line[48];
uint8_t *socket_buffer = (uint8_t*) vj_malloc(sizeof(uint8_t) * info->effect_frame1->width *
info->effect_frame1->height * 3 );
unsigned char info_line[128];
int compr_len = 0;
uint32_t planes[3];
const int data_len = 44;
memset(info_line, 0, sizeof(info_line) );
if( !mcast )
{
uint8_t *sbuf = socket_buffer + (sizeof(uint8_t) * 41 );
uint8_t *sbuf = socket_buffer + (sizeof(uint8_t) * data_len );
compr_len = vj_perform_compress_frame(info,sbuf, &planes[0], &planes[1], &planes[2]);
#ifdef STRICT_CHECKING
assert( compr_len > 0 );
#endif
/* peer to peer connection */
sprintf(info_line, "%04d%04d%1d%08d%08d%08d%08d", info->effect_frame1->width,
info->effect_frame1->height, info->effect_frame1->format,
compr_len,
planes[0],
planes[1],
planes[2] );
hlen = strlen(info_line );
#ifdef STRICT_CHECKING
assert( hlen == 41 );
#endif
veejay_memcpy( socket_buffer, info_line, sizeof(uint8_t) * hlen );
snprintf(info_line,
data_len + 1, //@ '\0' counts
"%04d%04d%04d%08d%08d%08d%08d", info->effect_frame1->width,
info->effect_frame1->height, info->pixel_format,
compr_len,planes[0],planes[1],planes[2] );
veejay_memcpy( socket_buffer, info_line, sizeof(uint8_t) * data_len );
}
else
{
uint8_t *sbuf = socket_buffer + (sizeof(uint8_t) * 41 );
uint8_t *sbuf = socket_buffer + (sizeof(uint8_t) * 43 );
compr_len = vj_perform_compress_frame(info,sbuf, &planes[0], &planes[1], &planes[2] );
#ifdef STRICT_CHECKING
assert(compr_len > 0 );
#endif
sprintf(info_line, "%04d%04d%1d%08d%08d%08d%08d", info->effect_frame1->width,
info->effect_frame1->height, info->effect_frame1->format,
compr_len,
planes[0],
planes[1],
planes[2] );
hlen = strlen(info_line );
#ifdef STRICT_CHECKING
assert( hlen == 41 );
#endif
veejay_memcpy( socket_buffer, info_line, sizeof(uint8_t) * hlen );
snprintf(info_line,
data_len + 1,
"%04d%04d%04d%08d%08d%08d%08d", info->effect_frame1->width,
info->effect_frame1->height, info->pixel_format,
compr_len,planes[0],planes[1],planes[2] );
veejay_memcpy( socket_buffer, info_line, sizeof(uint8_t) * data_len );
}
int id = (mcast ? 2: 3);
int __socket_len = hlen + compr_len;
int __socket_len = data_len + compr_len;
if(!mcast)
{
@@ -1186,8 +1183,6 @@ int vj_perform_send_primary_frame_s2(veejay_t *info, int mcast, int to_link_id)
}
free(socket_buffer);
return 1;
}