Kernel Thread implementation
From:
Matthew Bloch <mattbee@...>
Date:
2002-08-03 14:48:29 UTC
List:
ruby-core #291
Hi there; after struggling for a little while against the unpredictability of
which Ruby calls will block the whole process and which won't :-) I decided
to try a Thread implementation which uses fork and pipes, and KThread seems
to work quite well as a drop-in replacement.
It's currently running in the web crawler app I mentioned earlier, with much higher
reliability than Ruby's internal threading, though obviously it has
disadvantages compared to internal threads: namely shared variables don't
work! But using KMutex (Mutex which works between KThreads) and
KSharedObject (like DRb, where several KThreads can share an instance of an
object) this isn't much of a problem. Also, because it relies on IO.pipe,
Kernel.fork and File#flock, I'm not sure how Windows-friendly this code is.
I'd be interested to hear any comments on this, and suggestions on how best to
distribute the code.
--
Matthew > http://www.soup-kitchen.net/
> ICQ 19482073
Attachments (1)
kthread.rb
(5.14 KB, text/x-c++src)
# Use exactly as the standard Mutex class, but works between KThreads (i.e.
# between forked processes): each KMutex is backed by a lock file under /tmp
# which is locked with File#flock. Currently the lock files are not removed
# after use.
#
# Like Mutex it is not re-entrant: #lock opens a fresh descriptor each time,
# so locking again from the holder of the lock polls forever.
#
class KMutex
  # Create (or truncate) this mutex's lock file. The name combines the
  # creating process's PID with this object's id so that distinct KMutex
  # objects get distinct files.
  def initialize
    @lockname = "/tmp/ruby.kmutex.#{$$}.#{object_id}"
    File.open(@lockname, "w").close
    @lock = nil
    #ObjectSpace.define_finalizer(self, Proc.new { File.delete(@lockname) })
  end

  # Acquire the lock, polling until it is free; with +nonblock+ true make a
  # single attempt instead. Returns true if the lock was acquired, false
  # otherwise. On failure the descriptor is closed and the mutex is left
  # unlocked, so #locked? and #unlock see the real state.
  def lock(nonblock=false)
    file = File.open(@lockname)
    acquired = false
    loop do
      # Bodge: poll with LOCK_NB instead of issuing a blocking flock,
      # because the flock syscall can block.
      acquired = file.flock(File::LOCK_EX | File::LOCK_NB) == 0
      break if acquired || nonblock
      sleep rand  # random back-off of under a second before retrying
    end
    @lock = file if acquired
    acquired
  ensure
    # Don't leak the descriptor unless we are now holding the lock through it.
    file.close if file && !acquired
  end

  # Release the lock held by this object. Raises RuntimeError "Not Locked"
  # if this object does not hold it.
  def unlock
    raise "Not Locked" if @lock.nil?
    file = @lock
    @lock = nil
    file.flock(File::LOCK_UN)
  ensure
    file.close if file
  end

  # True if this object holds the lock, or if another descriptor (in this or
  # any other process) currently holds it.
  def locked?
    return true unless @lock.nil?
    File.open(@lockname) do |probe|
      # LOCK_NB on its own is not a valid flock operation (EINVAL); probe
      # with a non-blocking exclusive request. false means "would block",
      # i.e. someone else holds it. Closing the probe releases any lock we
      # obtained here.
      probe.flock(File::LOCK_EX | File::LOCK_NB) == false
    end
  end

  # Single non-blocking attempt; returns true on success, false otherwise.
  def try_lock
    lock(true)
  end

  # Run the block while holding the lock, passing +args+ through; the lock
  # is released even if the block raises. Returns the block's value.
  def synchronize(*args)
    self.lock
    begin
      yield(*args)
    ensure
      self.unlock
    end
  end
