[57cd6e]: src/c/threads/queue.d

/* -*- mode: c; c-basic-offset: 8 -*- */
/*
queue.d -- waiting queue for threads.
*/
/*
Copyright (c) 2011, Juan Jose Garcia Ripoll.
ECL is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
See file '../Copyright' for full details.
*/
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#include <signal.h>
#include <ecl/ecl.h>
#include <ecl/internal.h>
#include "threads/ecl_atomics.h"

void ECL_INLINE
ecl_process_yield()
{
#if defined(HAVE_SCHED_H)
        sched_yield();
#elif defined(ECL_WINDOWS_THREADS)
        Sleep(0);
#else
        ecl_musleep(0.0, 1);
#endif
}

void ECL_INLINE
ecl_get_spinlock(cl_env_ptr the_env, cl_object *lock)
{
        cl_object own_process = the_env->own_process;
        while (!AO_compare_and_swap_full((AO_t*)lock, (AO_t)Cnil,
                                         (AO_t)own_process)) {
                ecl_process_yield();
        }
}

void ECL_INLINE
ecl_giveup_spinlock(cl_object *lock)
{
        AO_store((AO_t*)lock, (AO_t)Cnil);
}
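
/*
 * Illustrative sketch (added for clarity, not part of the original
 * file): the spinlock helpers above guard a single cl_object slot that
 * must be initialized to Cnil. A typical critical section on an object
 * `obj' carrying the queue fields used below would look like:
 *
 *      cl_env_ptr the_env = ecl_process_env();
 *      ecl_disable_interrupts_env(the_env);
 *      ecl_get_spinlock(the_env, &obj->queue.spinlock);
 *      ... read or modify the fields protected by the lock ...
 *      ecl_giveup_spinlock(&obj->queue.spinlock);
 *      ecl_enable_interrupts_env(the_env);
 *
 * Interrupts are disabled around the lock so that the owner cannot be
 * interrupted while holding it, mirroring wait_queue_pop_all() below.
 */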

static ECL_INLINE void
wait_queue_nconc(cl_env_ptr the_env, cl_object q, cl_object new_tail)
{
        ecl_get_spinlock(the_env, &q->queue.spinlock);
        q->queue.list = ecl_nconc(q->queue.list, new_tail);
        ecl_giveup_spinlock(&q->queue.spinlock);
}

static ECL_INLINE cl_object
wait_queue_pop_all(cl_env_ptr the_env, cl_object q)
{
        cl_object output;
        ecl_disable_interrupts_env(the_env);
        {
                ecl_get_spinlock(the_env, &q->queue.spinlock);
                output = q->queue.list;
                q->queue.list = Cnil;
                ecl_giveup_spinlock(&q->queue.spinlock);
        }
        ecl_enable_interrupts_env(the_env);
        return output;
}

static ECL_INLINE void
wait_queue_delete(cl_env_ptr the_env, cl_object q, cl_object item)
{
        ecl_get_spinlock(the_env, &q->queue.spinlock);
        q->queue.list = ecl_delete_eq(item, q->queue.list);
        ecl_giveup_spinlock(&q->queue.spinlock);
}

/*----------------------------------------------------------------------
 * THREAD SCHEDULER & WAITING
 */

static cl_object
bignum_set_time(cl_object bignum, struct ecl_timeval *time)
{
        _ecl_big_set_index(bignum, time->tv_sec);
        _ecl_big_mul_ui(bignum, bignum, 1000);
        _ecl_big_add_ui(bignum, bignum, (time->tv_usec + 999) / 1000);
        return bignum;
}
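
/*
 * Worked example (added for clarity, not in the original source): for
 * a timeval of tv_sec = 2 and tv_usec = 500, bignum_set_time() yields
 * 2 * 1000 + (500 + 999) / 1000 = 2001, i.e. the elapsed time rounded
 * up to whole milliseconds.
 */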

static cl_object
elapsed_time(struct ecl_timeval *start)
{
        cl_object delta_big = _ecl_big_register0();
        cl_object aux_big = _ecl_big_register1();
        struct ecl_timeval now;
        ecl_get_internal_real_time(&now);
        bignum_set_time(aux_big, start);
        bignum_set_time(delta_big, &now);
        _ecl_big_sub(delta_big, delta_big, aux_big);
        _ecl_big_register_free(aux_big);
        return delta_big;
}

static double
waiting_time(cl_index iteration, struct ecl_timeval *start)
{
        /* Waiting time is smaller than 0.10 s */
        double time;
        cl_object top = MAKE_FIXNUM(10 * 1000);
        cl_object delta_big = elapsed_time(start);
        _ecl_big_div_ui(delta_big, delta_big, iteration);
        if (ecl_number_compare(delta_big, top) < 0) {
                time = ecl_to_double(delta_big) * 1.5;
        } else {
                time = 0.10;
        }
        _ecl_big_register_free(delta_big);
        return time;
}
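
/*
 * Illustrative numbers (added, not in the original): if elapsed_time()
 * reports 8 after 4 iterations, the per-iteration average is 8 / 4 = 2,
 * which is below the 10 * 1000 cap, so waiting_time() returns
 * 2 * 1.5 = 3.0; once the average reaches the cap, the fixed value
 * 0.10 is returned instead. The interval handed to ecl_musleep() in
 * ecl_wait_on_timed() therefore adapts to how long the process has
 * already been waiting.
 */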

static cl_object
ecl_wait_on_timed(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_object o)
{
        volatile const cl_env_ptr the_env = env;
        volatile cl_object own_process = the_env->own_process;
        volatile cl_object record;
        volatile cl_object output;
        cl_fixnum iteration = 0;
        struct ecl_timeval start;
        ecl_get_internal_real_time(&start);

        /* This spin loop is here because the default path (fair) is
         * too slow */
        for (iteration = 0; iteration < 10; iteration++) {
                cl_object output = condition(the_env, o);
                if (output != Cnil)
                        return output;
        }

        /* 0) We reserve a record for the queue. In order to avoid
         * using the garbage collector, we reuse records */
        record = own_process->process.queue_record;
        unlikely_if (record == Cnil) {
                record = ecl_list1(own_process);
        } else {
                own_process->process.queue_record = Cnil;
        }

        /* 1) We disable Lisp interrupts while we register ourselves
         * in the queue. */
        ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Cnil);
        CL_UNWIND_PROTECT_BEGIN(the_env) {
                /* 2) Now we add ourselves to the queue. In order to
                 * avoid a call to the GC, we try to reuse records. */
                print_lock("adding to queue", o);
                wait_queue_nconc(the_env, o, record);
                ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Ct);
                ecl_check_pending_interrupts(the_env);

                /* 3) Unlike the sigsuspend() implementation, this
                 * implementation does not block signals and the
                 * wakeup event might be lost before the sleep
                 * function is invoked. We must thus spin over short
                 * intervals of time to ensure that we check the
                 * condition periodically. */
                do {
                        ecl_musleep(waiting_time(iteration++, &start), 1);
                } while (Null(output = condition(the_env, o)));
                ecl_bds_unwind1(the_env);
        } CL_UNWIND_PROTECT_EXIT {
                /* 4) At this point we wrap up. We remove ourselves
                 * from the queue and unblock the lisp interrupt
                 * signal. Note that we recover the cons for later use. */
                cl_object firstone = o->queue.list;
                wait_queue_delete(the_env, o, own_process);
                own_process->process.waiting_for = Cnil;
                own_process->process.queue_record = record;
                ECL_RPLACD(record, Cnil);

                /* 5) When this process exits, it may be because it
                 * aborts (which we know because output == Cnil), or
                 * because the condition is satisfied. In both cases
                 * we allow the first in the queue to test its
                 * condition again. This is needed for objects, such as
                 * semaphores, where the condition may be satisfied
                 * more than once. */
                if (/*Null(output) &&*/ (firstone == record)) {
                        ecl_wakeup_waiters(the_env, o, ECL_WAKEUP_ONE);
                }
        } CL_UNWIND_PROTECT_END;
        ecl_bds_unwind1(the_env);
        return output;
}

/**********************************************************************
 * BLOCKING WAIT QUEUE ALGORITHM
 *
 * This object keeps a list of processes waiting for a condition to
 * happen. The queue is ordered and the only processes that check for
 * the condition are
 *      - The first process to arrive at the queue,
 *      - Each process which is awoken,
 *      - The first process after the list of awoken processes.
 *
 * The idea is that this will ensure some fairness when unblocking the
 * processes, which is important for abstractions such as mutexes or
 * semaphores, where we want equal sharing of resources among processes.
 *
 * This also implies that the waiting processes depend on others to signal
 * when to check for a condition. This happens in two situations:
 *      - External code that changes the fields of the queue object
 *        must signal ecl_wakeup_waiters() (see mutex.d, semaphore.d, etc.)
 *      - When a process exits ecl_wait_on() it always resignals the next
 *        process in the queue, because a condition may be satisfied more
 *        than once (for instance when a semaphore is changed, more than
 *        one process may be released)
 *
 * The critical part of this algorithm is the fact that processes
 * communicating the change of conditions may do so before, during or
 * after a process has been registered. Since we do not want those signals
 * to be lost, a proper ordering of steps is required.
 */
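
/*
 * Illustrative sketch (added here for clarity; the function and field
 * names are hypothetical, only ecl_wait_on() and the helpers above are
 * real): a caller supplies a condition function that atomically tries
 * to grab the resource, returning Ct on success and Cnil otherwise,
 * and passes it together with the queue object to ecl_wait_on(). In
 * the style of semaphore.d this could look like:
 *
 *      static cl_object
 *      get_semaphore_inner(cl_env_ptr the_env, cl_object sem)
 *      {
 *              cl_object output = Cnil;
 *              ecl_disable_interrupts_env(the_env);
 *              ecl_get_spinlock(the_env, &sem->queue.spinlock);
 *              if (sem->semaphore.counter > 0) {      hypothetical field
 *                      sem->semaphore.counter--;
 *                      output = Ct;
 *              }
 *              ecl_giveup_spinlock(&sem->queue.spinlock);
 *              ecl_enable_interrupts_env(the_env);
 *              return output;
 *      }
 *
 *      ...
 *      ecl_wait_on(the_env, get_semaphore_inner, sem);
 *
 * ecl_wait_on() first tries the condition, then registers the caller
 * in sem->queue.list and blocks until the condition returns non-Cnil.
 */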

cl_object
ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_object o)
{
#if defined(HAVE_SIGPROCMASK)
        volatile const cl_env_ptr the_env = env;
        volatile cl_object own_process = the_env->own_process;
        volatile cl_object record;
        volatile sigset_t original;
        volatile cl_object output;

        /* 0) We reserve a record for the queue. In order to avoid
         * using the garbage collector, we reuse records */
        record = own_process->process.queue_record;
        unlikely_if (record == Cnil) {
                record = ecl_list1(own_process);
        } else {
                own_process->process.queue_record = Cnil;
        }

        /* 1) First we block lisp interrupt signals. This ensures that
         * any wakeup signal that is issued from here on is not lost. */
        {
                int code = ecl_option_values[ECL_OPT_THREAD_INTERRUPT_SIGNAL];
                sigset_t empty;
                sigemptyset(&empty);
                sigaddset(&empty, code);
                pthread_sigmask(SIG_BLOCK, &empty, &original);
        }

        /* 2) Now we add ourselves to the queue. */
        wait_queue_nconc(the_env, o, record);

        CL_UNWIND_PROTECT_BEGIN(the_env) {
                /* 3) At this point we may receive signals, but we
                 * might have missed a wakeup event if that happened
                 * between 0) and 2), which is why we start with the
                 * check. */
                if (o->queue.list != record ||
                    Null(output = condition(the_env, o)))
                {
                        print_lock("suspending %p", o, o);
                        do {
                                /* This will wait until we get a signal that
                                 * demands some code being executed. Note that
                                 * this includes our communication signals and
                                 * the signals used by the GC. Note also that
                                 * as a consequence we might throw / return,
                                 * which is why we need to protect it all with
                                 * UNWIND-PROTECT. */
                                sigsuspend(&original);
                        } while (Null(output = condition(the_env, o)));
                }
        } CL_UNWIND_PROTECT_EXIT {
                /* 4) At this point we wrap up. We remove ourselves
                 * from the queue and unblock the lisp interrupt
                 * signal. Note that we recover the cons for later use. */
                cl_object firstone = o->queue.list;
                wait_queue_delete(the_env, o, own_process);
                own_process->process.waiting_for = Cnil;
                own_process->process.queue_record = record;
                ECL_RPLACD(record, Cnil);

                /* 5) When this process exits, it may be because it
                 * aborts (which we know because output == Cnil), or
                 * because the condition is satisfied. In both cases
                 * we allow the first in the queue to test its
                 * condition again. This is needed for objects, such as
                 * semaphores, where the condition may be satisfied
                 * more than once. */
                if (/*Null(output) &&*/ (firstone == record)) {
                        ecl_wakeup_waiters(the_env, o, ECL_WAKEUP_ONE);
                }

                /* 6) Restoring signals is done last, to ensure that
                 * all cleanup steps are performed. */
                pthread_sigmask(SIG_SETMASK, &original, NULL);
        } CL_UNWIND_PROTECT_END;
        return output;
#else
        return ecl_wait_on_timed(env, condition, o);
#endif
}

void
ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags)
{
        if (Null(q->queue.list))
                return;
        ecl_disable_interrupts_env(the_env);
        ecl_get_spinlock(the_env, &q->queue.spinlock);
        {
                /* We scan the list of waiting processes, awaking one
                 * or more, depending on flags. In running through the list
                 * we eliminate zombie processes --- they should not be here
                 * because of the UNWIND-PROTECT in ecl_wait_on(), but
                 * sometimes shit happens */
                cl_object *tail, l;
                for (tail = &q->queue.list; (l = *tail) != Cnil; ) {
                        cl_object p = ECL_CONS_CAR(l);
                        if (p->process.phase == ECL_PROCESS_INACTIVE ||
                            p->process.phase == ECL_PROCESS_EXITING) {
                                print_lock("removing %p", q, p);
                                *tail = ECL_CONS_CDR(l);
                        } else {
                                /* If the process is active, we then
                                 * simply awake it with a signal. */
                                print_lock("awaking %p", q, p);
                                if (flags & ECL_WAKEUP_RESET_FLAG)
                                        p->process.waiting_for = Cnil;
                                if (flags & ECL_WAKEUP_KILL)
                                        mp_process_kill(p);
                                else
                                        ecl_wakeup_process(p);
                                if (!(flags & ECL_WAKEUP_ALL))
                                        break;
                                tail = &ECL_CONS_CDR(l);
                        }
                }
        }
        ecl_giveup_spinlock(&q->queue.spinlock);
        ecl_process_yield();
}
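
/*
 * Illustrative sketch (added; the semaphore field names are
 * hypothetical): code that changes the state guarded by a wait queue
 * must wake the waiters itself, as described in the comment block
 * above. A signal operation in the style of semaphore.d could look
 * like:
 *
 *      cl_env_ptr the_env = ecl_process_env();
 *      ecl_disable_interrupts_env(the_env);
 *      ecl_get_spinlock(the_env, &sem->queue.spinlock);
 *      sem->semaphore.counter++;       make the condition true
 *      ecl_giveup_spinlock(&sem->queue.spinlock);
 *      ecl_enable_interrupts_env(the_env);
 *      ecl_wakeup_waiters(the_env, sem, ECL_WAKEUP_ONE);
 *
 * ECL_WAKEUP_ALL would awaken every waiter instead of just the first
 * one, and ECL_WAKEUP_RESET_FLAG additionally clears each awakened
 * process' waiting_for slot, as handled in the loop above.
 */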

#undef print_lock

void
print_lock(char *prefix, cl_object l, ...)
{
        static cl_object lock = Cnil;
        va_list args;
        va_start(args, l);
        /* The unconditional return below disables the debugging
         * output that follows. */
        return;
        if (l == Cnil || FIXNUMP(l->lock.name)) {
                cl_env_ptr env = ecl_process_env();
                ecl_get_spinlock(env, &lock);
                printf("\n%ld\t", fix(env->own_process->process.name));
                vprintf(prefix, args);
                if (l != Cnil) {
                        cl_object p = l->lock.queue_list;
                        while (p != Cnil) {
                                printf(" %lx", fix(ECL_CONS_CAR(p)->process.name));
                                p = ECL_CONS_CDR(p);
                        }
                }
                fflush(stdout);
                ecl_giveup_spinlock(&lock);
        }
}
/*#define print_lock(a,b,c) (void)0*/