Search
lxdream.org :: lxdream/src/tqueue.c
lxdream 0.9.1
released Jun 29
Download Now
filename src/tqueue.c
changeset 1273:32b2a340f8b3
prev1245:01e0020adf88
author nkeynes
date Fri May 29 18:47:05 2015 +1000 (6 years ago)
permissions -rw-r--r--
last change Fix test case
file annotate diff log raw
nkeynes@1245
     1
/**
nkeynes@1245
     2
 * $Id$
nkeynes@1245
     3
 *
nkeynes@1245
     4
 * Bounded, blocking queue for inter-thread communication.
nkeynes@1245
     5
 *
nkeynes@1245
     6
 * Copyright (c) 2012 Nathan Keynes.
nkeynes@1245
     7
 *
nkeynes@1245
     8
 * This program is free software; you can redistribute it and/or modify
nkeynes@1245
     9
 * it under the terms of the GNU General Public License as published by
nkeynes@1245
    10
 * the Free Software Foundation; either version 2 of the License, or
nkeynes@1245
    11
 * (at your option) any later version.
nkeynes@1245
    12
 *
nkeynes@1245
    13
 * This program is distributed in the hope that it will be useful,
nkeynes@1245
    14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
nkeynes@1245
    15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
nkeynes@1245
    16
 * GNU General Public License for more details.
nkeynes@1245
    17
 */
nkeynes@1245
    18
nkeynes@1245
    19
#include <assert.h>
nkeynes@1245
    20
#include <pthread.h>
nkeynes@1245
    21
#include "tqueue.h"
nkeynes@1245
    22
nkeynes@1245
    23
#define TQUEUE_LENGTH 64
nkeynes@1245
    24
nkeynes@1245
    25
typedef struct {
nkeynes@1245
    26
    tqueue_callback callback;
nkeynes@1245
    27
    void *data;
nkeynes@1245
    28
    gboolean synchronous;
nkeynes@1245
    29
} tqueue_entry;
nkeynes@1245
    30
nkeynes@1245
    31
struct {
nkeynes@1245
    32
    pthread_mutex_t mutex;
nkeynes@1245
    33
    pthread_cond_t consumer_wait;
nkeynes@1245
    34
    pthread_cond_t producer_sync_wait;
nkeynes@1245
    35
    pthread_cond_t producer_full_wait;
nkeynes@1245
    36
    int head;  /* next item returned by dequeue */
nkeynes@1245
    37
    int tail;  /* next item filled in by enqueue */
nkeynes@1245
    38
    int last_result; /* Result value of last dequeued callback */
nkeynes@1245
    39
    tqueue_entry tqueue[TQUEUE_LENGTH];
nkeynes@1245
    40
} tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
nkeynes@1245
    41
nkeynes@1245
    42
/************** Producer thread **************/
nkeynes@1245
    43
/* True when there is nothing left to dequeue. */
#define TQUEUE_EMPTY() (tqueue.tail == tqueue.head)
/* True when advancing tail by one slot (wrapping at TQUEUE_LENGTH) would land
 * on head. One slot is always left unused so that a full queue can be told
 * apart from an empty one. */
#define TQUEUE_FULL() (((tqueue.tail + 1) % TQUEUE_LENGTH) == tqueue.head)
nkeynes@1245
    45
nkeynes@1245
    46
/**
 * Store a message in the slot at tail, then advance tail by one, wrapping
 * back to 0 at the end of the buffer.
 *
 * Both callers in this file hold tqueue.mutex and have checked that the
 * queue is not full; fullness is only verified here by assert.
 */
static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
{
    assert( !TQUEUE_FULL() );

    tqueue_entry *slot = &tqueue.tqueue[tqueue.tail];
    slot->callback = callback;
    slot->data = data;
    slot->synchronous = sync;

    tqueue.tail = (tqueue.tail + 1) % TQUEUE_LENGTH;
}
nkeynes@1245
    56
nkeynes@1245
    57
/**
nkeynes@1245
    58
 * Add a message to the UI queue and return immediately.
nkeynes@1245
    59
 */
nkeynes@1245
    60
void tqueue_post_message( tqueue_callback callback, void *data )
nkeynes@1245
    61
{
nkeynes@1245
    62
    pthread_mutex_lock(&tqueue.mutex);
nkeynes@1245
    63
    if( TQUEUE_FULL() ) {
nkeynes@1245
    64
        /* Wait for the queue to clear */
nkeynes@1245
    65
        pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
nkeynes@1245
    66
    }
nkeynes@1245
    67
    tqueue_enqueue( callback, data, FALSE );
nkeynes@1245
    68
    pthread_cond_signal(&tqueue.consumer_wait);
nkeynes@1245
    69
    pthread_mutex_unlock(&tqueue.mutex);
nkeynes@1245
    70
}
nkeynes@1245
    71
