なかだです。

At Wed, 23 May 2007 17:02:15 +0900,
Nobuyoshi Nakada wrote in [ruby-dev:30809]:
> > 次のスクリプトのようなQueueを待ってるスレッドがある状態で
> > メインスレッドが終了するケースで
> >  [BUG] queue 0x424010 freed with live thread(s) waiting
> > となりました。
> 
> r12068でwait_condvar()が常にmutexを再ロックするようになったせい
> のようです。

[ruby-dev:30809]でも、ロックしたままforkすると[BUG]になります。


Index: eval.c
===================================================================
--- eval.c	(revision 12390)
+++ eval.c	(working copy)
@@ -11214,8 +11214,17 @@ rb_thread_wakeup(thread)
     VALUE thread;
 {
+    if (!RTEST(rb_thread_wakeup_alive(thread)))
+        rb_raise(rb_eThreadError, "killed thread");
+    return thread;
+}
+
+VALUE
+rb_thread_wakeup_alive(thread)
+    VALUE thread;
+{
     rb_thread_t th = rb_thread_check(thread);
 
     if (th->status == THREAD_KILLED)
-        rb_raise(rb_eThreadError, "killed thread");
+        return Qnil;
     rb_thread_ready(th);
 
@@ -11292,5 +11301,5 @@ rb_thread_kill(thread)
 
     kill_thread(th, 0);
-    return thread;
+    return thread;
 }
 
@@ -11645,4 +11654,13 @@ rb_thread_abort_exc_set(thread, val)
 
 
+enum rb_thread_status
+rb_thread_status(thread)
+    VALUE thread;
+{
+    rb_thread_t th = rb_thread_check(thread);
+    return th->status;
+}
+
+
 /*
  * call-seq:
@@ -12121,5 +12139,5 @@ rb_thread_value(thread)
 
 static VALUE
-rb_thread_status(thread)
+rb_thread_status_name(thread)
     VALUE thread;
 {
@@ -12148,5 +12166,5 @@ rb_thread_status(thread)
  */
 
-static VALUE
+VALUE
 rb_thread_alive_p(thread)
     VALUE thread;
@@ -12982,5 +13000,5 @@ Init_Thread()
     rb_define_method(rb_cThread, "exit!", rb_thread_kill_bang, 0);
     rb_define_method(rb_cThread, "value", rb_thread_value, 0);
-    rb_define_method(rb_cThread, "status", rb_thread_status, 0);
+    rb_define_method(rb_cThread, "status", rb_thread_status_name, 0);
     rb_define_method(rb_cThread, "join", rb_thread_join_m, -1);
     rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
Index: intern.h
===================================================================
--- intern.h	(revision 12390)
+++ intern.h	(working copy)
@@ -205,8 +205,11 @@ void rb_thread_polling _((void));
 void rb_thread_sleep _((int));
 void rb_thread_sleep_forever _((void));
+enum rb_thread_status rb_thread_status _((VALUE));
 VALUE rb_thread_stop _((void));
 VALUE rb_thread_wakeup _((VALUE));
+VALUE rb_thread_wakeup_alive _((VALUE));
 VALUE rb_thread_run _((VALUE));
 VALUE rb_thread_kill _((VALUE));
+VALUE rb_thread_alive_p _((VALUE));
 VALUE rb_thread_create _((VALUE (*)(ANYARGS), void*));
 void rb_thread_interrupt _((void));
Index: ext/thread/thread.c
===================================================================
--- ext/thread/thread.c	(revision 12390)
+++ ext/thread/thread.c	(working copy)
@@ -13,4 +13,5 @@
 #include <intern.h>
 #include <rubysig.h>
+#include <node.h>
 
 static VALUE rb_cMutex;
@@ -208,6 +211,5 @@ static VALUE
 wake_thread(VALUE thread)
 {
-    return rb_rescue2(rb_thread_wakeup, thread,
-                      NULL, Qundef, rb_eThreadError, 0);
+    return rb_thread_wakeup_alive(thread);
 }
 
@@ -215,6 +217,8 @@ static VALUE
 run_thread(VALUE thread)
 {
-    return rb_rescue2(rb_thread_run, thread,
-                      NULL, Qundef, rb_eThreadError, 0);
+    thread = wake_thread(thread);
+    if (RTEST(thread) && !rb_thread_critical)
+        rb_thread_schedule();
+    return thread;
 }
 
@@ -226,5 +230,7 @@ wake_one(List *list)
     waking = Qnil;
     while (list->entries && !RTEST(waking)) {
-        waking = wake_thread(shift_list(list));
+        waking = shift_list(list);
+        if (waking == Qundef) break;
+        waking = wake_thread(waking);
     }
 
