From 2d4fe59a66f8ef7992fdda10bf6bb0fd5bc8db7f Mon Sep 17 00:00:00 2001 From: Niels Elburg Date: Thu, 13 Jan 2005 12:13:11 +0000 Subject: [PATCH] refactorized git-svn-id: svn://code.dyne.org/veejay/trunk@125 eb8d1916-c9e9-0310-b8de-cf0c9472ead5 --- veejay-current/veejay/vj-server.c | 726 ++++++++++++------------------ 1 file changed, 282 insertions(+), 444 deletions(-) diff --git a/veejay-current/veejay/vj-server.c b/veejay-current/veejay/vj-server.c index e9c3d179..2737b74c 100644 --- a/veejay-current/veejay/vj-server.c +++ b/veejay-current/veejay/vj-server.c @@ -1,5 +1,5 @@ /* veejay - Linux VeeJay - * (C) 2002-2004 Niels Elburg + * (C) 2002-2005 Niels Elburg * * * This program is free software; you can redistribute it and/or modify @@ -16,6 +16,7 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + #include #include #include @@ -26,545 +27,382 @@ #include #include #include +#include #include #include #include #include "vj-global.h" -/* todo : better server implementation - V0004DxxxxxxSUMV - OSC - -*/ - -typedef struct { - char header[1]; - int data_len; - char data[1]; - char message[MESSAGE_SIZE]; +typedef struct +{ + char *msg; + int len; } vj_message; typedef struct { - int handle; - int in_use; - vj_message **m_queue; + int handle; + int in_use; + vj_message **m_queue; + int n_queued; + int n_retrieved; } vj_link; -typedef struct { - int handle; - int in_use; -} vj_slink; - #define VJ_MAX_PENDING_MSG 64 -static vj_link **Link; -static vj_slink **StatusLink; +#define RECV_SIZE (1024*256) -static int m_queue_write = 0; -static int m_queue_read = 0; - -int vj_server_is_connected(int link_id); -void vj_server_list_clients(void); - -int _vj_server_free_slot(int type); -int _vj_server_new_client(int socket_fd); +int _vj_server_free_slot(vj_server *vje); +int _vj_server_new_client(vj_server *vje, int socket_fd); int _vj_server_del_client(vj_server *vje, int link_id); +int _vj_server_parse_msg(vj_server *vje,int link_id, char *buf, int buf_len ); -int vj_server_is_connected(int link_id) { - return (Link[link_id]->in_use); -} +static char *recv_buffer = NULL; -void vj_server_list_clients() +int vj_server_init() { - int i; - for (i = 0; i < VJ_MAX_CONNECTIONS; i++) { - if (Link[i]->in_use == 1) { - if(StatusLink[i]->in_use==1) { - veejay_msg(VEEJAY_MSG_WARNING, "Link %d in use, processes status information.",i); - } - else { - veejay_msg(VEEJAY_MSG_WARNING, "Link %d in use, no status processing.",i); - } - } else { - veejay_msg(VEEJAY_MSG_DEBUG, "Link %d is available", i); - } - } + // 1 mb buffer + recv_buffer = (char*) malloc(sizeof(char) * RECV_SIZE); + if(!recv_buffer) + return 0; + bzero( recv_buffer, RECV_SIZE ); + return 1; } -vj_server *vj_server_alloc(int port, int type) +vj_server *vj_server_alloc(int port) { int i; int on = 1; - int flags; + vj_link **link; vj_server *vjs = (vj_server *) vj_malloc(sizeof(struct vj_server_t)); if (!vjs) - return NULL; + return NULL; - FD_ZERO(&(vjs->master)); - FD_ZERO(&(vjs->current)); - - /* listener */ - if ((vjs->handle = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { - veejay_msg(VEEJAY_MSG_ERROR, "Failed to create a socket"); - if(vjs) free(vjs); - return NULL; + if ((vjs->handle = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) + { + veejay_msg(VEEJAY_MSG_ERROR, "Failed to create a socket"); + free(vjs); + return NULL; } - fcntl(vjs->handle, F_GETFL, &flags); - //flags |= O_NONBLOCK; - //fcntl(vjs->handle, F_SETFL, &flags); - - if ((setsockopt - (vjs->handle, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, - sizeof(on))) == -1) { - veejay_msg(VEEJAY_MSG_ERROR, - "Cannot turn off bind addres checking"); + if (setsockopt( vjs->handle, SOL_SOCKET, SO_REUSEADDR, (const char*) &on, sizeof(on) )== -1) + { + veejay_msg(VEEJAY_MSG_ERROR, + "Cannot turn off bind addres checking"); } vjs->myself.sin_family = AF_INET; vjs->myself.sin_addr.s_addr = INADDR_ANY; vjs->myself.sin_port = htons(port); + memset(&(vjs->myself.sin_zero), '\0', 8); - if (bind - (vjs->handle, (struct sockaddr *) &(vjs->myself), - sizeof(vjs->myself)) == -1) { - veejay_msg(VEEJAY_MSG_ERROR, "Bind error - Port %d in use ?", port); - return NULL; + if (bind(vjs->handle, (struct sockaddr *) &(vjs->myself), sizeof(vjs->myself) ) == -1 ) + { + veejay_msg(VEEJAY_MSG_ERROR, "Bind error - Port %d in use ?", port); + return NULL; } - if (listen(vjs->handle, VJ_MAX_CONNECTIONS) == -1) { - veejay_msg(VEEJAY_MSG_ERROR, "Listen error."); - return NULL; + if (listen(vjs->handle, VJ_MAX_CONNECTIONS) == -1) + { + veejay_msg(VEEJAY_MSG_ERROR, "Listen error."); + return NULL; } - FD_SET(vjs->handle, &(vjs->master)); - vjs->nr_of_connections = vjs->handle; - vjs->nr_of_links = 0; - - if (type == 0) { - Link = (vj_link **) malloc(sizeof(vj_link *) * (VJ_MAX_CONNECTIONS+1)); - - for (i = 0; i <= VJ_MAX_CONNECTIONS; i++) { - int j; - Link[i] = (vj_link *) malloc(sizeof(vj_link)); - Link[i]->in_use = 0; - Link[i]->m_queue = - (vj_message **) malloc(sizeof(vj_message *) * - (VJ_MAX_PENDING_MSG + 1)); - if(!Link[i]->m_queue) return NULL; - for (j = 0; j < (VJ_MAX_PENDING_MSG + 1); j++) { - Link[i]->m_queue[j] = - (vj_message *) malloc(sizeof(vj_message)); - if(!Link[i]->m_queue[j]) return NULL; - Link[i]->m_queue[j]->data_len = 0; - bzero(Link[i]->m_queue[j]->message,0); - Link[i]->m_queue[j]->data[0] = 0; - Link[i]->m_queue[j]->header[0] = 0; - } + link = (vj_link **) vj_malloc(sizeof(vj_link *) * VJ_MAX_CONNECTIONS); + if(!link) + { + veejay_msg(VEEJAY_MSG_ERROR, "Out of memory"); + free(vjs); + return NULL; } - veejay_msg(VEEJAY_MSG_INFO, "Command socket ready at port %d", port); - } - if (type == 1) { - StatusLink = - (vj_slink **) malloc(sizeof(vj_slink *) * (VJ_MAX_CONNECTIONS+1)); - for (i = 0; i <= VJ_MAX_CONNECTIONS; i++) { - - StatusLink[i] = (vj_slink *) malloc(sizeof(vj_slink)); - StatusLink[i]->in_use = 0; + + for( i = 0; i < VJ_MAX_CONNECTIONS; i ++ ) + { + int j; + link[i] = (vj_link*) vj_malloc(sizeof(vj_link)); + if(!link[i]) + { + free(vjs); + free(link); + return NULL; + } + link[i]->in_use = 0; + link[i]->m_queue = (vj_message**) vj_malloc(sizeof( vj_message * ) * VJ_MAX_PENDING_MSG ); + if(!link[i]->m_queue) + { + free(vjs); + free(link[i]); + return NULL; + } + for( j = 0; j < VJ_MAX_PENDING_MSG; j ++ ) + { + link[i]->m_queue[j] = (vj_message*) vj_malloc(sizeof(vj_message)); + link[i]->m_queue[j]->len = 0; + link[i]->m_queue[j]->msg = NULL; + } + link[i]->n_queued = 0; + link[i]->n_retrieved = 0; } - veejay_msg(VEEJAY_MSG_INFO, "Status socket ready at port %d", port); - } + vjs->link = (void**) link; + vjs->nr_of_links = 0; + + FD_ZERO( &(vjs->fds) ); + FD_ZERO( &(vjs->wds) ); + FD_SET( vjs->handle, &(vjs->fds) ); + //FD_SET( vjs->handle, &(vjs->wds) ); + + vjs->nr_of_connections = vjs->handle; + + + veejay_msg(VEEJAY_MSG_DEBUG,"Connection ready at socket %d", vjs->handle ); + return vjs; } -int vj_server_raw_send(int link_id, uint8_t *buf, int len ) + +int vj_server_send( vj_server *vje, int link_id, uint8_t *buf, int len ) { - unsigned int total = 0; + unsigned int total = 0; unsigned int bytes_left = len; int n; - if (len <= 0) + vj_link **Link = (vj_link **) vje->link; + + if (len <= 0 || Link[link_id]->in_use==0) { veejay_msg(VEEJAY_MSG_ERROR, "nothing to send"); - return -1; + return 0; } + + if(!FD_ISSET( Link[link_id]->handle, &(vje->wds) ) ) + { + return 0; + } + while (total < len) { n = send(Link[link_id]->handle, buf + total, bytes_left, 0); if (n == -1) { - veejay_msg(VEEJAY_MSG_ERROR, "Cant send %d bytes",bytes_left); - return -1; + return 0; } - total += n; - bytes_left -= n; - } - return total; - -} - -int vj_server_send(int link_id, const char *buf, int len) -{ - unsigned int total = 0; - unsigned int bytes_left = len; - int n; - if (len <= 0) - return -1; - while (total < len) - { - n = send(Link[link_id]->handle, buf + total, bytes_left, 0); - if (n == -1) + if ( n == 0) { - if(Link[link_id]->in_use==0) - veejay_msg(VEEJAY_MSG_ERROR, "Link not in use"); - return -1; + return 0; } total += n; bytes_left -= n; - } - return 0; + } + return total; } - -int _vj_server_free_slot(int type) +int _vj_server_free_slot(vj_server *vje) { + vj_link **Link = (vj_link**) vje->link; int i; - if (type == 1) { - for (i = 0; i < VJ_MAX_CONNECTIONS; i++) { - if (StatusLink[i]->in_use == 0) - return i; - } - } - if (type == 0) { - for (i = 0; i < VJ_MAX_CONNECTIONS; i++) { - if (Link[i]->in_use == 0) - return i; - } - } - return -1; -} -int _vj_server_new_client(int socket_fd) -{ - int entry = _vj_server_free_slot(0); - if (entry == -1) + for (i = 0; i < VJ_MAX_CONNECTIONS; i++) { - veejay_msg(VEEJAY_MSG_ERROR, "Cannot take more connections"); - return -1; + if (Link[i]->in_use == 0) + return i; + } + return VJ_MAX_CONNECTIONS; +} + +int _vj_server_new_client(vj_server *vje, int socket_fd) +{ + int entry = _vj_server_free_slot(vje); + vj_link **Link = (vj_link**) vje->link; + if (entry == VJ_MAX_CONNECTIONS) + { + veejay_msg(VEEJAY_MSG_ERROR, "Cannot take more connections (max %d allowed)", VJ_MAX_CONNECTIONS); + return VJ_MAX_CONNECTIONS; } Link[entry]->handle = socket_fd; Link[entry]->in_use = 1; - veejay_msg(VEEJAY_MSG_INFO, "Registered new client %d (socket %d)",entry,socket_fd); - return 0; + FD_SET( socket_fd, &(vje->fds) ); + FD_SET( socket_fd, &(vje->wds) ); + vje->nr_of_links ++; + + return entry; } int _vj_server_del_client(vj_server * vje, int link_id) { - if (Link[link_id]->in_use == 1) { - Link[link_id]->in_use = 0; - close(Link[link_id]->handle); - FD_CLR(Link[link_id]->handle, &(vje->master)); - veejay_msg(VEEJAY_MSG_INFO,"Closed command socket with Link %d",link_id); - return 0; + vj_link **Link = (vj_link**) vje->link; + if(Link[link_id]->in_use ) + { + Link[link_id]->in_use = 0; + FD_CLR( Link[link_id]->handle, &(vje->fds) ); + FD_CLR( Link[link_id]->handle, &(vje->wds) ); + close(Link[link_id]->handle); + vje->nr_of_links --; + return 1; } - return -1; + return 0; } -int vj_server_must_send(vj_server *vje) { - return vje->nr_of_links; +void vj_server_close_connection(vj_server *vje, int link_id ) +{ + _vj_server_del_client( vje, link_id ); } int vj_server_poll(vj_server * vje) { - struct timeval t; - int i; - //memset(t, 0, sizeof(struct timeval)); - t.tv_sec = 0; - t.tv_usec = 0; - vje->current = vje->master; - if (select(vje->nr_of_connections + 1, &(vje->current), NULL, NULL, &t) - == -1 ) - return 0; - for(i=0; i <= vje->nr_of_connections; i++) - { - if(FD_ISSET(i, &(vje->current))) return 1; - } - return 0; -} + int status; + struct timeval t; + memset( &t, 0, sizeof(t)); -int vj_server_adv_poll(vj_server *vje , int *links) { - struct timeval t; - int i=0; - int ret_val = 0; - t.tv_sec = 0; - t.tv_usec = 0; - vje->current = vje->master; - if(select(vje->nr_of_connections +1, &(vje->current),NULL,NULL,&t) == -1) return -1; - for(i=0; i < vje->nr_of_links; i++) - { - if(FD_ISSET( Link[i]->handle,&(vje->current) ) ) - { - links[ret_val] = i; - } - if(Link[i]->in_use) links[ret_val] = i; - ret_val++; - } - - return ret_val; -} -#define RECV_SIZE (1024*256) -static unsigned char *recv_buffer; -int vj_server_init() -{ - // 1 mb buffer - recv_buffer = (unsigned char*) malloc(sizeof(unsigned char) * RECV_SIZE); - if(!recv_buffer) return 0; - bzero( recv_buffer, RECV_SIZE ); - return 1; -} + FD_ZERO( &(vje->fds) ); + FD_ZERO( &(vje->wds) ); + FD_SET( vje->handle, &(vje->fds) ); + FD_SET( vje->handle, &(vje->wds) ); -static int _which_link(int socket) -{ int i; - for(i=0; i < VJ_MAX_CONNECTIONS;i++) + for( i = 0; i < vje->nr_of_links ; i ++ ) { - if(Link[i]->handle == socket) return i; + vj_link **Link= (vj_link**) vje->link; + FD_SET( Link[i]->handle, &(vje->fds) ); + FD_SET( Link[i]->handle, &(vje->wds) ); } - return -1; + + status = select(vje->nr_of_connections + 1, &(vje->fds), &(vje->wds), 0, &t); + if( status > 0 ) + { + return 1; + } + return 0; } -int vj_server_update(vj_server * vje) +int _vj_server_parse_msg( vj_server *vje,int link_id, char *buf, int buf_len ) { - int i; - - //if (link_id < 0 || link_id > VJ_MAX_CONNECTIONS) - // return -1; - /* try new connections */ - for (i = 0; i <= vje->nr_of_connections; i++) { - if (FD_ISSET(i, &(vje->current)) && i == vje->handle) { - int addr_len = sizeof(vje->remote); - int fd = - accept(vje->handle, (struct sockaddr *) &(vje->remote), - &addr_len); - if (fd != -1) { - FD_SET(fd, &(vje->master)); - if (vje->nr_of_connections < fd) { - vje->nr_of_connections = fd; - } - if (_vj_server_new_client(fd) != 0) { - close(fd); - FD_CLR(fd, &(vje->master)); - veejay_msg(VEEJAY_MSG_ERROR,"Cannot establish connection with caller"); - } - - return 0; - } - } - else + int i = 0; + int num_msg = 0; + vj_link **Link = (vj_link**) vje->link; + vj_message **v = Link[link_id]->m_queue; + while( i < buf_len ) { - if(FD_ISSET(i, &(vje->current))) - { + if( buf[i] == 'V' && buf[i+4] == 'D' ) + { + int len, n; + n = sscanf( buf + ( i + 1 ), "%03d", &len ); + if( n == 1) + { + i += 5; // skip header + v[num_msg]->len = len; - int id = _which_link(i); - if ( id >= 0 && Link[id]->in_use) { - int n; - int nread = RECV_SIZE; - int n_msg = 0; - int j = 0; - m_queue_write = 0; - m_queue_read = 0; - /* try to read lots of bytes bytes */ - n = recv(Link[id]->handle, recv_buffer, nread, 0); - if(n<=0) - { - veejay_msg(VEEJAY_MSG_ERROR, "Closing connection with %d (recv <= 0)", - i); - _vj_server_del_client(vje,id); - return -1; - } - while(j < n) { - vj_message *raw = Link[id]->m_queue[m_queue_write]; - if(recv_buffer[j] == 'V' && recv_buffer[j+4] == 'D') - { - int m_len = 0; - if(sscanf(recv_buffer+(j+1), "%03d",&m_len)>0) - { - j+=5; /* skip header */ - if(m_queue_write < VJ_MAX_PENDING_MSG) - { - raw->header[0] = recv_buffer[0]; - raw->data[0] = recv_buffer[4]; - raw->data_len = m_len; - bzero(raw->message, MESSAGE_SIZE); - strncpy( raw->message, recv_buffer+j, m_len ); - j+= m_len; - n_msg ++; - m_queue_write ++; - } - } - } - else - { - j++; + if(v[num_msg]->msg != NULL) + free( v[num_msg]->msg ); + + v[num_msg]->msg = (char*)strndup( buf + i , len ); + i += len; + num_msg ++; } - } - bzero(recv_buffer, j); - return n_msg; } - - } + i ++; } - } - return -1; + Link[link_id]->n_queued = num_msg; + Link[link_id]->n_retrieved = 0; + return num_msg; } - -int vj_server_status_check(vj_server * vje) +int vj_server_new_connection(vj_server *vje) { - int i = 0; - - for (i = 0; i <= vje->nr_of_connections; i++) { - if (FD_ISSET(i, &(vje->current))) { - if (i == vje->handle) { + if( FD_ISSET( vje->handle, &(vje->fds) ) ) + { int addr_len = sizeof(vje->remote); - int fd = - accept(vje->handle, (struct sockaddr *) &(vje->remote), - &addr_len); - if (fd != -1) { - int entry = -1; - FD_SET(fd, &(vje->master)); - if (vje->nr_of_connections < fd) { - vje->nr_of_connections = fd; - } - entry = _vj_server_free_slot(1); - if (entry != -1) { - StatusLink[entry]->handle = fd; - StatusLink[entry]->in_use = 1; - vje->nr_of_links++; - return 0; - } else { + int n = 0; + int fd = accept( vje->handle, (struct sockaddr*) &(vje->remote), &addr_len ); + if(fd == -1) + { + veejay_msg(VEEJAY_MSG_ERROR, "Error accepting connection"); + return -1; + } + + if( vje->nr_of_connections < fd ) vje->nr_of_connections = fd; + + n = _vj_server_new_client(vje, fd); + if( n == VJ_MAX_CONNECTIONS ) + { close(fd); - FD_CLR(fd, &(vje->master)); - vje->nr_of_links--; - if(vje->nr_of_links<0) vje->nr_of_links=0; - } + veejay_msg(VEEJAY_MSG_ERROR, "Cannot establish connection"); + return -1; } - return -1; - } + // veejay_msg(VEEJAY_MSG_DEBUG, "New connection socket %d", fd); + return n; } - } - return -1; -} -int vj_server_status_send(vj_server *vje, char *buf, int len) -{ - int n; - int i; - unsigned int total = -1; - unsigned int bytes_left = len; - - if (len <= 0) return -1; - - for(i=0; i < VJ_MAX_CONNECTIONS; i++) { - if (StatusLink[i]->in_use == 1 ) { - //if( FD_ISSET( StatusLink[i]->handle, &(vje->master)) ) { - //} - total = 0; - bytes_left = len; - while (total < len) { - n = send(StatusLink[i]->handle, buf + total, bytes_left, - 0); - if (n <= 0) { - close(StatusLink[i]->handle); - StatusLink[i]->in_use = 0; - FD_CLR( StatusLink[i]->handle, &(vje->master)); - vje->nr_of_links--; - veejay_msg(VEEJAY_MSG_INFO, "Closed status connection with Link %d",i); - return -1; - } - total += n; - bytes_left -= n; - } - } - } - return total; -} -/* close_link : close a status link (1) or normal (0) */ -void vj_server_close_link(vj_server * vje, int link_id, int type) -{ - if (link_id >= 0 && link_id < VJ_MAX_CONNECTIONS) { - if (type == 0) { - if (Link[link_id]->in_use == 1) { - close(Link[link_id]->handle); - Link[link_id]->in_use = 0; - FD_CLR(Link[link_id]->handle, &(vje->master)); - veejay_msg(VEEJAY_MSG_INFO,"Closed command socket with link %d",link_id); - } - } - if (type == 1) { - if (StatusLink[link_id]->in_use == 1) { - close(StatusLink[link_id]->handle); - StatusLink[link_id]->in_use = 0; - FD_CLR(StatusLink[link_id]->handle, &(vje->master)); - veejay_msg(VEEJAY_MSG_INFO,"Closed status socket with link %d",link_id); - } - } - - } -} - - -void vj_server_shutdown(vj_server *vje, int type) { - int link_id; - int i; - - if(type==1) { - for(link_id=0; link_id < VJ_MAX_CONNECTIONS; link_id++) { - if(Link[link_id]->in_use==1) close(Link[link_id]->handle); - } - close(vje->handle); - for (i = 0; i <= VJ_MAX_CONNECTIONS; i++) { - int j; - for(j=0; j < (VJ_MAX_PENDING_MSG+1); j++) { - if(Link[i]->m_queue[j]) free(Link[i]->m_queue[j]); - } - if(Link[i]->m_queue) free(Link[i]->m_queue); - if(Link[i]) free(Link[i]); - } - free(Link); - } - - if(type==0) { - for(link_id=0; link_id < VJ_MAX_CONNECTIONS; link_id++) { - if(StatusLink[link_id]->in_use==1) close(StatusLink[link_id]->handle); - } - close(vje->handle); - for (i = 0; i <= VJ_MAX_CONNECTIONS; i++) { - free(StatusLink[i]); - } - free(StatusLink); - } -} - -/* retrieve_msg: - keep repeating this function until all messages have been parsed. - it copies the string in the message queue to dst, - and sets the m_queue_read ready for next call. - return 0 on success, -1 on failure (no more messages) */ -int vj_server_retrieve_msg(int link_id, char *dst) -{ - if (Link[link_id]->in_use == 0) return -1; - - if (m_queue_read < m_queue_write) { - if (Link[link_id]->m_queue[m_queue_read]->data_len > 0) { - strncpy(dst, - Link[link_id]->m_queue[m_queue_read]->message, - Link[link_id]->m_queue[m_queue_read]->data_len); - dst[Link[link_id]->m_queue[m_queue_read]->data_len] = '\0'; - Link[link_id]->m_queue[m_queue_read]->data_len = 0; - bzero(Link[link_id]->m_queue[m_queue_read]->message, - MESSAGE_SIZE); - m_queue_read ++; - return (m_queue_write - m_queue_read ); - } - } - return -1; /* no messages */ } + +int vj_server_update( vj_server *vje, int id ) +{ + // new connection ? + vj_link **Link = (vj_link**) vje->link; + int sock_fd = Link[id]->handle; + + if( FD_ISSET( sock_fd, &(vje->fds)) ) + { + int n; + if( !Link[id]->in_use ) + { + veejay_msg(VEEJAY_MSG_ERROR, "Link %d not in use but socket changed!", id); + return 0; + } + + n = recv( sock_fd, recv_buffer, RECV_SIZE, 0 ); + if( n <= 0 ) + { + veejay_msg(VEEJAY_MSG_INFO, "Closing connection num %d", id ); + _vj_server_del_client( vje, id ); + return 0; + } + + return _vj_server_parse_msg( vje, id, recv_buffer,n ); + } + return 0; +} + +void vj_server_shutdown(vj_server *vje) +{ + int j,i; + vj_link **Link = (vj_link**) vje->link; + + for(i=0; i < VJ_MAX_CONNECTIONS; i++) + { + if(Link[i]->in_use) + close(Link[i]->handle); + for( j = 0; j < VJ_MAX_PENDING_MSG; j ++ ) + { + if(Link[i]->m_queue[j]->msg ) free( Link[i]->m_queue[j]->msg ); + if(Link[i]->m_queue[j] ) free( Link[i]->m_queue[j] ); + } + if( Link[i]->m_queue ) free( Link[i]->m_queue ); + if( Link[i] ) free( Link[i] ); + } + close(vje->handle); + free(Link); +} + +int vj_server_retrieve_msg(vj_server *vje, int id, char *dst ) +{ + vj_link **Link = (vj_link**) vje->link; + if (Link[id]->in_use == 0) + return 0; + + int index = Link[id]->n_retrieved; + char *msg; + int len; + + if( index == Link[id]->n_queued ) + { + return 0; // done + } + msg = Link[id]->m_queue[index]->msg; + len = Link[id]->m_queue[index]->len; + + strncpy( dst, msg, len ); + + index ++; + + Link[id]->n_retrieved = index; + + return 1; +} +