nkeynes@1245 | 1 | /**
|
nkeynes@1245 | 2 | * $Id$
|
nkeynes@1245 | 3 | *
|
nkeynes@1245 | 4 | * Bounded, blocking queue for inter-thread communication.
|
nkeynes@1245 | 5 | *
|
nkeynes@1245 | 6 | * Copyright (c) 2012 Nathan Keynes.
|
nkeynes@1245 | 7 | *
|
nkeynes@1245 | 8 | * This program is free software; you can redistribute it and/or modify
|
nkeynes@1245 | 9 | * it under the terms of the GNU General Public License as published by
|
nkeynes@1245 | 10 | * the Free Software Foundation; either version 2 of the License, or
|
nkeynes@1245 | 11 | * (at your option) any later version.
|
nkeynes@1245 | 12 | *
|
nkeynes@1245 | 13 | * This program is distributed in the hope that it will be useful,
|
nkeynes@1245 | 14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
nkeynes@1245 | 15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
nkeynes@1245 | 16 | * GNU General Public License for more details.
|
nkeynes@1245 | 17 | */
|
nkeynes@1245 | 18 |
|
nkeynes@1245 | 19 | #include <assert.h>
|
nkeynes@1245 | 20 | #include <pthread.h>
|
nkeynes@1245 | 21 | #include "tqueue.h"
|
nkeynes@1245 | 22 |
|
nkeynes@1245 | 23 | #define TQUEUE_LENGTH 64
|
nkeynes@1245 | 24 |
|
nkeynes@1245 | 25 | typedef struct {
|
nkeynes@1245 | 26 | tqueue_callback callback;
|
nkeynes@1245 | 27 | void *data;
|
nkeynes@1245 | 28 | gboolean synchronous;
|
nkeynes@1245 | 29 | } tqueue_entry;
|
nkeynes@1245 | 30 |
|
nkeynes@1245 | 31 | struct {
|
nkeynes@1245 | 32 | pthread_mutex_t mutex;
|
nkeynes@1245 | 33 | pthread_cond_t consumer_wait;
|
nkeynes@1245 | 34 | pthread_cond_t producer_sync_wait;
|
nkeynes@1245 | 35 | pthread_cond_t producer_full_wait;
|
nkeynes@1245 | 36 | int head; /* next item returned by dequeue */
|
nkeynes@1245 | 37 | int tail; /* next item filled in by enqueue */
|
nkeynes@1245 | 38 | int last_result; /* Result value of last dequeued callback */
|
nkeynes@1245 | 39 | tqueue_entry tqueue[TQUEUE_LENGTH];
|
nkeynes@1245 | 40 | } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
|
nkeynes@1245 | 41 |
|
nkeynes@1245 | 42 | /************** Producer thread **************/
|
nkeynes@1245 | 43 | #define TQUEUE_EMPTY() (tqueue.head == tqueue.tail)
|
nkeynes@1245 | 44 | #define TQUEUE_FULL() ((tqueue.head == tqueue.tail+1) || (tqueue.head == 0 && tqueue.tail == TQUEUE_LENGTH))
|
nkeynes@1245 | 45 |
|
nkeynes@1245 | 46 | static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
|
nkeynes@1245 | 47 | {
|
nkeynes@1245 | 48 | assert( !TQUEUE_FULL() );
|
nkeynes@1245 | 49 | tqueue.tqueue[tqueue.tail].callback = callback;
|
nkeynes@1245 | 50 | tqueue.tqueue[tqueue.tail].data = data;
|
nkeynes@1245 | 51 | tqueue.tqueue[tqueue.tail].synchronous = sync;
|
nkeynes@1245 | 52 | tqueue.tail++;
|
nkeynes@1245 | 53 | }
|
nkeynes@1245 | 54 |
|
nkeynes@1245 | 55 | /**
|
nkeynes@1245 | 56 | * Add a message to the UI queue and return immediately.
|
nkeynes@1245 | 57 | */
|
nkeynes@1245 | 58 | void tqueue_post_message( tqueue_callback callback, void *data )
|
nkeynes@1245 | 59 | {
|
nkeynes@1245 | 60 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 61 | if( TQUEUE_FULL() ) {
|
nkeynes@1245 | 62 | /* Wait for the queue to clear */
|
nkeynes@1245 | 63 | pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
|
nkeynes@1245 | 64 | }
|
nkeynes@1245 | 65 | tqueue_enqueue( callback, data, FALSE );
|
nkeynes@1245 | 66 | pthread_cond_signal(&tqueue.consumer_wait);
|
nkeynes@1245 | 67 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 68 | }
|
nkeynes@1245 | 69 |
|
nkeynes@1245 | 70 | /**
|
nkeynes@1245 | 71 | * Add a message to the UI queue and wait for it to be handled.
|
nkeynes@1245 | 72 | * @return the result from the handler function.
|
nkeynes@1245 | 73 | */
|
nkeynes@1245 | 74 | int tqueue_send_message( tqueue_callback callback, void *data )
|
nkeynes@1245 | 75 | {
|
nkeynes@1245 | 76 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 77 | if( TQUEUE_FULL() ) {
|
nkeynes@1245 | 78 | /* Wait for the queue to clear */
|
nkeynes@1245 | 79 | pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
|
nkeynes@1245 | 80 | }
|
nkeynes@1245 | 81 | tqueue_enqueue( callback, data, TRUE );
|
nkeynes@1245 | 82 | pthread_cond_signal(&tqueue.consumer_wait);
|
nkeynes@1245 | 83 | pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
|
nkeynes@1245 | 84 | return tqueue.last_result;
|
nkeynes@1245 | 85 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 86 | }
|
nkeynes@1245 | 87 |
|
nkeynes@1245 | 88 | /************** Consumer thread **************/
|
nkeynes@1245 | 89 |
|
nkeynes@1245 | 90 | /* Note: must be called with mutex locked */
|
nkeynes@1245 | 91 | static void tqueue_process_loop() {
|
nkeynes@1245 | 92 | while( !TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 93 | gboolean wasFull = TQUEUE_FULL();
|
nkeynes@1245 | 94 | tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
|
nkeynes@1245 | 95 | void *data = tqueue.tqueue[tqueue.head].data;
|
nkeynes@1245 | 96 | gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
|
nkeynes@1245 | 97 | tqueue.head++;
|
nkeynes@1245 | 98 |
|
nkeynes@1245 | 99 | if( wasFull ) {
|
nkeynes@1245 | 100 | pthread_cond_signal( &tqueue.producer_full_wait );
|
nkeynes@1245 | 101 | }
|
nkeynes@1245 | 102 |
|
nkeynes@1245 | 103 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 104 | int result = callback(data);
|
nkeynes@1245 | 105 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 106 | if( sync ) {
|
nkeynes@1245 | 107 | tqueue.last_result = result;
|
nkeynes@1245 | 108 | pthread_cond_signal( &tqueue.producer_sync_wait );
|
nkeynes@1245 | 109 | }
|
nkeynes@1245 | 110 | }
|
nkeynes@1245 | 111 | }
|
nkeynes@1245 | 112 |
|
nkeynes@1245 | 113 | /**
|
nkeynes@1245 | 114 | * Process all messages in the queue, if any.
|
nkeynes@1245 | 115 | */
|
nkeynes@1245 | 116 | void tqueue_process_all()
|
nkeynes@1245 | 117 | {
|
nkeynes@1245 | 118 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 119 | if( !TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 120 | tqueue_process_loop();
|
nkeynes@1245 | 121 | }
|
nkeynes@1245 | 122 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 123 | }
|
nkeynes@1245 | 124 |
|
nkeynes@1245 | 125 | /**
|
nkeynes@1245 | 126 | * Process the first message in the queue. If no messages are on the
|
nkeynes@1245 | 127 | * queue, waits for the next one to be queued and then processes it.
|
nkeynes@1245 | 128 | */
|
nkeynes@1245 | 129 | void tqueue_process_wait()
|
nkeynes@1245 | 130 | {
|
nkeynes@1245 | 131 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 132 | if( TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 133 | pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
|
nkeynes@1245 | 134 | }
|
nkeynes@1245 | 135 | tqueue_process_loop();
|
nkeynes@1245 | 136 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 137 | }
|