nkeynes@1245: /** nkeynes@1245: * $Id$ nkeynes@1245: * nkeynes@1245: * Bounded, blocking queue for inter-thread communication. nkeynes@1245: * nkeynes@1245: * Copyright (c) 2012 Nathan Keynes. nkeynes@1245: * nkeynes@1245: * This program is free software; you can redistribute it and/or modify nkeynes@1245: * it under the terms of the GNU General Public License as published by nkeynes@1245: * the Free Software Foundation; either version 2 of the License, or nkeynes@1245: * (at your option) any later version. nkeynes@1245: * nkeynes@1245: * This program is distributed in the hope that it will be useful, nkeynes@1245: * but WITHOUT ANY WARRANTY; without even the implied warranty of nkeynes@1245: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the nkeynes@1245: * GNU General Public License for more details. nkeynes@1245: */ nkeynes@1245: nkeynes@1245: #include nkeynes@1245: #include nkeynes@1245: #include "tqueue.h" nkeynes@1245: nkeynes@1245: #define TQUEUE_LENGTH 64 nkeynes@1245: nkeynes@1245: typedef struct { nkeynes@1245: tqueue_callback callback; nkeynes@1245: void *data; nkeynes@1245: gboolean synchronous; nkeynes@1245: } tqueue_entry; nkeynes@1245: nkeynes@1245: struct { nkeynes@1245: pthread_mutex_t mutex; nkeynes@1245: pthread_cond_t consumer_wait; nkeynes@1245: pthread_cond_t producer_sync_wait; nkeynes@1245: pthread_cond_t producer_full_wait; nkeynes@1245: int head; /* next item returned by dequeue */ nkeynes@1245: int tail; /* next item filled in by enqueue */ nkeynes@1245: int last_result; /* Result value of last dequeued callback */ nkeynes@1245: tqueue_entry tqueue[TQUEUE_LENGTH]; nkeynes@1245: } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1}; nkeynes@1245: nkeynes@1245: /************** Producer thread **************/ nkeynes@1245: #define TQUEUE_EMPTY() (tqueue.head == tqueue.tail) nkeynes@1245: #define TQUEUE_FULL() ((tqueue.head == tqueue.tail+1) || (tqueue.head == 0 && tqueue.tail == TQUEUE_LENGTH)) nkeynes@1245: nkeynes@1245: static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync ) nkeynes@1245: { nkeynes@1245: assert( !TQUEUE_FULL() ); nkeynes@1245: tqueue.tqueue[tqueue.tail].callback = callback; nkeynes@1245: tqueue.tqueue[tqueue.tail].data = data; nkeynes@1245: tqueue.tqueue[tqueue.tail].synchronous = sync; nkeynes@1245: tqueue.tail++; nkeynes@1245: } nkeynes@1245: nkeynes@1245: /** nkeynes@1245: * Add a message to the UI queue and return immediately. nkeynes@1245: */ nkeynes@1245: void tqueue_post_message( tqueue_callback callback, void *data ) nkeynes@1245: { nkeynes@1245: pthread_mutex_lock(&tqueue.mutex); nkeynes@1245: if( TQUEUE_FULL() ) { nkeynes@1245: /* Wait for the queue to clear */ nkeynes@1245: pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex); nkeynes@1245: } nkeynes@1245: tqueue_enqueue( callback, data, FALSE ); nkeynes@1245: pthread_cond_signal(&tqueue.consumer_wait); nkeynes@1245: pthread_mutex_unlock(&tqueue.mutex); nkeynes@1245: } nkeynes@1245: nkeynes@1245: /** nkeynes@1245: * Add a message to the UI queue and wait for it to be handled. nkeynes@1245: * @return the result from the handler function. nkeynes@1245: */ nkeynes@1245: int tqueue_send_message( tqueue_callback callback, void *data ) nkeynes@1245: { nkeynes@1245: pthread_mutex_lock(&tqueue.mutex); nkeynes@1245: if( TQUEUE_FULL() ) { nkeynes@1245: /* Wait for the queue to clear */ nkeynes@1245: pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex); nkeynes@1245: } nkeynes@1245: tqueue_enqueue( callback, data, TRUE ); nkeynes@1245: pthread_cond_signal(&tqueue.consumer_wait); nkeynes@1245: pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex); nkeynes@1245: return tqueue.last_result; nkeynes@1245: pthread_mutex_unlock(&tqueue.mutex); nkeynes@1245: } nkeynes@1245: nkeynes@1245: /************** Consumer thread **************/ nkeynes@1245: nkeynes@1245: /* Note: must be called with mutex locked */ nkeynes@1245: static void tqueue_process_loop() { nkeynes@1245: while( !TQUEUE_EMPTY() ) { nkeynes@1245: gboolean wasFull = TQUEUE_FULL(); nkeynes@1245: tqueue_callback callback = tqueue.tqueue[tqueue.head].callback; nkeynes@1245: void *data = tqueue.tqueue[tqueue.head].data; nkeynes@1245: gboolean sync = tqueue.tqueue[tqueue.head].synchronous; nkeynes@1245: tqueue.head++; nkeynes@1245: nkeynes@1245: if( wasFull ) { nkeynes@1245: pthread_cond_signal( &tqueue.producer_full_wait ); nkeynes@1245: } nkeynes@1245: nkeynes@1245: pthread_mutex_unlock(&tqueue.mutex); nkeynes@1245: int result = callback(data); nkeynes@1245: pthread_mutex_lock(&tqueue.mutex); nkeynes@1245: if( sync ) { nkeynes@1245: tqueue.last_result = result; nkeynes@1245: pthread_cond_signal( &tqueue.producer_sync_wait ); nkeynes@1245: } nkeynes@1245: } nkeynes@1245: } nkeynes@1245: nkeynes@1245: /** nkeynes@1245: * Process all messages in the queue, if any. nkeynes@1245: */ nkeynes@1245: void tqueue_process_all() nkeynes@1245: { nkeynes@1245: pthread_mutex_lock(&tqueue.mutex); nkeynes@1245: if( !TQUEUE_EMPTY() ) { nkeynes@1245: tqueue_process_loop(); nkeynes@1245: } nkeynes@1245: pthread_mutex_unlock(&tqueue.mutex); nkeynes@1245: } nkeynes@1245: nkeynes@1245: /** nkeynes@1245: * Process the first message in the queue. If no messages are on the nkeynes@1245: * queue, waits for the next one to be queued and then processes it. nkeynes@1245: */ nkeynes@1245: void tqueue_process_wait() nkeynes@1245: { nkeynes@1245: pthread_mutex_lock(&tqueue.mutex); nkeynes@1245: if( TQUEUE_EMPTY() ) { nkeynes@1245: pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex ); nkeynes@1245: } nkeynes@1245: tqueue_process_loop(); nkeynes@1245: pthread_mutex_unlock(&tqueue.mutex); nkeynes@1245: }