nkeynes@1245 | 1 | /**
|
nkeynes@1245 | 2 | * $Id$
|
nkeynes@1245 | 3 | *
|
nkeynes@1245 | 4 | * Bounded, blocking queue for inter-thread communication.
|
nkeynes@1245 | 5 | *
|
nkeynes@1245 | 6 | * Copyright (c) 2012 Nathan Keynes.
|
nkeynes@1245 | 7 | *
|
nkeynes@1245 | 8 | * This program is free software; you can redistribute it and/or modify
|
nkeynes@1245 | 9 | * it under the terms of the GNU General Public License as published by
|
nkeynes@1245 | 10 | * the Free Software Foundation; either version 2 of the License, or
|
nkeynes@1245 | 11 | * (at your option) any later version.
|
nkeynes@1245 | 12 | *
|
nkeynes@1245 | 13 | * This program is distributed in the hope that it will be useful,
|
nkeynes@1245 | 14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
nkeynes@1245 | 15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
nkeynes@1245 | 16 | * GNU General Public License for more details.
|
nkeynes@1245 | 17 | */
|
nkeynes@1245 | 18 |
|
nkeynes@1245 | 19 | #include <assert.h>
|
nkeynes@1245 | 20 | #include <pthread.h>
|
nkeynes@1245 | 21 | #include "tqueue.h"
|
nkeynes@1245 | 22 |
|
nkeynes@1245 | 23 | #define TQUEUE_LENGTH 64
|
nkeynes@1245 | 24 |
|
nkeynes@1245 | 25 | typedef struct {
|
nkeynes@1245 | 26 | tqueue_callback callback;
|
nkeynes@1245 | 27 | void *data;
|
nkeynes@1245 | 28 | gboolean synchronous;
|
nkeynes@1245 | 29 | } tqueue_entry;
|
nkeynes@1245 | 30 |
|
nkeynes@1245 | 31 | struct {
|
nkeynes@1245 | 32 | pthread_mutex_t mutex;
|
nkeynes@1245 | 33 | pthread_cond_t consumer_wait;
|
nkeynes@1245 | 34 | pthread_cond_t producer_sync_wait;
|
nkeynes@1245 | 35 | pthread_cond_t producer_full_wait;
|
nkeynes@1245 | 36 | int head; /* next item returned by dequeue */
|
nkeynes@1245 | 37 | int tail; /* next item filled in by enqueue */
|
nkeynes@1245 | 38 | int last_result; /* Result value of last dequeued callback */
|
nkeynes@1245 | 39 | tqueue_entry tqueue[TQUEUE_LENGTH];
|
nkeynes@1245 | 40 | } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
|
nkeynes@1245 | 41 |
|
nkeynes@1245 | 42 | /************** Producer thread **************/
|
/* True when nothing is pending: head has caught up with tail. */
#define TQUEUE_EMPTY() (tqueue.tail == tqueue.head)
/* True when advancing tail by one slot (wrapping at TQUEUE_LENGTH) would
 * land on head. One slot is always left unused so that a full queue can be
 * told apart from an empty one. */
