[ruby-dev:42656] [Request for Comment] avoid timer thread
From:
SASADA Koichi <ko1@...>
Date:
2010-11-29 02:53:03 UTC
List:
ruby-dev #42656
Hi,
I have attached a patch that avoids the timer thread. Reviews are welcome.
- Timer thread -> Interrupt thread
- Two tasks
- For signal delivery
- To guarantee that Ruby threads get interrupted
- wake up if needed
- Communicate with POSIX semaphore.
- Communicate with Signal handlers
- Communicate with Ruby threads
- Rarely wake-up (to reduce power consumption :))
- Ruby thread context-switch timing is driven by setitimer() (SIGVTALRM)
- set a timer using setitimer() if there is
at least one waiting thread
- disable the timer if there are no waiting threads
Known issues:
- Mac OS X (Darwin Kernel Version 9.8.0) doesn't have sem_timedwait().
- Workaround for Mac OS X users:
Replace sem_timedwait() with usleep(10000).
- No Windows support
- Some tests get stuck (hang) on several environments
----
Japanese / にほんご
タイマスレッドを無くすパッチを書いてみました。結局、ヘルパースレッド的な
ものは残っているけれど、あんまり走りません。レビュー頂けると幸いです。
--
// SASADA Koichi at atdot dot net
Attachments (1)
avoid_timerthread.patch
(15.2 KB, text/x-diff)
Index: thread_pthread.c
===================================================================
--- thread_pthread.c (revision 29964)
+++ thread_pthread.c (working copy)
@@ -11,6 +11,7 @@
#ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
+#include <semaphore.h>
#include "gc.h"
#ifdef HAVE_SYS_RESOURCE_H
@@ -48,6 +49,20 @@ gvl_show_waiting_threads(rb_vm_t *vm)
}
}
+static void
+native_set_timer(long usec)
+{
+ struct itimerval it;
+ int r;
+
+ it.it_interval.tv_sec = 0;
+ it.it_interval.tv_usec = usec; /* 10ms */
+ it.it_value.tv_sec = 0;
+ it.it_value.tv_usec = usec;
+ r = setitimer(ITIMER_VIRTUAL, &it, 0);
+ if (r != 0) rb_bug_errno("setitimer", r);
+}
+
#if !GVL_SIMPLE_LOCK
static void
gvl_waiting_push(rb_vm_t *vm, rb_thread_t *th)
@@ -64,6 +79,11 @@ gvl_waiting_push(rb_vm_t *vm, rb_thread_
}
th = vm->gvl.waiting_threads;
vm->gvl.waiting++;
+
+ if (vm->gvl.waiting == 1) {
+ /* invoke timer */
+ native_set_timer(10000 /* 10ms */);
+ }
}
static void
@@ -71,6 +91,11 @@ gvl_waiting_shift(rb_vm_t *vm, rb_thread
{
vm->gvl.waiting_threads = vm->gvl.waiting_threads->native_thread_data.gvl_next;
vm->gvl.waiting--;
+
+ if (vm->gvl.waiting == 0) {
+ /* stop timer */
+ native_set_timer(0);
+ }
}
#endif
@@ -85,7 +110,6 @@ gvl_acquire(rb_vm_t *vm, rb_thread_t *th
if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): sleep\n", th);
gvl_waiting_push(vm, th);
if (GVL_DEBUG) gvl_show_waiting_threads(vm);
-
while (vm->gvl.acquired != 0 || vm->gvl.waiting_threads != th) {
native_cond_wait(&th->native_thread_data.gvl_cond, &vm->gvl.lock);
}
@@ -297,9 +321,10 @@ static rb_thread_lock_t signal_thread_li
static pthread_key_t ruby_native_thread_key;
static void
-null_func(int i)
+vtalrm_handler(int i)
{
- /* null */
+ rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
+ RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
}
static rb_thread_t *
@@ -314,7 +339,14 @@ ruby_thread_set_native(rb_thread_t *th)
return pthread_setspecific(ruby_native_thread_key, th) == 0;
}
-static void native_thread_init(rb_thread_t *th);
+static void
+native_thread_init(rb_thread_t *th)
+{
+ native_cond_initialize(&th->native_thread_data.sleep_cond);
+ native_cond_initialize(&th->native_thread_data.gvl_cond);
+ ruby_thread_set_native(th);
+ posix_signal(SIGVTALRM, vtalrm_handler);
+}
void
Init_native_thread(void)
@@ -325,15 +357,6 @@ Init_native_thread(void)
th->thread_id = pthread_self();
native_thread_init(th);
native_mutex_initialize(&signal_thread_list_lock);
- posix_signal(SIGVTALRM, null_func);
-}
-
-static void
-native_thread_init(rb_thread_t *th)
-{
- native_cond_initialize(&th->native_thread_data.sleep_cond);
- native_cond_initialize(&th->native_thread_data.gvl_cond);
- ruby_thread_set_native(th);
}
static void
@@ -536,8 +559,6 @@ thread_start_func_1(void *th_ptr)
return 0;
}
-void rb_thread_create_control_thread(void);
-
struct cached_thread_entry {
volatile rb_thread_t **th_area;
pthread_cond_t *cond;
@@ -816,6 +837,8 @@ struct signal_thread_list {
struct signal_thread_list *next;
};
+sem_t interrupt_sem;
+
#ifndef __CYGWIN__
static struct signal_thread_list signal_thread_list_anchor = {
0, 0, 0,
@@ -870,6 +893,7 @@ add_signal_thread_list(rb_thread_t *th)
th->native_thread_data.signal_thread_list = list;
});
}
+ sem_post(&interrupt_sem);
}
#endif
@@ -896,9 +920,9 @@ remove_signal_thread_list(rb_thread_t *t
}
}
-static pthread_t timer_thread_id;
-static pthread_cond_t timer_thread_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t interrupt_thread_id;
+static pthread_cond_t interrupt_thread_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t interrupt_thread_lock = PTHREAD_MUTEX_INITIALIZER;
static struct timespec *
get_ts(struct timespec *ts, unsigned long nsec)
@@ -915,22 +939,40 @@ get_ts(struct timespec *ts, unsigned lon
}
static void *
-thread_timer(void *dummy)
+interrupt_thread_func(void *dummy)
{
struct timespec ts;
- native_mutex_lock(&timer_thread_lock);
- native_cond_broadcast(&timer_thread_cond);
-#define WAIT_FOR_10MS() native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, get_ts(&ts, PER_NANO/100))
+ /* sync with creater thread */
+ native_mutex_lock(&interrupt_thread_lock);
+ native_cond_broadcast(&interrupt_thread_cond);
+ native_mutex_unlock(&interrupt_thread_lock);
+
while (system_working > 0) {
- int err = WAIT_FOR_10MS();
- if (err == ETIMEDOUT);
- else if (err == 0 || err == EINTR) {
- if (rb_signal_buff_size() == 0) break;
+#if !defined(__CYGWIN__) && !defined(__SYMBIAN32__)
+ if (signal_thread_list_anchor.next) {
+ struct timespec ts;
+ sem_timedwait(&interrupt_sem, get_ts(&ts, PER_NANO/100));
+ }
+ else
+#endif
+ sem_wait(&interrupt_sem);
+
+ switch (errno) {
+ case EINVAL:
+ case EDEADLK:
+ rb_bug_errno("interrupt_thread_core/sem_wait or sem_timedwait", errno);
+ }
+
+ if (rb_signal_buff_size() > 0) {
+ /* signal deliverly */
+ rb_vm_t *vm = GET_VM();
+ rb_threadptr_check_signal(vm->main_thread);
}
- else rb_bug_errno("thread_timer/timedwait", err);
#if !defined(__CYGWIN__) && !defined(__SYMBIAN32__)
+ /* Send a signal to interrupt specific ruby thread */
+ /* to cancel blocking region */
if (signal_thread_list_anchor.next) {
FGLOCK(&signal_thread_list_lock, {
struct signal_thread_list *list;
@@ -942,59 +984,59 @@ thread_timer(void *dummy)
});
}
#endif
- timer_thread_function(dummy);
}
- native_mutex_unlock(&timer_thread_lock);
return NULL;
}
-static void
-rb_thread_create_timer_thread(void)
+/* called from signal handler */
+void
+rb_thread_set_signal(void)
{
- rb_enable_interrupt();
+ sem_post(&interrupt_sem);
+}
- if (!timer_thread_id) {
+static void
+rb_thread_create_interrupt_thread(void)
+{
+ if (!interrupt_thread_id) {
pthread_attr_t attr;
int err;
+ sem_init(&interrupt_sem, 0, 0);
+
pthread_attr_init(&attr);
#ifdef PTHREAD_STACK_MIN
pthread_attr_setstacksize(&attr,
PTHREAD_STACK_MIN + (THREAD_DEBUG ? BUFSIZ : 0));
#endif
- native_mutex_lock(&timer_thread_lock);
- err = pthread_create(&timer_thread_id, &attr, thread_timer, 0);
+ native_mutex_lock(&interrupt_thread_lock);
+ err = pthread_create(&interrupt_thread_id, &attr, interrupt_thread_func, 0);
if (err != 0) {
- native_mutex_unlock(&timer_thread_lock);
+ native_mutex_unlock(&interrupt_thread_lock);
fprintf(stderr, "[FATAL] Failed to create timer thread (errno: %d)\n", err);
exit(EXIT_FAILURE);
}
- native_cond_wait(&timer_thread_cond, &timer_thread_lock);
- native_mutex_unlock(&timer_thread_lock);
+
+ native_cond_wait(&interrupt_thread_cond, &interrupt_thread_lock);
+ native_mutex_unlock(&interrupt_thread_lock);
}
rb_disable_interrupt(); /* only timer thread recieve signal */
}
static int
-native_stop_timer_thread(void)
+native_stop_interrupt_thread(void)
{
int stopped;
- native_mutex_lock(&timer_thread_lock);
stopped = --system_working <= 0;
- if (stopped) {
- native_cond_signal(&timer_thread_cond);
- }
- native_mutex_unlock(&timer_thread_lock);
- if (stopped) {
- native_thread_join(timer_thread_id);
- }
+ sem_post(&interrupt_sem);
+ native_thread_join(interrupt_thread_id);
return stopped;
}
static void
-native_reset_timer_thread(void)
+native_reset_interrupt_thread(void)
{
- timer_thread_id = 0;
+ interrupt_thread_id = 0;
}
#ifdef HAVE_SIGALTSTACK
Index: vm_core.h
===================================================================
--- vm_core.h (revision 29961)
+++ vm_core.h (working copy)
@@ -641,9 +641,10 @@ VALUE rb_vm_make_proc(rb_thread_t *th, c
VALUE rb_vm_make_env_object(rb_thread_t *th, rb_control_frame_t *cfp);
void rb_vm_gvl_destroy(rb_vm_t *vm);
-void rb_thread_start_timer_thread(void);
-void rb_thread_stop_timer_thread(void);
-void rb_thread_reset_timer_thread(void);
+void rb_thread_start_interrupt_thread(void);
+void rb_thread_stop_interrupt_thread(void);
+void rb_thread_reset_interrupt_thread(void);
+
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
int ruby_thread_has_gvl_p(void);
VALUE rb_make_backtrace(void);
Index: thread.c
===================================================================
--- thread.c (revision 29961)
+++ thread.c (working copy)
@@ -193,7 +193,6 @@ rb_thread_s_debug_set(VALUE self, VALUE
#endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
VALUE *register_stack_start));
-static void timer_thread_function(void *);
#if defined(_WIN32)
#include "thread_win32.c"
@@ -367,7 +366,7 @@ rb_thread_terminate_all(void)
}
POP_TAG();
}
- rb_thread_stop_timer_thread();
+ rb_thread_stop_interrupt_thread();
}
static void
@@ -2684,48 +2683,25 @@ rb_threadptr_check_signal(rb_thread_t *m
}
}
-static void
-timer_thread_function(void *arg)
-{
- rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
-
- /* for time slice */
- RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
-
- /* check signal */
- rb_threadptr_check_signal(vm->main_thread);
-
-#if 0
- /* prove profiler */
- if (vm->prove_profile.enable) {
- rb_thread_t *th = vm->running_thread;
-
- if (vm->during_gc) {
- /* GC prove profiling */
- }
- }
-#endif
-}
-
void
-rb_thread_stop_timer_thread(void)
+rb_thread_stop_interrupt_thread(void)
{
- if (timer_thread_id && native_stop_timer_thread()) {
- native_reset_timer_thread();
+ if (interrupt_thread_id && native_stop_interrupt_thread()) {
+ native_reset_interrupt_thread();
}
}
void
-rb_thread_reset_timer_thread(void)
+rb_thread_reset_interrupt_thread(void)
{
- native_reset_timer_thread();
+ native_reset_interrupt_thread();
}
void
-rb_thread_start_timer_thread(void)
+rb_thread_start_interrupt_thread(void)
{
system_working = 1;
- rb_thread_create_timer_thread();
+ rb_thread_create_interrupt_thread();
}
static int
@@ -4389,7 +4365,7 @@ Init_Thread(void)
}
}
- rb_thread_create_timer_thread();
+ rb_thread_create_interrupt_thread();
(void)native_mutex_trylock;
}
Index: eval.c
===================================================================
--- eval.c (revision 29961)
+++ eval.c (working copy)
@@ -35,7 +35,6 @@ VALUE rb_eSysStackError;
/* initialize ruby */
void rb_clear_trace_func(void);
-void rb_thread_stop_timer_thread(void);
void rb_call_inits(void);
void Init_heap(void);
@@ -118,8 +117,6 @@ ruby_finalize(void)
ruby_finalize_1();
}
-void rb_thread_stop_timer_thread(void);
-
int
ruby_cleanup(volatile int ex)
{
@@ -160,7 +157,7 @@ ruby_cleanup(volatile int ex)
ex = error_handle(ex);
ruby_finalize_1();
POP_TAG();
- rb_thread_stop_timer_thread();
+ rb_thread_stop_interrupt_thread();
#if EXIT_SUCCESS != 0 || EXIT_FAILURE != 1
switch (ex) {
Index: process.c
===================================================================
--- process.c (revision 29961)
+++ process.c (working copy)
@@ -989,16 +989,12 @@ proc_detach(VALUE obj, VALUE pid)
char *strtok();
#endif
-void rb_thread_stop_timer_thread(void);
-void rb_thread_start_timer_thread(void);
-void rb_thread_reset_timer_thread(void);
-
static int forked_child = 0;
#define before_exec() \
- (rb_enable_interrupt(), (void)(forked_child ? 0 : (rb_thread_stop_timer_thread(), 1)))
+ (rb_enable_interrupt(), (void)(forked_child ? 0 : (rb_thread_stop_interrupt_thread(), 1)))
#define after_exec() \
- (rb_thread_reset_timer_thread(), rb_thread_start_timer_thread(), forked_child = 0, rb_disable_interrupt())
+ (rb_thread_reset_interrupt_thread(), rb_thread_start_interrupt_thread(), forked_child = 0, rb_disable_interrupt())
#define before_fork() before_exec()
#define after_fork() (GET_THREAD()->thrown_errinfo = 0, after_exec())
Index: signal.c
===================================================================
--- signal.c (revision 29961)
+++ signal.c (working copy)
@@ -522,11 +522,15 @@ ruby_nativethread_signal(int signum, sig
#endif
#endif
+void rb_thread_set_signal(void);
+
static RETSIGTYPE
sighandler(int sig)
{
ATOMIC_INC(signal_buff.cnt[sig]);
ATOMIC_INC(signal_buff.size);
+ rb_thread_set_signal();
+
#if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
ruby_signal(sig, sighandler);
#endif
@@ -556,8 +560,6 @@ rb_disable_interrupt(void)
#if USE_TRAP_MASK
sigset_t mask;
sigfillset(&mask);
- sigdelset(&mask, SIGVTALRM);
- sigdelset(&mask, SIGSEGV);
pthread_sigmask(SIG_SETMASK, &mask, NULL);
#endif
}
@@ -1057,6 +1059,44 @@ int ruby_enable_coredump = 0;
#define ruby_enable_coredump 0
#endif
+#if HAVE_PTHREAD
+#include <pthread.h>
+#include <signal.h>
+
+static VALUE
+sigset2hash(sigset_t *set)
+{
+ VALUE hash = rb_hash_new();
+ const struct signals *sigs;
+
+ for (sigs = siglist; sigs->signm; sigs++) {
+ if (sigs->signo == 0) continue;
+ rb_hash_aset(hash, rb_str_new2(sigs->signm), sigismember(set, sigs->signo) ? Qtrue : Qfalse);
+ fprintf(stderr, "signal: SIG%-6s is %d\n", sigs->signm, sigismember(set, sigs->signo));
+ }
+
+ return hash;
+}
+
+static VALUE
+sigmask_of_current_thread(VALUE klass)
+{
+ sigset_t oldset;
+ int r;
+ r = pthread_sigmask(0, NULL, &oldset);
+ if (r != 0) {rb_bug_errno("pthread_sigmask", r);}
+ return sigset2hash(&oldset);
+}
+
+static VALUE
+sigmask_of_current_process(VALUE klass)
+{
+ sigset_t oldset;
+ sigprocmask(0, NULL, &oldset);
+ return sigset2hash(&oldset);
+}
+#endif
+
/*
* Many operating systems allow signals to be sent to running
* processes. Some signals have a defined effect on the process, while
@@ -1102,6 +1142,11 @@ Init_signal(void)
rb_define_global_function("trap", sig_trap, -1);
rb_define_module_function(mSignal, "trap", sig_trap, -1);
rb_define_module_function(mSignal, "list", sig_list, 0);
+
+#if HAVE_PTHREAD
+ rb_define_module_function(mSignal, "sigmask_of_current_thread", sigmask_of_current_thread, 0);
+ rb_define_module_function(mSignal, "sigmask_of_current_process", sigmask_of_current_process, 0);
+#endif
rb_define_method(rb_eSignal, "initialize", esignal_init, -1);
rb_define_method(rb_eSignal, "signo", esignal_signo, 0);
Mon Nov 29 11:38:42 2010 Koichi Sasada <ko1@atdot.net>
* eval.c:
* eval.c (ruby_cleanup):
* eval.c (ruby_finalize):
* process.c:
* process.c (proc_detach):
* signal.c:
* signal.c (Init_signal):
* signal.c (rb_disable_interrupt):
* signal.c (ruby_nativethread_signal):
* thread.c:
* thread.c (Init_Thread):
* thread.c (rb_thread_s_debug_set):
* thread.c (rb_thread_terminate_all):
* thread.c (rb_threadptr_check_signal):
* thread_pthread.c:
* thread_pthread.c (Init_native_thread):
* thread_pthread.c (add_signal_thread_list):
* thread_pthread.c (get_ts):
* thread_pthread.c (gvl_acquire):
* thread_pthread.c (gvl_show_waiting_threads):
* thread_pthread.c (gvl_waiting_push):
* thread_pthread.c (gvl_waiting_shift):
* thread_pthread.c (remove_signal_thread_list):
* thread_pthread.c (ruby_thread_set_native):
* thread_pthread.c (thread_start_func_1):
* thread_pthread.c (thread_timer):
* vm_core.h:
* vm_core.h (rb_vm_make_proc):