end
# Kernel Thread implementation -- use almost exactly as Ruby's native thread
# but without shared variables. Each KThread is a forked child process; the
# block's return value (or the exception it raised) is sent back to the
# parent through a pipe with Marshal, so it must be marshallable.
#
# Differences from Thread worth knowing:
# * the block receives the constructor arguments as ONE array
#   (KThread.new(1, 2) { |args| ... } yields [1, 2]);
# * #join returns the block's value (like Thread#value), re-raises the
#   child's exception, and returns nil if the child died without reporting.
#
class KThread
  EXIT_RESULT = "R"
  EXIT_EXCEPTION = "E"
  EXIT_CRASH = "X"  # NOTE(review): not referenced anywhere in this class

  # PIDs of children started by this process and not yet reaped by #join.
  @@threads = []

  # When this process exits, terminate any child that was never joined.
  # A child already reaped elsewhere (e.g. by a caller's Process.wait) no
  # longer exists, so ESRCH is ignored instead of aborting the handler.
  Kernel.at_exit {
    @@threads.each { |pid|
      begin
        Process.kill("TERM", pid)
      rescue Errno::ESRCH
      end
    }
  }

  # Forget the sibling KThreads inherited from the parent. Run once at load
  # time and again at the start of every child, so a child's at_exit handler
  # only signals processes that child started itself. (A bare `private` does
  # not apply to singleton methods, so this is, and always was, public.)
  def KThread.new_child
    @@threads.clear
  end

  KThread.new_child

  # Fork a child that runs the block with +args+ (passed as a single array)
  # and reports the outcome back through a pipe.
  def initialize(*args)
    @finishing_lock = KMutex.new
    @res_read, res_write = IO.pipe
    @pid = Process.fork do
      # Close the parent's end of the pipe
      @res_read.close
      # The calling program should be able to mix Threads and KThreads, so
      # stop Ruby threads copied into the child. (This used to sit inside
      # `Thread.critical { ... }`, but Thread.critical takes no block, so
      # that block -- and the new_child call mentioned in the old FIXME --
      # never ran.)
      kill_inherited_threads
      # Make sure this child has no idea of its KThread siblings
      KThread.new_child
      # NOTE: when this block returns the child exits normally, so at_exit
      # handlers inherited from the parent also run in the child.
      report_outcome(res_write) { yield(args) }
    end
    # Close the child's end of the pipe
    res_write.close
    @@threads << @pid
    @result = nil
  end

  # Wait for the child and return the block's value. Re-raises the block's
  # exception if it raised; returns nil if the child exited without
  # reporting anything.
  def join
    check_finished(true)
    raise @result[1] if @result[0] == EXIT_EXCEPTION
    @result[1]
  end

  # false once the child has finished, "run" otherwise.
  def status
    # FIXME: sleeping?
    check_finished(false) ? false : "run"
  end

  # true while the child has not yet reported/been reaped.
  def alive?
    !check_finished(false)
  end

  private

  # Collect the child's outcome into @result = [tag, value, waitpid2 result]
  # and reap it. With +block+ false, return false immediately when nothing is
  # readable on the pipe yet. Returns true once the child has been collected.
  def check_finished(block)
    @finishing_lock.synchronize do
      return true if @pid.nil?
      return false if !block && Kernel.select([@res_read], nil, [@res_read], 0).nil?
      # Result element 1: exit status tag -- normal, exception, or nil when
      # the pipe hit EOF because the child died before reporting
      tag = @res_read.read(1)
      @result = [tag]
      # Result element 2: returned value or exception object
      case tag
      when EXIT_RESULT, EXIT_EXCEPTION
        @result << Marshal.load(@res_read)
      when nil
        @result << nil
      else
        raise "child returned '#{tag}' type '#{tag.class}'"
      end
      # Result element 3: result of the wait() system call ([pid, status])
      @result << Process.waitpid2(@pid, 0)
      @res_read.close
      @@threads.delete(@pid)
      @pid = nil
      true
    end
  end

  # Stop every other Ruby thread copied into this forked child. The main
  # thread is skipped because killing it would end the child process.
  def kill_inherited_threads
    Thread.list.each { |thr|
      next if thr == Thread.current || thr == Thread.main
      begin
        thr.exit
        thr.join
      rescue Exception
        # Best effort: #join re-raises whatever a thread died with; that is
        # irrelevant to the child, so it is deliberately ignored.
      end
    }
  end

  # Run the block and write one record to +io+: a tag (EXIT_RESULT or
  # EXIT_EXCEPTION) followed by the Marshal-ed value. The value is
  # serialized before anything is written, so an unmarshallable result is
  # reported as an exception instead of leaving a half-written record.
  def report_outcome(io)
    begin
      tag = EXIT_RESULT
      payload = Marshal.dump(yield)
    rescue Exception => ex
      # Exception (not StandardError) so that anything escaping the block,
      # including SystemExit, is reported to the parent as before.
      tag = EXIT_EXCEPTION
      payload = dump_exception(ex)
    end
    io.write(tag)
    io.write(payload)
  ensure
    io.close
  end

  # Marshal +ex+, falling back to a RuntimeError carrying its class and
  # message when the exception object itself cannot be marshalled.
  def dump_exception(ex)
    Marshal.dump(ex)
  rescue TypeError
    Marshal.dump(RuntimeError.new("#{ex.class}: #{ex.message}"))
  end
