4 * Bounded, blocking queue for inter-thread communication.
6 * Copyright (c) 2012 Nathan Keynes.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
23 #define TQUEUE_LENGTH 64
26 tqueue_callback callback;
32 pthread_mutex_t mutex;
33 pthread_cond_t consumer_wait;
34 pthread_cond_t producer_sync_wait;
35 pthread_cond_t producer_full_wait;
36 int head; /* next item returned by dequeue */
37 int tail; /* next item filled in by enqueue */
38 int last_result; /* Result value of last dequeued callback */
39 tqueue_entry tqueue[TQUEUE_LENGTH];
40 } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
42 /************** Producer thread **************/
43 #define TQUEUE_EMPTY() (tqueue.head == tqueue.tail)
44 #define TQUEUE_FULL() ((tqueue.head == tqueue.tail+1) || (tqueue.head == 0 && tqueue.tail == TQUEUE_LENGTH))
46 static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
48 assert( !TQUEUE_FULL() );
49 tqueue.tqueue[tqueue.tail].callback = callback;
50 tqueue.tqueue[tqueue.tail].data = data;
51 tqueue.tqueue[tqueue.tail].synchronous = sync;
56 * Add a message to the UI queue and return immediately.
58 void tqueue_post_message( tqueue_callback callback, void *data )
60 pthread_mutex_lock(&tqueue.mutex);
62 /* Wait for the queue to clear */
63 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
65 tqueue_enqueue( callback, data, FALSE );
66 pthread_cond_signal(&tqueue.consumer_wait);
67 pthread_mutex_unlock(&tqueue.mutex);
71 * Add a message to the UI queue and wait for it to be handled.
72 * @return the result from the handler function.
74 int tqueue_send_message( tqueue_callback callback, void *data )
76 pthread_mutex_lock(&tqueue.mutex);
78 /* Wait for the queue to clear */
79 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
81 tqueue_enqueue( callback, data, TRUE );
82 pthread_cond_signal(&tqueue.consumer_wait);
83 pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
84 return tqueue.last_result;
85 pthread_mutex_unlock(&tqueue.mutex);
88 /************** Consumer thread **************/
90 /* Note: must be called with mutex locked */
91 static void tqueue_process_loop() {
92 while( !TQUEUE_EMPTY() ) {
93 gboolean wasFull = TQUEUE_FULL();
94 tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
95 void *data = tqueue.tqueue[tqueue.head].data;
96 gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
100 pthread_cond_signal( &tqueue.producer_full_wait );
103 pthread_mutex_unlock(&tqueue.mutex);
104 int result = callback(data);
105 pthread_mutex_lock(&tqueue.mutex);
107 tqueue.last_result = result;
108 pthread_cond_signal( &tqueue.producer_sync_wait );
114 * Process all messages in the queue, if any.
116 void tqueue_process_all()
118 pthread_mutex_lock(&tqueue.mutex);
119 if( !TQUEUE_EMPTY() ) {
120 tqueue_process_loop();
122 pthread_mutex_unlock(&tqueue.mutex);
126 * Process the first message in the queue. If no messages are on the
127 * queue, waits for the next one to be queued and then processes it.
129 void tqueue_process_wait()
131 pthread_mutex_lock(&tqueue.mutex);
132 if( TQUEUE_EMPTY() ) {
133 pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
135 tqueue_process_loop();
136 pthread_mutex_unlock(&tqueue.mutex);
.