#define TQUEUE_FULL() (((tqueue.tail + 1) % TQUEUE_LENGTH) == tqueue.head)
|
nkeynes@1245 | 45 |
|
nkeynes@1245 | 46 | static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
|
nkeynes@1245 | 47 | {
|
nkeynes@1245 | 48 | assert( !TQUEUE_FULL() );
|
nkeynes@1245 | 49 | tqueue.tqueue[tqueue.tail].callback = callback;
|
nkeynes@1245 | 50 | tqueue.tqueue[tqueue.tail].data = data;
|
nkeynes@1245 | 51 | tqueue.tqueue[tqueue.tail].synchronous = sync;
|
nkeynes@1245 | 52 | tqueue.tail++;
|
nkeynes@1273 | 53 | if( tqueue.tail == TQUEUE_LENGTH )
|
nkeynes@1273 | 54 | tqueue.tail = 0;
|
nkeynes@1245 | 55 | }
|
nkeynes@1245 | 56 |
|
nkeynes@1245 | 57 | /**
|
nkeynes@1245 | 58 | * Add a message to the UI queue and return immediately.
|
nkeynes@1245 | 59 | */
|
nkeynes@1245 | 60 | void tqueue_post_message( tqueue_callback callback, void *data )
|
nkeynes@1245 | 61 | {
|
nkeynes@1245 | 62 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 63 | if( TQUEUE_FULL() ) {
|
nkeynes@1245 | 64 | /* Wait for the queue to clear */
|
nkeynes@1245 | 65 | pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
|
nkeynes@1245 | 66 | }
|
nkeynes@1245 | 67 | tqueue_enqueue( callback, data, FALSE );
|
nkeynes@1245 | 68 | pthread_cond_signal(&tqueue.consumer_wait);
|
nkeynes@1245 | 69 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 70 | }
|
nkeynes@1245 | 71 |
|
nkeynes@1245 | 72 | /**
|
nkeynes@1245 | 73 | * Add a message to the UI queue and wait for it to be handled.
|
nkeynes@1245 | 74 | * @return the result from the handler function.
|
nkeynes@1245 | 75 | */
|
nkeynes@1245 | 76 | int tqueue_send_message( tqueue_callback callback, void *data )
|
nkeynes@1245 | 77 | {
|
nkeynes@1273 | 78 | int result;
|
nkeynes@1245 | 79 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 80 | if( TQUEUE_FULL() ) {
|
nkeynes@1245 | 81 | /* Wait for the queue to clear */
|
nkeynes@1245 | 82 | pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
|
nkeynes@1245 | 83 | }
|
nkeynes@1245 | 84 | tqueue_enqueue( callback, data, TRUE );
|
nkeynes@1245 | 85 | pthread_cond_signal(&tqueue.consumer_wait);
|
nkeynes@1245 | 86 | pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
|
nkeynes@1273 | 87 | result = tqueue.last_result;
|
nkeynes@1245 | 88 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1273 | 89 | return result;
|
nkeynes@1245 | 90 | }
|
nkeynes@1245 | 91 |
|
nkeynes@1245 | 92 | /************** Consumer thread **************/
|
nkeynes@1245 | 93 |
|
nkeynes@1245 | 94 | /* Note: must be called with mutex locked */
|
nkeynes@1245 | 95 | static void tqueue_process_loop() {
|
nkeynes@1245 | 96 | while( !TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 97 | gboolean wasFull = TQUEUE_FULL();
|
nkeynes@1245 | 98 | tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
|
nkeynes@1245 | 99 | void *data = tqueue.tqueue[tqueue.head].data;
|
nkeynes@1245 | 100 | gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
|
nkeynes@1245 | 101 | tqueue.head++;
|
nkeynes@1273 | 102 | if( tqueue.head == TQUEUE_LENGTH )
|
nkeynes@1273 | 103 | tqueue.head = 0;
|
nkeynes@1245 | 104 |
|
nkeynes@1245 | 105 | if( wasFull ) {
|
nkeynes@1245 | 106 | pthread_cond_signal( &tqueue.producer_full_wait );
|
nkeynes@1245 | 107 | }
|
nkeynes@1245 | 108 |
|
nkeynes@1245 | 109 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 110 | int result = callback(data);
|
nkeynes@1245 | 111 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 112 | if( sync ) {
|
nkeynes@1245 | 113 | tqueue.last_result = result;
|
nkeynes@1245 | 114 | pthread_cond_signal( &tqueue.producer_sync_wait );
|
nkeynes@1245 | 115 | }
|
nkeynes@1245 | 116 | }
|
nkeynes@1245 | 117 | }
|
nkeynes@1245 | 118 |
|
nkeynes@1245 | 119 | /**
|
nkeynes@1245 | 120 | * Process all messages in the queue, if any.
|
nkeynes@1245 | 121 | */
|
nkeynes@1245 | 122 | void tqueue_process_all()
|
nkeynes@1245 | 123 | {
|
nkeynes@1245 | 124 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 125 | if( !TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 126 | tqueue_process_loop();
|
nkeynes@1245 | 127 | }
|
nkeynes@1245 | 128 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 129 | }
|
nkeynes@1245 | 130 |
|
nkeynes@1245 | 131 | /**
|
nkeynes@1245 | 132 | * Process the first message in the queue. If no messages are on the
|
nkeynes@1245 | 133 | * queue, waits for the next one to be queued and then processes it.
|
nkeynes@1245 | 134 | */
|
nkeynes@1245 | 135 | void tqueue_process_wait()
|
nkeynes@1245 | 136 | {
|
nkeynes@1245 | 137 | pthread_mutex_lock(&tqueue.mutex);
|
nkeynes@1245 | 138 | if( TQUEUE_EMPTY() ) {
|
nkeynes@1245 | 139 | pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
|
nkeynes@1245 | 140 | }
|
nkeynes@1245 | 141 | tqueue_process_loop();
|
nkeynes@1245 | 142 | pthread_mutex_unlock(&tqueue.mutex);
|
nkeynes@1245 | 143 | }
|