end
class KSharedObject
  # A utility class which allows you to control a single instance
  # of an object from multiple kthreads, with access properly synchronized.
  # The instance lives in a dedicated KThread (a forked process) and calls
  # are forwarded to it over a pair of pipes. You can use it like so:
  #
  #   KSharedObject.new(object)
  #
  # for an existing object (it is copied into the server process by fork),
  # or to generate the object after the fork has taken place, use:
  #
  #   KSharedObject.new(ClassName, :new, arg1, arg2)
  #   KSharedObject.new(DBI, :connect, *args)
  #
  # or pass a block that builds it:
  #
  #   KSharedObject.new { DBI.connect(*args) }
  #
  # Method requests called on the KSharedObject will be passed to the instance
  # of the object, and results passed back. This will happen through normal
  # Ruby marshalling, so beware: arguments and return values must be
  # marshallable, blocks are not forwarded, and public methods KSharedObject
  # itself inherits from Object (to_s, inspect, == ...) are answered locally.
  # An exception raised by the shared object is marshalled back and re-raised
  # in the caller. But this is enough to share DBI handles etc.
  #

  # Start the server KThread. Accepts an existing object, a
  # (receiver, method, *args) spec evaluated inside the server, or a block
  # whose value becomes the shared object.
  #
  # Raises ArgumentError when given neither a spec nor a block, or both.
  def initialize(*spec, &block)
    factory =
      if block
        raise ArgumentError, "give either a block or arguments, not both" unless spec.empty?
        block
      elsif spec.size == 1
        target = spec.first
        proc { target }
      elsif spec.size >= 2
        receiver, message, *margs = spec
        proc { receiver.__send__(message, *margs) }
      else
        raise ArgumentError, "KSharedObject.new needs an object, a (receiver, method, *args) spec, or a block"
      end
    @lock = KMutex.new
    @qrd, @qwr = IO.pipe
    @ard, @awr = IO.pipe
    @thread = KThread.new {
      # Server side: keep only the request-read and answer-write ends.
      @qwr.close
      @ard.close
      serve(factory.call)
    }
    # Client side: keep only the request-write and answer-read ends.
    @qrd.close
    @awr.close
  end

  private

  # Forward +sym+(*args) to the shared object and return its result, or
  # re-raise the exception it raised. The KMutex (usable across processes)
  # keeps each request/answer pair from interleaving with other callers.
  def method_missing(sym, *args)
    @lock.synchronize {
      @qwr.write(Marshal.dump(sym))
      @qwr.write(Marshal.dump(args))
      @qwr.flush
      status, value = Marshal.load(@ard)
      raise value if status == :error
      return value
    }
  end

  # Server loop run inside the KThread: read a method name and argument
  # list, call it on +obj+, and reply with [:ok, value] or [:error, ex].
  # A failing call no longer kills the server. Returns when the request
  # pipe reaches EOF.
  def serve(obj)
    loop {
      begin
        sym = Marshal.load(@qrd)
        dargs = Marshal.load(@qrd)
      rescue EOFError
        break
      end
      reply =
        begin
          # __send__ so that objects overriding #send (e.g. sockets) still work
          [:ok, obj.__send__(sym, *dargs)]
        rescue StandardError => ex
          [:error, ex]
        end
      @awr.write(encode_reply(reply))
      @awr.flush
    }
  end

  # Marshal a reply; if the value or exception cannot be marshalled, reply
  # with a TypeError describing why instead of crashing the server.
  def encode_reply(reply)
    Marshal.dump(reply)
  rescue TypeError => ex
    Marshal.dump([:error, TypeError.new(ex.message)])
  end
end