filename | src/tqueue.c |
changeset | 1273:32b2a340f8b3 |
prev | 1245:01e0020adf88 |
author | nkeynes |
date | Sat Aug 04 08:46:28 2012 +1000 (11 years ago) |
permissions | -rw-r--r-- |
last change | Handle corner case in pvr2_run_slice when we've previously slightly overrun the end of the time slice |
view | annotate | diff | log | raw |
1 /**
2 * $Id$
3 *
4 * Bounded, blocking queue for inter-thread communication.
5 *
6 * Copyright (c) 2012 Nathan Keynes.
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 */
19 #include <assert.h>
20 #include <pthread.h>
21 #include "tqueue.h"
23 #define TQUEUE_LENGTH 64
25 typedef struct {
26 tqueue_callback callback;
27 void *data;
28 gboolean synchronous;
29 } tqueue_entry;
31 struct {
32 pthread_mutex_t mutex;
33 pthread_cond_t consumer_wait;
34 pthread_cond_t producer_sync_wait;
35 pthread_cond_t producer_full_wait;
36 int head; /* next item returned by dequeue */
37 int tail; /* next item filled in by enqueue */
38 int last_result; /* Result value of last dequeued callback */
39 tqueue_entry tqueue[TQUEUE_LENGTH];
40 } tqueue = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, -1};
42 /************** Producer thread **************/
/* Queue holds no items: the read and write indices coincide. */
#define TQUEUE_EMPTY() (tqueue.tail == tqueue.head)
/* Queue is full: advancing tail by one slot (with wrap-around) would hit head. */
#define TQUEUE_FULL() (((tqueue.tail + 1) % TQUEUE_LENGTH) == tqueue.head)
46 static void tqueue_enqueue( tqueue_callback callback, void *data, gboolean sync )
47 {
48 assert( !TQUEUE_FULL() );
49 tqueue.tqueue[tqueue.tail].callback = callback;
50 tqueue.tqueue[tqueue.tail].data = data;
51 tqueue.tqueue[tqueue.tail].synchronous = sync;
52 tqueue.tail++;
53 if( tqueue.tail == TQUEUE_LENGTH )
54 tqueue.tail = 0;
55 }
57 /**
58 * Add a message to the UI queue and return immediately.
59 */
60 void tqueue_post_message( tqueue_callback callback, void *data )
61 {
62 pthread_mutex_lock(&tqueue.mutex);
63 if( TQUEUE_FULL() ) {
64 /* Wait for the queue to clear */
65 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
66 }
67 tqueue_enqueue( callback, data, FALSE );
68 pthread_cond_signal(&tqueue.consumer_wait);
69 pthread_mutex_unlock(&tqueue.mutex);
70 }
72 /**
73 * Add a message to the UI queue and wait for it to be handled.
74 * @return the result from the handler function.
75 */
76 int tqueue_send_message( tqueue_callback callback, void *data )
77 {
78 int result;
79 pthread_mutex_lock(&tqueue.mutex);
80 if( TQUEUE_FULL() ) {
81 /* Wait for the queue to clear */
82 pthread_cond_wait(&tqueue.producer_full_wait, &tqueue.mutex);
83 }
84 tqueue_enqueue( callback, data, TRUE );
85 pthread_cond_signal(&tqueue.consumer_wait);
86 pthread_cond_wait(&tqueue.producer_sync_wait, &tqueue.mutex);
87 result = tqueue.last_result;
88 pthread_mutex_unlock(&tqueue.mutex);
89 return result;
90 }
92 /************** Consumer thread **************/
94 /* Note: must be called with mutex locked */
95 static void tqueue_process_loop() {
96 while( !TQUEUE_EMPTY() ) {
97 gboolean wasFull = TQUEUE_FULL();
98 tqueue_callback callback = tqueue.tqueue[tqueue.head].callback;
99 void *data = tqueue.tqueue[tqueue.head].data;
100 gboolean sync = tqueue.tqueue[tqueue.head].synchronous;
101 tqueue.head++;
102 if( tqueue.head == TQUEUE_LENGTH )
103 tqueue.head = 0;
105 if( wasFull ) {
106 pthread_cond_signal( &tqueue.producer_full_wait );
107 }
109 pthread_mutex_unlock(&tqueue.mutex);
110 int result = callback(data);
111 pthread_mutex_lock(&tqueue.mutex);
112 if( sync ) {
113 tqueue.last_result = result;
114 pthread_cond_signal( &tqueue.producer_sync_wait );
115 }
116 }
117 }
119 /**
120 * Process all messages in the queue, if any.
121 */
122 void tqueue_process_all()
123 {
124 pthread_mutex_lock(&tqueue.mutex);
125 if( !TQUEUE_EMPTY() ) {
126 tqueue_process_loop();
127 }
128 pthread_mutex_unlock(&tqueue.mutex);
129 }
131 /**
132 * Process the first message in the queue. If no messages are on the
133 * queue, waits for the next one to be queued and then processes it.
134 */
135 void tqueue_process_wait()
136 {
137 pthread_mutex_lock(&tqueue.mutex);
138 if( TQUEUE_EMPTY() ) {
139 pthread_cond_wait( &tqueue.consumer_wait, &tqueue.mutex );
140 }
141 tqueue_process_loop();
142 pthread_mutex_unlock(&tqueue.mutex);
143 }
.