From: Terrance S. <ts...@us...> - 2007-04-24 18:35:37
|
Update of /cvsroot/xsb/XSB/emu In directory sc8-pr-cvs10.sourceforge.net:/tmp/cvs-serv8249 Modified Files: context.h thread_defs_xsb.h thread_xsb.c thread_xsb.h Log Message: First pass at message queues. Index: context.h =================================================================== RCS file: /cvsroot/xsb/XSB/emu/context.h,v retrieving revision 1.54 retrieving revision 1.55 diff -u -r1.54 -r1.55 --- context.h 23 Feb 2007 20:17:04 -0000 1.54 +++ context.h 24 Apr 2007 18:35:31 -0000 1.55 @@ -100,7 +100,12 @@ // char ccall_error_backtrace }; - +typedef struct Message_Queue_Cell *MQ_Cell_Ptr; +typedef struct Message_Queue_Cell { + MQ_Cell_Ptr next; + MQ_Cell_Ptr prev; + int size; +} MQ_Cell; #define MAX_RETRACTED_CLAUSES 20 @@ -380,6 +385,9 @@ pthread_mutex_t _xsb_query_mut; struct ccall_error_t _ccall_error; + /********** Message Queue State **********/ + MQ_Cell_Ptr _current_mq_cell; + /************ Pointers to cursor information used by odbc_xsb.c context-local cursor table ***********/ @@ -592,6 +600,8 @@ #define xsb_query_mut (th-> _xsb_query_mut) #define ccall_error (th-> _ccall_error) +#define current_mq_cell (th->_current_mq_cell) + #define retracted_buffer (th->_retracted_buffer) #define OldestCl (th->_OldestCl) #define NewestCl (th->_NewestCl) Index: thread_defs_xsb.h =================================================================== RCS file: /cvsroot/xsb/XSB/emu/thread_defs_xsb.h,v retrieving revision 1.28 retrieving revision 1.29 diff -u -r1.28 -r1.29 --- thread_defs_xsb.h 25 Jan 2007 20:33:56 -0000 1.28 +++ thread_defs_xsb.h 24 Apr 2007 18:35:31 -0000 1.29 @@ -69,7 +69,15 @@ #define PTHREAD_SETCONCURRENCY 29 #define PTHREAD_GETCONCURRENCY 30 -#define SET_XSB_READY 31 +#define SET_XSB_READY 31 + +#define MESSAGE_QUEUE_CREATE 32 +#define THREAD_SEND_MESSAGE 33 +#define THREAD_TRY_MESSAGE 34 +#define THREAD_RETRY_MESSAGE 35 +#define THREAD_ACCEPT_MESSAGE 36 + +#define PRINT_MESSAGE_QUEUE 40 #define MAX_THREADS 1024 Index: thread_xsb.c =================================================================== RCS file: /cvsroot/xsb/XSB/emu/thread_xsb.c,v retrieving revision 1.70 retrieving revision 1.71 diff -u -r1.70 -r1.71 --- thread_xsb.c 12 Mar 2007 16:06:20 -0000 1.70 +++ thread_xsb.c 24 Apr 2007 18:35:31 -0000 1.71 @@ -25,6 +25,7 @@ #include <time.h> #include <math.h> #include <signal.h> +#include <string.h> #include "xsb_debug.h" #include "xsb_config.h" @@ -75,6 +76,7 @@ void set_init_pdl_size(int); void set_init_complstack_size(int); Cell copy_term_from_thread( th_context *th, th_context *from, Cell arg1 ); +int copy_term_to_message_queue(th_context *th, Cell arg1); /* Used to create detached thread -- process global. */ pthread_attr_t detached_attr_gl; @@ -1093,6 +1095,151 @@ break; } + case MESSAGE_QUEUE_CREATE: { + XSB_MQ_Ptr xsb_mq; + int declared_size; + + xsb_mq = (XSB_MQ_Ptr) mem_alloc(sizeof(XSB_MQ),THREAD_SPACE); + xsb_mq->first_message = 0; + xsb_mq->last_message = 0; + xsb_mq->size = 0; + if ((declared_size = ptoc_int(CTXTc 3)) == 0) + xsb_mq->max_size = DEFAULT_MQ_SIZE; + else xsb_mq->max_size = declared_size; + + pthread_mutex_init(&xsb_mq->mq_mutex, NULL ) ; + pthread_cond_init( &xsb_mq->mq_has_free_cells, NULL ); + pthread_cond_init( &xsb_mq->mq_has_messages, NULL ); + + ctop_int(CTXTc 2,(int) xsb_mq); + break; + } + + case THREAD_SEND_MESSAGE: { + XSB_MQ_Ptr message_queue = (XSB_MQ_Ptr) ptoc_int(CTXTc 2); + MQ_Cell_Ptr this_cell; + + /* int i; CPtr BuffPtr; + BuffPtr = (CPtr) asrtBuff-> Buff; + for (i = 0 ; i < (asrtBuff->Size)/sizeof(CPtr) ; i++) + printf("asrtBuf (%d): %x\n",i,BuffPtr[i]); */ + + pthread_mutex_lock(&message_queue->mq_mutex); + while (message_queue->size >= message_queue->max_size) { + pthread_cond_wait(&message_queue->mq_has_free_cells,&message_queue->mq_mutex); + } + + this_cell = mem_alloc(asrtBuff->Size+sizeof(MQ_Cell),THREAD_SPACE); + this_cell->prev = message_queue->last_message; + this_cell->next = 0; + this_cell->size = asrtBuff->Size+sizeof(MQ_Cell); + /* Moves assert buffer to word just after MQ_Cell */ + memmove(this_cell+1,asrtBuff->Buff,asrtBuff->Size); + + /* BuffPtr = (CPtr) this_cell->message; + for (i = 0 ; i < (asrtBuff->Size)/sizeof(CPtr) ; i++) + printf("msgBuf (%d): %x\n",i,BuffPtr[i]); */ + if (message_queue->last_message) + (message_queue->last_message)->next = this_cell; + message_queue->last_message = this_cell; + message_queue->size++; + + if (!message_queue->first_message) { + message_queue->first_message = this_cell; + } + + pthread_mutex_unlock(&message_queue->mq_mutex); + /* Need to broadcast whenever you add a new message as + threads may be waiting at the end of a non-full queue*/ + pthread_cond_broadcast(&message_queue->mq_has_messages); + break; + } + +case THREAD_TRY_MESSAGE: { + XSB_MQ_Ptr message_queue = (XSB_MQ_Ptr) ptoc_int(CTXTc 2); + + pthread_mutex_lock(&message_queue->mq_mutex); + while (!message_queue->first_message) { + pthread_cond_wait(&message_queue->mq_has_messages,&message_queue->mq_mutex); + } + current_mq_cell = message_queue->first_message; + pcreg = (byte *)(current_mq_cell+1); + break; + } + + /* THREAD_RETRY_MESSAGE will have the lock as set up by THREAD_TRY_MESSAGE (or by + succeeding out of pthread_cond_wait() The lock will be unlocked + either by THREAD_ACCEPT_MESSAGE or by suspending in + pthread_cond_wait() */ +case THREAD_RETRY_MESSAGE: { + XSB_MQ_Ptr message_queue = (XSB_MQ_Ptr) ptoc_int(CTXTc 2); + + /* If current_mq_cell is last message, the thread has made a + traversal through the queue without finding a term that + unifies. It gives up the lock and goes to sleep. Before + it wakes again, another thread may have changed the queue + substantially -- so its old state (current_mq_cell) is + invalid. This cell may even have been reclaimed. Thus, + there is no way of relating our old position to the new + queue. All you can do is start again from the beginning + (checking, of course, that there is a beginning) */ + + if (current_mq_cell == message_queue->last_message) { + pthread_cond_wait(&message_queue->mq_has_messages,&message_queue->mq_mutex); + while (!message_queue->first_message) { + pthread_cond_wait(&message_queue->mq_has_messages,&message_queue->mq_mutex); + } + current_mq_cell = message_queue->first_message; + } + else current_mq_cell = current_mq_cell->next; + pcreg = (byte *) (current_mq_cell+1); // offset for compiled code. + break; + } + + /* Broadcasts whenever it goes from "full" to "not_full" so that + writers will be awakened. */ +case THREAD_ACCEPT_MESSAGE: { + XSB_MQ_Ptr message_queue = (XSB_MQ_Ptr) ptoc_int(CTXTc 2); + + /* Take MQ_Cell out of chain, and clear its contents */ + if (message_queue->first_message == current_mq_cell) + message_queue->first_message = current_mq_cell->next; + + if (message_queue->last_message == current_mq_cell) + message_queue->last_message = current_mq_cell->prev; + + if (current_mq_cell->prev) + (current_mq_cell->prev)->next = (current_mq_cell->next); + + if (current_mq_cell->next) + (current_mq_cell->next)->prev = (current_mq_cell->prev); + + mem_dealloc(current_mq_cell,current_mq_cell->size,THREAD_SPACE); + + message_queue->size--; + if (message_queue->size+1 == message_queue->max_size) { + pthread_mutex_unlock(&message_queue->mq_mutex); + pthread_cond_broadcast(&message_queue->mq_has_free_cells); + } + else pthread_mutex_unlock(&message_queue->mq_mutex); + break; + } + + case PRINT_MESSAGE_QUEUE: { + XSB_MQ_Ptr xsb_mq = (XSB_MQ_Ptr) ptoc_int(CTXTc 2); + MQ_Cell_Ptr cur_cell; + + printf("first message %p last_message %p size %d\n", + xsb_mq->first_message,xsb_mq->last_message,xsb_mq->size); + cur_cell = xsb_mq->first_message; + while (cur_cell != 0) { + printf("cell %p next %p, prev %p, size %d\n", + cur_cell,cur_cell->next,cur_cell->prev,cur_cell->size); + cur_cell = cur_cell->next; + } + break; + } + default: rc = 0 ; /* Keep compiler happy */ xsb_abort( "[THREAD] Invalid thread operation requested %d",request_num); Index: thread_xsb.h =================================================================== RCS file: /cvsroot/xsb/XSB/emu/thread_xsb.h,v retrieving revision 1.28 retrieving revision 1.29 diff -u -r1.28 -r1.29 --- thread_xsb.h 25 Jan 2007 20:33:56 -0000 1.28 +++ thread_xsb.h 24 Apr 2007 18:35:31 -0000 1.29 @@ -139,6 +139,20 @@ #endif #ifdef SHARED_COMPL_TABLES int get_waiting_for_tid( int t ); + +#define DEFAULT_MQ_SIZE 100 + +typedef struct XSB_Message_Queue { + pthread_mutex_t mq_mutex; + pthread_cond_t mq_has_free_cells; + pthread_cond_t mq_has_messages; + MQ_Cell_Ptr first_message; + MQ_Cell_Ptr last_message; + int size; + int max_size; +} XSB_MQ; +typedef XSB_MQ *XSB_MQ_Ptr; + #endif #define ENSURE_ONE_THREAD() \ @@ -146,7 +160,6 @@ xsb_abort( "Operation is permitted only when a single thread is active" ) ; \ } - /* TLS: the mt engine does not yet work for enable no cygwin, but this allows random and srandom to be used. They're both defined in |