@@ -267,8 +273,15 @@ assert_no_survivors(List *waiting, const
 {
     Entry *entry;
+    VALUE ths = 0;
+
     for (entry = waiting->entries; entry; entry = entry->next) {
-        if (RTEST(wake_thread(entry->value))) {
-            rb_bug("%s %p freed with live thread(s) waiting", label, addr);
-        }
+        if (RTEST(wake_thread(entry->value))) {
+            if (!ths) ths = rb_ary_new();
+            rb_ary_push(ths, entry->value);
+        }
+    }
+    if (ths) {
+        rb_bug("%s %p freed with live thread(s) %s waiting",
+               label, addr, RSTRING_PTR(rb_inspect(ths)));
     }
 }
@@ -304,4 +317,6 @@ typedef struct _Mutex {
 } Mutex;
 
+#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))
+
 static void
 mark_mutex(Mutex *mutex)
@@ -362,5 +377,5 @@ rb_mutex_locked_p(VALUE self)
     Mutex *mutex;
     Data_Get_Struct(self, Mutex, mutex);
-    return RTEST(mutex->owner) ? Qtrue : Qfalse;
+    return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
 }
 
@@ -381,5 +396,5 @@ rb_mutex_try_lock(VALUE self)
     Data_Get_Struct(self, Mutex, mutex);
 
-    if (RTEST(mutex->owner))
+    if (MUTEX_LOCKED_P(mutex))
         return Qfalse;
 
@@ -404,9 +419,18 @@ lock_mutex(Mutex *mutex)
     rb_thread_critical = 1;
 
-    while (RTEST(mutex->owner)) {
-        wait_list(&mutex->waiting);
-        rb_thread_critical = 1;
+    if (!MUTEX_LOCKED_P(mutex)) {
+        mutex->owner = current;
+    }
+    else {
+        push_list(&mutex->waiting, current);
+        do {
+            rb_thread_stop();
+            rb_thread_critical = 1;
+            if (!MUTEX_LOCKED_P(mutex)) {
+                mutex->owner = current;
+                break;
+            }
+        } while (mutex->owner != current);
     }
-    mutex->owner = current;
 
     rb_thread_critical = 0;
@@ -423,4 +447,20 @@ rb_mutex_lock(VALUE self)
 }
 
+static VALUE
+relock_mutex(Mutex *mutex)
+{
+    VALUE current = rb_thread_current();
+
+    switch (rb_thread_status(current)) {
+      case THREAD_RUNNABLE:
+      case THREAD_STOPPED:
+        lock_mutex(mutex);
+        break;
+      default:
+        break;
+    }
+    return Qundef;
+}
+
 /*
  * Document-method: unlock
@@ -435,14 +475,10 @@ unlock_mutex_inner(Mutex *mutex)
     VALUE waking;
 
-    if (!RTEST(mutex->owner)) {
-        rb_raise(rb_eThreadError, "not owner");
-    }
-
     if (mutex->owner != rb_thread_current()) {
         rb_raise(rb_eThreadError, "not owner");
     }
 
-    mutex->owner = Qnil;
     waking = wake_one(&mutex->waiting);
+    mutex->owner = waking;
 
     return waking;
@@ -463,12 +499,9 @@ unlock_mutex(Mutex *mutex)
     rb_thread_critical = 1;
     waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);
-
-    if (waking == Qundef) {
+    if (!RTEST(waking)) {
         return Qfalse;
     }
 
-    if (RTEST(waking)) {
-        run_thread(waking);
-    }
+    run_thread(waking);
 
     return Qtrue;
@@ -516,11 +550,9 @@ rb_mutex_exclusive_unlock(VALUE self)
     waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);
 
-    if (waking == Qundef) {
+    if (waking == Qundef || !RTEST(waking)) {
         return Qnil;
     }
 
-    if (RTEST(waking)) {
-        run_thread(waking);
-    }
+    run_thread(waking);
 
     return self;
@@ -634,4 +666,6 @@ static void
 wait_condvar(ConditionVariable *condvar, Mutex *mutex)
 {
+    VALUE waking;
+
     rb_thread_critical = 1;
     if (rb_thread_current() != mutex->owner) {
@@ -639,6 +673,9 @@ wait_condvar(ConditionVariable *condvar,
         rb_raise(rb_eThreadError, "not owner of the synchronization mutex");
     }
-    unlock_mutex_inner(mutex);
-    rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
+    waking = unlock_mutex_inner(mutex);
+    if (RTEST(waking)) {
+        wake_thread(waking);
+    }
+    rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex);
 }
 
@@ -836,8 +874,8 @@ rb_queue_marshal_load(VALUE self, VALUE
     array = rb_marshal_load(data);
     if (TYPE(array) != T_ARRAY) {
-        rb_raise(rb_eRuntimeError, "expected Array of queue data");
+        rb_raise(rb_eTypeError, "expected Array of queue data");
     }
     if (RARRAY(array)->len < 1) {
-        rb_raise(rb_eRuntimeError, "missing capacity value");
+        rb_raise(rb_eArgError, "missing capacity value");
     }
     queue->capacity = NUM2ULONG(rb_ary_shift(array));
-- 
--- 僕の前にBugはない。
--- 僕の後ろにBugはできる。
    中田 伸悦