Search
lxdream.org :: lxdream/src/tqueue.c
lxdream 0.9.1
released Jun 29
Download Now
filename src/tqueue.c
changeset 1273:32b2a340f8b3
prev1245:01e0020adf88
author nkeynes
date Fri May 29 18:47:05 2015 +1000 (6 years ago)
permissions -rw-r--r--
last change Fix test case
view annotate diff log raw
     1 /**
     2  * $Id$
     3  *
     4  * Bounded, blocking queue for inter-thread communication.
     5  *
     6  * Copyright (c) 2012 Nathan Keynes.
     7  *
     8  * This program is free software; you can redistribute it and/or modify
     9  * it under the terms of the GNU General Public License as published by
    10  * the Free Software Foundation; either version 2 of the License, or
    11  * (at your option) any later version.
    12  *
    13  * This program is distributed in the hope that it will be useful,
    14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
    15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    16  * GNU General Public License for more details.
    17  */
    19 #include <assert.h>
    20 #include <pthread.h>
    21 #include "tqueue.h"
    23 #define TQUEUE_LENGTH 64
    25 typedef struct {
    26     tqueue_callback callback;
    27     void *data;
    28     gboolean synchronous;
    29 } tqueue_entry;
    31 struct {
    32     pthread_mutex_t mutex;
    33     pthread_cond_t consumer_wait;
    34     pthread_cond_t producer_sync_wait;
    35     pthread_cond_t producer_full_wait;
    36     int head;  /* next item returned by dequeue */
    37     int tail;  /* next item filled in by enqueue */
    38     int last_result; /* Result value of last dequeued callback */
    39     tqueue_entry tqueue[TQUEUE_LENGTH];
    40 } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
    42 /************** Producer thread **************/
    43 #define TQUEUE_EMPTY() (tqueue.head == tqueue.tail)
    44 #define TQUEUE_FULL() ((tqueue.head == tqueue.tail+1) || (tqueue.head == 0 && tqueue.tail == (TQUEUE_LENGTH-1)))
    46 static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
    47 {
    48     assert( !TQUEUE_FULL() );
    49     tqueue.tqueue[tqueue.tail].callback = callback;
    50     tqueue.tqueue[tqueue.tail].data = data;
    51     tqueue.tqueue[tqueue.tail].synchronous = sync;
    52     tqueue.tail++;
    53     if( tqueue.tail == TQUEUE_LENGTH )
    54         tqueue.tail = 0;
    55 }
    57 /**
    58  * Add a message to the UI queue and return immediately.
    59  */
    60 void tqueue_post_message( tqueue_callback callback, void *data )
    61 {
    62     pthread_mutex_lock(&tqueue.mutex);
    63     if( TQUEUE_FULL() ) {
    64         /* Wait for the queue to clear */
    65         pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
    66     }
    67     tqueue_enqueue( callback, data, FALSE );
    68     pthread_cond_signal(&tqueue.consumer_wait);
    69     pthread_mutex_unlock(&tqueue.mutex);
    70 }
    72 /**
    73  * Add a message to the UI queue and wait for it to be handled.
    74  * @return the result from the handler function.
    75  */
    76 int tqueue_send_message( tqueue_callback callback, void *data )
    77 {
    78     int result;
    79     pthread_mutex_lock(&tqueue.mutex);
    80     if( TQUEUE_FULL() ) {
    81         /* Wait for the queue to clear */
    82         pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
    83     }
    84     tqueue_enqueue( callback, data, TRUE );
    85     pthread_cond_signal(&tqueue.consumer_wait);
    86     pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
    87     result = tqueue.last_result;
    88     pthread_mutex_unlock(&tqueue.mutex);
    89     return result;
    90 }
    92 /************** Consumer thread **************/
    94 /* Note: must be called with mutex locked */
    95 static void tqueue_process_loop() {
    96     while( !TQUEUE_EMPTY() ) {
    97         gboolean wasFull = TQUEUE_FULL();
    98         tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
    99         void *data = tqueue.tqueue[tqueue.head].data;
   100         gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
   101         tqueue.head++;
   102         if( tqueue.head == TQUEUE_LENGTH )
   103             tqueue.head = 0;
   105         if( wasFull ) {
   106             pthread_cond_signal( &tqueue.producer_full_wait );
   107         }
   109         pthread_mutex_unlock(&tqueue.mutex);
   110         int result = callback(data);
   111         pthread_mutex_lock(&tqueue.mutex);
   112         if( sync ) {
   113             tqueue.last_result = result;
   114             pthread_cond_signal( &tqueue.producer_sync_wait );
   115         }
   116     }
   117 }
   119 /**
   120  * Process all messages in the queue, if any.
   121  */
   122 void tqueue_process_all()
   123 {
   124     pthread_mutex_lock(&tqueue.mutex);
   125     if( !TQUEUE_EMPTY() ) {
   126         tqueue_process_loop();
   127     }
   128     pthread_mutex_unlock(&tqueue.mutex);
   129 }
   131 /**
   132  * Process the first message in the queue. If no messages are on the
   133  * queue, waits for the next one to be queued and then processes it.
   134  */
   135 void tqueue_process_wait()
   136 {
   137     pthread_mutex_lock(&tqueue.mutex);
   138     if( TQUEUE_EMPTY() ) {
   139         pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
   140     }
   141     tqueue_process_loop();
   142     pthread_mutex_unlock(&tqueue.mutex);
   143 }
.