Search
lxdream.org :: lxdream/src/tqueue.c
lxdream 0.9.1
released Jun 29
Download Now
filename src/tqueue.c
changeset 1245:01e0020adf88
next 1273:32b2a340f8b3
author nkeynes
date Sun Mar 04 21:10:12 2012 +1000 (12 years ago)
permissions -rw-r--r--
last change Move glsl loading into common gl code, and set a display capability flag
view annotate diff log raw
     1 /**
     2  * $Id$
     3  *
     4  * Bounded, blocking queue for inter-thread communication.
     5  *
     6  * Copyright (c) 2012 Nathan Keynes.
     7  *
     8  * This program is free software; you can redistribute it and/or modify
     9  * it under the terms of the GNU General Public License as published by
    10  * the Free Software Foundation; either version 2 of the License, or
    11  * (at your option) any later version.
    12  *
    13  * This program is distributed in the hope that it will be useful,
    14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
    15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    16  * GNU General Public License for more details.
    17  */
    19 #include <assert.h>
    20 #include <pthread.h>
    21 #include "tqueue.h"
    23 #define TQUEUE_LENGTH 64
    25 typedef struct {
    26     tqueue_callback callback;
    27     void *data;
    28     gboolean synchronous;
    29 } tqueue_entry;
    31 struct {
    32     pthread_mutex_t mutex;
    33     pthread_cond_t consumer_wait;
    34     pthread_cond_t producer_sync_wait;
    35     pthread_cond_t producer_full_wait;
    36     int head;  /* next item returned by dequeue */
    37     int tail;  /* next item filled in by enqueue */
    38     int last_result; /* Result value of last dequeued callback */
    39     tqueue_entry tqueue[TQUEUE_LENGTH];
    40 } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
    42 /************** Producer thread **************/
    43 #define TQUEUE_EMPTY() (tqueue.head == tqueue.tail)
    44 #define TQUEUE_FULL() ((tqueue.head == tqueue.tail+1) || (tqueue.head == 0 && tqueue.tail == TQUEUE_LENGTH))
    46 static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
    47 {
    48     assert( !TQUEUE_FULL() );
    49     tqueue.tqueue[tqueue.tail].callback = callback;
    50     tqueue.tqueue[tqueue.tail].data = data;
    51     tqueue.tqueue[tqueue.tail].synchronous = sync;
    52     tqueue.tail++;
    53 }
    55 /**
    56  * Add a message to the UI queue and return immediately.
    57  */
    58 void tqueue_post_message( tqueue_callback callback, void *data )
    59 {
    60     pthread_mutex_lock(&tqueue.mutex);
    61     if( TQUEUE_FULL() ) {
    62         /* Wait for the queue to clear */
    63         pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
    64     }
    65     tqueue_enqueue( callback, data, FALSE );
    66     pthread_cond_signal(&tqueue.consumer_wait);
    67     pthread_mutex_unlock(&tqueue.mutex);
    68 }
    70 /**
    71  * Add a message to the UI queue and wait for it to be handled.
    72  * @return the result from the handler function.
    73  */
    74 int tqueue_send_message( tqueue_callback callback, void *data )
    75 {
    76     pthread_mutex_lock(&tqueue.mutex);
    77     if( TQUEUE_FULL() ) {
    78         /* Wait for the queue to clear */
    79         pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
    80     }
    81     tqueue_enqueue( callback, data, TRUE );
    82     pthread_cond_signal(&tqueue.consumer_wait);
    83     pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
    84     return tqueue.last_result;
    85     pthread_mutex_unlock(&tqueue.mutex);
    86 }
    88 /************** Consumer thread **************/
    90 /* Note: must be called with mutex locked */
    91 static void tqueue_process_loop() {
    92     while( !TQUEUE_EMPTY() ) {
    93         gboolean wasFull = TQUEUE_FULL();
    94         tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
    95         void *data = tqueue.tqueue[tqueue.head].data;
    96         gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
    97         tqueue.head++;
    99         if( wasFull ) {
   100             pthread_cond_signal( &tqueue.producer_full_wait );
   101         }
   103         pthread_mutex_unlock(&tqueue.mutex);
   104         int result = callback(data);
   105         pthread_mutex_lock(&tqueue.mutex);
   106         if( sync ) {
   107             tqueue.last_result = result;
   108             pthread_cond_signal( &tqueue.producer_sync_wait );
   109         }
   110     }
   111 }
   113 /**
   114  * Process all messages in the queue, if any.
   115  */
   116 void tqueue_process_all()
   117 {
   118     pthread_mutex_lock(&tqueue.mutex);
   119     if( !TQUEUE_EMPTY() ) {
   120         tqueue_process_loop();
   121     }
   122     pthread_mutex_unlock(&tqueue.mutex);
   123 }
   125 /**
   126  * Process the first message in the queue. If no messages are on the
   127  * queue, waits for the next one to be queued and then processes it.
   128  */
   129 void tqueue_process_wait()
   130 {
   131     pthread_mutex_lock(&tqueue.mutex);
   132     if( TQUEUE_EMPTY() ) {
   133         pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
   134     }
   135     tqueue_process_loop();
   136     pthread_mutex_unlock(&tqueue.mutex);
   137 }
.