[ruby-dev:43103] Re: pthread_cond を用いたConditionVariable
From:
keiju@... (石塚圭樹)
Date:
2011-01-25 11:45:23 UTC
List:
ruby-dev #43103
けいじゅ@いしつかです. In [ruby-dev:43097] the message: "[ruby-dev:43097] Re: pthread_cond を 用いたConditionVariable", on Jan/25 07:22(JST) KOSAKI Motohiro writes: >> rb_cv_wait_forever()に以下の3行足したら、deadlock 検知してくれました。 >> >> th->vm->sleeper++; >> rb_check_deadlock(th->vm); >> if (RUBY_VM_INTERRUPTED(th)) >> goto out; > >わはは。大嘘でした。CVはlock離すのと寝るのとを一息にやるので >false positive うみますね。 こちらで試したら, deadlockの検知もできるようになりました. 位置の問題か しら. というわけで: >1. deadlockのチェックができていない. thread->vm->sleeper の辺りが怪し > いと思ったのですが, うまくいきませんでした. こちらは解決しました. >2. ConditionVariable.wait(mutex, timeout)でtimeoutの指定されているとき, > timeoutで終了しない. 私の環境ではオリジナルのConditionVariableでも > timeoutしないのですが... こちらも, 解決しました. 私のテストスクリプトが間違っていました(^^;;; trunkからの差分を添付します. __ ---------------------------------------------------->> 石塚 圭樹 <<--- ---------------------------------->> e-mail: keiju@ishitsuka.com <<---
Attachments (1)
thread.diff
(10.7 KB, text/x-diff)
diff --git a/thread.c b/thread.c
index 69fc299..5566ad6 100644
--- a/thread.c
+++ b/thread.c
@@ -351,7 +351,7 @@ rb_thread_terminate_all(void)
/* unlock all locking mutexes */
if (th->keeping_mutexes) {
- rb_mutex_unlock_all(th->keeping_mutexes, GET_THREAD());
+ rb_mutex_unlock_all(th->keeping_mutexes, GET_THREAD());
}
thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
@@ -3494,6 +3494,218 @@ rb_barrier_destroy(VALUE self)
return rb_mutex_unlock(mutex);
}
/* ConditionVariable implemented directly on the VM's native primitives
 * (native mutex + native condition variable) rather than on Mutex in
 * thread.rb.  Class object is registered in Init_Thread. */
VALUE rb_cConditionVariable;

/* Internal state wrapped by the Ruby object: a native lock protecting
 * a native condition variable. */
typedef struct rb_cv_struct
{
    rb_thread_lock_t lock;
    rb_thread_cond_t cond;
} cv_t;

/* Fetch the cv_t behind a TypedData object, with type checking. */
#define GetCVPtr(obj, tobj) \
    TypedData_Get_Struct((obj), cv_t, &cv_data_type, (tobj))

