4 * Bounded, blocking queue for inter-thread communication.
6 * Copyright (c) 2012 Nathan Keynes.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
/* Number of slots in the ring buffer. TQUEUE_FULL() reports "full" while one
 * slot is still unused, so at most TQUEUE_LENGTH-1 entries are queued at once. */
23 #define TQUEUE_LENGTH 64
/* Queue entry member: the handler that the processing loop calls for this entry.
 * NOTE(review): tqueue_enqueue also stores `data` and `synchronous` members in
 * each entry; their declarations are not visible at this point - confirm. */
26 tqueue_callback callback;
/* Shared queue state. Every function in this file touches it with `mutex` held. */
32 pthread_mutex_t mutex;
/* Signalled after each enqueue; tqueue_process_wait blocks on it. */
33 pthread_cond_t consumer_wait;
/* Signalled by the processing loop once a callback has returned;
 * tqueue_send_message blocks on it. */
34 pthread_cond_t producer_sync_wait;
/* Signalled by the processing loop after it takes an entry off the queue;
 * producers block on it while waiting for room. */
35 pthread_cond_t producer_full_wait;
36 int head; /* next item returned by dequeue */
37 int tail; /* next item filled in by enqueue */
38 int last_result; /* Result value of last dequeued callback */
/* Entry storage, indexed by head and tail. */
39 tqueue_entry tqueue[TQUEUE_LENGTH];
/* Positional initialiser: mutex, the three condition variables, head = 0,
 * tail = 0, last_result = -1. The entry array has no initialiser and is
 * therefore zero-initialised. */
40 } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
42 /************** Producer thread **************/
/* Ring-buffer state predicates. Every use in this file runs with tqueue.mutex held. */
/* Empty: the write index has not moved ahead of the read index. */
#define TQUEUE_EMPTY() (tqueue.tail == tqueue.head)
/* Full: advancing tail by one slot (wrapping at TQUEUE_LENGTH) would land on
 * head. One slot is therefore always left unused, which keeps "full"
 * distinguishable from "empty". */
#define TQUEUE_FULL() (((tqueue.tail + 1) % TQUEUE_LENGTH) == tqueue.head)
/*
 * Store one entry in the slot at tqueue.tail.
 *
 * callback, data and sync are copied unchanged into the slot's callback,
 * data and synchronous members. The function takes no lock itself; both
 * callers in this file invoke it with tqueue.mutex held. The queue must not
 * be full: this is checked with assert() only, so the check disappears when
 * NDEBUG is defined.
 */
46 static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
48 assert( !TQUEUE_FULL() );
49 tqueue.tqueue[tqueue.tail].callback = callback;
50 tqueue.tqueue[tqueue.tail].data = data;
51 tqueue.tqueue[tqueue.tail].synchronous = sync;
/* Wrap-around test for the write index at the end of the array.
 * NOTE(review): tail is presumably advanced just before this test and reset
 * to 0 when it matches - neither statement is visible here; confirm. */
53 if( tqueue.tail == TQUEUE_LENGTH )
58 * Add a message to the UI queue and return immediately.
60 void tqueue_post_message( tqueue_callback callback, void *data )
62 pthread_mutex_lock(&tqueue.mutex);
64 /* Wait for the queue to clear */
65 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
67 tqueue_enqueue( callback, data, FALSE );
68 pthread_cond_signal(&tqueue.consumer_wait);
69 pthread_mutex_unlock(&tqueue.mutex);
73 * Add a message to the UI queue and wait for it to be handled.
74 * @return the result from the handler function.
76 int tqueue_send_message( tqueue_callback callback, void *data )
79 pthread_mutex_lock(&tqueue.mutex);
81 /* Wait for the queue to clear */
82 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
84 tqueue_enqueue( callback, data, TRUE );
85 pthread_cond_signal(&tqueue.consumer_wait);
86 pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
87 result = tqueue.last_result;
88 pthread_mutex_unlock(&tqueue.mutex);
92 /************** Consumer thread **************/
94 /* Note: must be called with mutex locked */
/*
 * Consumer-side drain loop: takes the entry at tqueue.head and calls its
 * callback, repeating until head meets tail. The mutex is released while
 * each callback runs and re-acquired afterwards.
 */
95 static void tqueue_process_loop() {
96 while( !TQUEUE_EMPTY() ) {
/* Sample fullness before this entry is consumed. */
97 gboolean wasFull = TQUEUE_FULL();
/* Copy the entry into locals; they are used after the mutex is dropped. */
98 tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
99 void *data = tqueue.tqueue[tqueue.head].data;
100 gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
/* Wrap-around test for the read index.
 * NOTE(review): head is presumably advanced just before this test and reset
 * to 0 when it matches - neither statement is visible here; confirm. */
102 if( tqueue.head == TQUEUE_LENGTH )
/* Wake one producer blocked waiting for room.
 * NOTE(review): presumably guarded by wasFull - guard not visible; confirm. */
106 pthread_cond_signal( &tqueue.producer_full_wait );
/* Run the callback with the mutex released so other threads can take it
 * while the callback executes. */
109 pthread_mutex_unlock(&tqueue.mutex);
110 int result = callback(data);
111 pthread_mutex_lock(&tqueue.mutex);
/* Publish the return value and wake a sender blocked in tqueue_send_message.
 * NOTE(review): presumably done only when sync is set - guard not visible;
 * confirm. */
113 tqueue.last_result = result;
114 pthread_cond_signal( &tqueue.producer_sync_wait );
120 * Process all messages in the queue, if any.
122 void tqueue_process_all()
124 pthread_mutex_lock(&tqueue.mutex);
125 if( !TQUEUE_EMPTY() ) {
126 tqueue_process_loop();
128 pthread_mutex_unlock(&tqueue.mutex);
132 * Process the first message in the queue. If no messages are on the
133 * queue, waits for the next one to be queued and then processes it.
135 void tqueue_process_wait()
137 pthread_mutex_lock(&tqueue.mutex);
138 if( TQUEUE_EMPTY() ) {
139 pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
141 tqueue_process_loop();
142 pthread_mutex_unlock(&tqueue.mutex);
.