nkeynes@1245
    72
/**
nkeynes@1245
    73
 * Add a message to the UI queue and wait for it to be handled.
nkeynes@1245
    74
 * @return the result from the handler function.
nkeynes@1245
    75
 */
nkeynes@1245
    76
int tqueue_send_message( tqueue_callback callback, void *data )
nkeynes@1245
    77
{
nkeynes@1273
    78
    int result;
nkeynes@1245
    79
    pthread_mutex_lock(&tqueue.mutex);
nkeynes@1245
    80
    if( TQUEUE_FULL() ) {
nkeynes@1245
    81
        /* Wait for the queue to clear */
nkeynes@1245
    82
        pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
nkeynes@1245
    83
    }
nkeynes@1245
    84
    tqueue_enqueue( callback, data, TRUE );
nkeynes@1245
    85
    pthread_cond_signal(&tqueue.consumer_wait);
nkeynes@1245
    86
    pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
nkeynes@1273
    87
    result = tqueue.last_result;
nkeynes@1245
    88
    pthread_mutex_unlock(&tqueue.mutex);
nkeynes@1273
    89
    return result;
nkeynes@1245
    90
}
nkeynes@1245
    91
nkeynes@1245
    92
/************** Consumer thread **************/
nkeynes@1245
    93
nkeynes@1245
    94
/* Note: must be called with mutex locked */
nkeynes@1245
    95
static void tqueue_process_loop() {
nkeynes@1245
    96
    while( !TQUEUE_EMPTY() ) {
nkeynes@1245
    97
        gboolean wasFull = TQUEUE_FULL();
nkeynes@1245
    98
        tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
nkeynes@1245
    99
        void *data = tqueue.tqueue[tqueue.head].data;
nkeynes@1245
   100
        gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
nkeynes@1245
   101
        tqueue.head++;
nkeynes@1273
   102
        if( tqueue.head == TQUEUE_LENGTH )
nkeynes@1273
   103
            tqueue.head = 0;
nkeynes@1245
   104
nkeynes@1245
   105
        if( wasFull ) {
nkeynes@1245
   106
            pthread_cond_signal( &tqueue.producer_full_wait );
nkeynes@1245
   107
        }
nkeynes@1245
   108
nkeynes@1245
   109
        pthread_mutex_unlock(&tqueue.mutex);
nkeynes@1245
   110
        int result = callback(data);
nkeynes@1245
   111
        pthread_mutex_lock(&tqueue.mutex);
nkeynes@1245
   112
        if( sync ) {
nkeynes@1245
   113
            tqueue.last_result = result;
nkeynes@1245
   114
            pthread_cond_signal( &tqueue.producer_sync_wait );
nkeynes@1245
   115
        }
nkeynes@1245
   116
    }
nkeynes@1245
   117
}
nkeynes@1245
   118
nkeynes@1245
   119
/**
nkeynes@1245
   120
 * Process all messages in the queue, if any.
nkeynes@1245
   121
 */
nkeynes@1245
   122
void tqueue_process_all()
nkeynes@1245
   123
{
nkeynes@1245
   124
    pthread_mutex_lock(&tqueue.mutex);
nkeynes@1245
   125
    if( !TQUEUE_EMPTY() ) {
nkeynes@1245
   126
        tqueue_process_loop();
nkeynes@1245
   127
    }
nkeynes@1245
   128
    pthread_mutex_unlock(&tqueue.mutex);
nkeynes@1245
   129
}
nkeynes@1245
   130
nkeynes@1245
   131
/**
nkeynes@1245
   132
 * Process the first message in the queue. If no messages are on the
nkeynes@1245
   133
 * queue, waits for the next one to be queued and then processes it.
nkeynes@1245
   134
 */
nkeynes@1245
   135
void tqueue_process_wait()
nkeynes@1245
   136
{
nkeynes@1245
   137
    pthread_mutex_lock(&tqueue.mutex);
nkeynes@1245
   138
    if( TQUEUE_EMPTY() ) {
nkeynes@1245
   139
        pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
nkeynes@1245
   140
    }
nkeynes@1245
   141
    tqueue_process_loop();
nkeynes@1245
   142
    pthread_mutex_unlock(&tqueue.mutex);
nkeynes@1245
   143
}
.