/* cv_t holds no Ruby object references, so no GC mark function is needed. */
#define cv_mark NULL
+
+static void
+cv_free(void *ptr)
+{
+ if (ptr) {
+ cv_t *cv = ptr;
+ native_mutex_destroy(&cv->lock);
+ native_cond_destroy(&cv->cond);
+ }
+ ruby_xfree(ptr);
+}
+
+static size_t
+cv_memsize(const void *ptr)
+{
+ return ptr ? sizeof(cv_t) : 0;
+}
+
/* TypedData bookkeeping: no mark (cv_mark is NULL), free and memsize above. */
static const rb_data_type_t cv_data_type = {
    "cv",
    {cv_mark, cv_free, cv_memsize,},
};
+
+static VALUE
+cv_alloc(VALUE klass)
+{
+ VALUE volatile obj;
+ cv_t *cv;
+
+ obj = TypedData_Make_Struct(klass, cv_t, &cv_data_type, cv);
+ native_mutex_initialize(&cv->lock);
+ native_cond_initialize(&cv->cond);
+ return obj;
+}
+
+/*
+ * call-seq:
+ * ConditionVariable.new -> condition_variable
+ *
+ * Creates a new ConditionVariable
+ */
+static VALUE
+cv_initialize(VALUE self)
+{
+ return self;
+}
+
+VALUE
+cv_new(void)
+{
+ return cv_alloc(rb_cConditionVariable);
+}
+
/* State threaded through rb_ensure() during a wait. */
struct cv_wait_arg {
    VALUE mutex;                        /* Ruby Mutex released while waiting */
    cv_t *cv;
    struct timeval *tv;                 /* relative timeout; unused for untimed waits */
    enum rb_thread_status prev_status;  /* thread status to restore afterwards */
    struct rb_unblock_callback oldubf;  /* previous unblock function to reinstall */
};
+
+static VALUE
+rb_cv_wait_forever(struct cv_wait_arg *arg)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ th->transition_for_lock = 1;
+ BLOCKING_REGION_CORE({
+ native_mutex_lock(&arg->cv->lock);
+ th->transition_for_lock = 0;
+ native_cond_wait(&arg->cv->cond, &arg->cv->lock);
+ th->transition_for_lock = 1;
+ native_mutex_unlock(&arg->cv->lock);
+ });
+ th->transition_for_lock = 0;
+}
+
+static VALUE
+rb_cv_wait_for(struct cv_wait_arg *arg)
+{
+ struct timespec ts;
+ struct timeval tvn;
+ rb_thread_t *th = GET_THREAD();
+ int r;
+
+ gettimeofday(&tvn, NULL);
+ ts.tv_sec = tvn.tv_sec + arg->tv->tv_sec;
+ ts.tv_nsec = (tvn.tv_usec + arg->tv->tv_usec) * 1000;
+ if (ts.tv_nsec >= PER_NANO){
+ ts.tv_sec += 1;
+ ts.tv_nsec -= PER_NANO;
+ }
+
+ th->transition_for_lock = 1;
+ BLOCKING_REGION_CORE({
+ native_mutex_lock(&arg->cv->lock);
+ th->transition_for_lock = 0;
+ native_cond_timedwait(&arg->cv->cond, &arg->cv->lock, &ts);
+ th->transition_for_lock = 1;
+ native_mutex_unlock(&arg->cv->lock);
+ });
+ th->transition_for_lock = 0;
+}
+
+static VALUE
+rb_cv_wait_ensure(struct cv_wait_arg *arg)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ if (th->status == THREAD_STOPPED_FOREVER) {
+ th->status = arg->prev_status;
+ }
+ reset_unblock_function(th, &arg->oldubf);
+ th->vm->sleeper--;
+ rb_mutex_lock(arg->mutex);
+}
+
+static void
+rb_cv_wait_interrupt(void *ptr)
+{
+ cv_t *cv = (cv_t *)ptr;
+ native_mutex_lock(&cv->lock);
+ native_cond_broadcast(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+}
+
+VALUE
+rb_cv_wait(int argc, VALUE *argv, VALUE self)
+{
+ VALUE mutex;
+ VALUE timeout;
+ cv_t *cv;
+ struct timeval t;
+ struct cv_wait_arg arg;
+
+ rb_thread_t *th = GET_THREAD();
+
+ GetCVPtr(self, cv);
+
+ rb_scan_args(argc, argv, "11", &mutex, &timeout);
+
+ if (!NIL_P(timeout)) {
+ t = rb_time_interval(timeout);
+ }
+
+ arg.mutex = mutex;
+ arg.cv = cv;
+ arg.tv = &t;
+
+ rb_mutex_unlock(mutex);
+ arg.prev_status = th->status;
+
+ set_unblock_function(th, rb_cv_wait_interrupt, cv, &arg.oldubf);
+
+ if (!NIL_P(timeout)) {
+ th->status = THREAD_STOPPED;
+ }
+ else {
+ th->status = THREAD_STOPPED_FOREVER;
+ th->vm->sleeper++;
+ rb_check_deadlock(th->vm);
+ if (RUBY_VM_INTERRUPTED(th)) {
+ return self;
+ }
+ }
+
+ if (NIL_P(timeout)) {
+ rb_ensure(rb_cv_wait_forever, (VALUE)&arg, rb_cv_wait_ensure, (VALUE)&arg);
+ }
+ else {
+ rb_ensure(rb_cv_wait_for, (VALUE)&arg, rb_cv_wait_ensure, (VALUE)&arg);
+ }
+ return self;
+}
+
+VALUE
+rb_cv_signal(VALUE self)
+{
+ cv_t *cv;
+ GetCVPtr(self, cv);
+
+ native_mutex_lock(&cv->lock);
+ native_cond_signal(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+ return self;
+}
+
+VALUE
+rb_cv_broadcast(VALUE self)
+{
+ cv_t *cv;
+ GetCVPtr(self, cv);
+
+ native_mutex_lock(&cv->lock);
+ native_cond_broadcast(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+ return self;
+}
+
+
/* variables for recursive traversals */
static ID recursive_key;
@@ -4382,6 +4594,13 @@ Init_Thread(void)
rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
+ rb_cConditionVariable = rb_define_class("XConditionVariable", rb_cObject);
+ rb_define_alloc_func(rb_cConditionVariable, cv_alloc);
+ rb_define_method(rb_cConditionVariable, "initialize", cv_initialize, 0);
+ rb_define_method(rb_cConditionVariable, "wait", rb_cv_wait, -1);
+ rb_define_method(rb_cConditionVariable, "signal", rb_cv_signal, 0);
+ rb_define_method(rb_cConditionVariable, "broadcast", rb_cv_broadcast, 0);
+
recursive_key = rb_intern("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
@@ -351,7 +351,7 @@ rb_thread_terminate_all(void)
/* unlock all locking mutexes */
if (th->keeping_mutexes) {
- rb_mutex_unlock_all(th->keeping_mutexes, GET_THREAD());
+ rb_mutex_unlock_all(th->keeping_mutexes, GET_THREAD());
}
thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
@@ -3494,6 +3494,207 @@ rb_barrier_destroy(VALUE self)
return rb_mutex_unlock(mutex);
}
/* ConditionVariable built on the VM's native primitives (native mutex +
 * native condition variable); class object registered in Init_Thread. */
VALUE rb_cConditionVariable;

/* Internal state wrapped by the Ruby object. */
typedef struct rb_cv_struct
{
    rb_thread_lock_t lock;
    rb_thread_cond_t cond;
} cv_t;

/* Fetch the cv_t behind a TypedData object, with type checking. */
#define GetCVPtr(obj, tobj) \
    TypedData_Get_Struct((obj), cv_t, &cv_data_type, (tobj))

/* cv_t holds no Ruby object references, so no GC mark function is needed. */
#define cv_mark NULL
+
+static void
+cv_free(void *ptr)
+{
+ if (ptr) {
+ cv_t *cv = ptr;
+ native_mutex_destroy(&cv->lock);
+ native_cond_destroy(&cv->cond);
+ }
+ ruby_xfree(ptr);
+}
+
+static size_t
+cv_memsize(const void *ptr)
+{
+ return ptr ? sizeof(cv_t) : 0;
+}
+
/* TypedData bookkeeping: no mark (cv_mark is NULL), free and memsize above. */
static const rb_data_type_t cv_data_type = {
    "cv",
    {cv_mark, cv_free, cv_memsize,},
};
+
+static VALUE
+cv_alloc(VALUE klass)
+{
+ VALUE volatile obj;
+ cv_t *cv;
+
+ obj = TypedData_Make_Struct(klass, cv_t, &cv_data_type, cv);
+ native_mutex_initialize(&cv->lock);
+ native_cond_initialize(&cv->cond);
+ return obj;
+}
+
+/*
+ * call-seq:
+ * ConditionVariable.new -> condition_variable
+ *
+ * Creates a new ConditionVariable
+ */
+static VALUE
+cv_initialize(VALUE self)
+{
+ return self;
+}
+
+VALUE
+cv_new(void)
+{
+ return cv_alloc(rb_cConditionVariable);
+}
+
/* State threaded through rb_ensure() during a wait. */
struct cv_wait_arg {
    VALUE mutex;                        /* Ruby Mutex released while waiting */
    cv_t *cv;
    struct timeval *tv;                 /* relative timeout; unused for untimed waits */
    enum rb_thread_status prev_status;  /* thread status to restore afterwards */
    struct rb_unblock_callback oldubf;  /* previous unblock function to reinstall */
};
+
+static VALUE
+rb_cv_wait_forever(struct cv_wait_arg *arg)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ th->transition_for_lock = 1;
+ BLOCKING_REGION_CORE({
+ native_mutex_lock(&arg->cv->lock);
+ th->transition_for_lock = 0;
+ native_cond_wait(&arg->cv->cond, &arg->cv->lock);
+ th->transition_for_lock = 1;
+ native_mutex_unlock(&arg->cv->lock);
+ });
+ th->transition_for_lock = 0;
+}
+
+static VALUE
+rb_cv_wait_for(struct cv_wait_arg *arg)
+{
+ struct timespec ts;
+ struct timeval tvn;
+ rb_thread_t *th = GET_THREAD();
+ int r;
+
+ gettimeofday(&tvn, NULL);
+ ts.tv_sec = tvn.tv_sec + arg->tv->tv_sec;
+ ts.tv_nsec = (tvn.tv_usec + arg->tv->tv_usec) * 1000;
+ if (ts.tv_nsec >= PER_NANO){
+ ts.tv_sec += 1;
+ ts.tv_nsec -= PER_NANO;
+ }
+
+ th->transition_for_lock = 1;
+ BLOCKING_REGION_CORE({
+ native_mutex_lock(&arg->cv->lock);
+ th->transition_for_lock = 0;
+ native_cond_timedwait(&arg->cv->cond, &arg->cv->lock, &ts);
+ th->transition_for_lock = 1;
+ native_mutex_unlock(&arg->cv->lock);
+ });
+ th->transition_for_lock = 0;
+}
+
+static VALUE
+rb_cv_wait_ensure(struct cv_wait_arg *arg)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ if (th->status == THREAD_STOPPED_FOREVER) {
+ th->status = arg->prev_status;
+ }
+ reset_unblock_function(th, &arg->oldubf);
+ rb_mutex_lock(arg->mutex);
+}
+
+static void
+rb_cv_wait_interrupt(void *ptr)
+{
+ cv_t *cv = (cv_t *)ptr;
+ native_mutex_lock(&cv->lock);
+ native_cond_broadcast(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+}
+
+VALUE
+rb_cv_wait(int argc, VALUE *argv, VALUE self)
+{
+ VALUE mutex;
+ VALUE timeout;
+ cv_t *cv;
+ struct timeval t;
+ struct cv_wait_arg arg;
+
+ rb_thread_t *th = GET_THREAD();
+
+ GetCVPtr(self, cv);
+
+ rb_scan_args(argc, argv, "11", &mutex, &timeout);
+
+ if (!NIL_P(timeout)) {
+ t = rb_time_interval(timeout);
+ }
+
+ arg.mutex = mutex;
+ arg.cv = cv;
+ arg.tv = &t;
+
+ rb_mutex_unlock(mutex);
+
+ arg.prev_status = th->status;
+
+ set_unblock_function(th, rb_cv_wait_interrupt, cv, &arg.oldubf);
+ th->status = THREAD_STOPPED_FOREVER;
+
+ if (NIL_P(timeout)) {
+ rb_ensure(rb_cv_wait_forever, (VALUE)&arg, rb_cv_wait_ensure, (VALUE)&arg);
+ }
+ else {
+ rb_ensure(rb_cv_wait_for, (VALUE)&arg, rb_cv_wait_ensure, (VALUE)&arg);
+ }
+ return self;
+}
+
+VALUE
+rb_cv_signal(VALUE self)
+{
+ cv_t *cv;
+ GetCVPtr(self, cv);
+
+ native_mutex_lock(&cv->lock);
+ native_cond_signal(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+ return self;
+}
+
+VALUE
+rb_cv_broadcast(VALUE self)
+{
+ cv_t *cv;
+ GetCVPtr(self, cv);
+
+ native_mutex_lock(&cv->lock);
+ native_cond_broadcast(&cv->cond);
+ native_mutex_unlock(&cv->lock);
+ return self;
+}
+
+
/* variables for recursive traversals */
static ID recursive_key;
@@ -4382,6 +4583,13 @@ Init_Thread(void)
rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
+ rb_cConditionVariable = rb_define_class("XConditionVariable", rb_cObject);
+ rb_define_alloc_func(rb_cConditionVariable, cv_alloc);
+ rb_define_method(rb_cConditionVariable, "initialize", cv_initialize, 0);
+ rb_define_method(rb_cConditionVariable, "wait", rb_cv_wait, -1);
+ rb_define_method(rb_cConditionVariable, "signal", rb_cv_signal, 0);
+ rb_define_method(rb_cConditionVariable, "broadcast", rb_cv_broadcast, 0);
+
recursive_key = rb_intern("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);