Kernel Thread implementation

From: Matthew Bloch <mattbee@...>
Date: 2002-08-03 14:48:29 UTC
List: ruby-core #291
Hi there; after struggling for a little while against the unpredictability of 
which Ruby calls will block the whole process and which won't :-) I decided 
to try a Thread implementation which uses fork and pipes, and KThread seems 
to work quite well as a drop-in replacement.

It's currently running on a web crawler app that I mentioned with much higher 
reliability than the Ruby internal threading, though obviously it has 
disadvantages compared to internal threads: namely shared variables don't 
work!  But using KMutex (Mutex which works between KThreads) and 
KSharedObject (like DRb, where several KThreads can share an instance of an 
object) this isn't much of a problem.  Also, because it relies on IO.pipe, 
Kernel.fork and File#flock I'm not sure how Windows-friendly this code is.

I'd be interested to hear any comments on this, and suggestions on how best to 
distribute the code.

-- 
Matthew       > http://www.soup-kitchen.net/
              > ICQ 19482073

Attachments (1)

kthread.rb (5.14 KB, text/x-c++src)
# Use exactly like the standard Mutex class, but works between KThreads.
# Currently it doesn't remove its temporary lock files after use.
# 
class KMutex
  # Mutex that works between processes: the lock is an exclusive flock on a
  # file under /tmp.  The file name is fixed when the mutex is created, so
  # processes forked afterwards open the same file and contend for the same
  # lock.  The lock file is not deleted after use.

  # Creates the (empty) lock file; the mutex starts out unlocked.
  def initialize
    # __id__ instead of Object#id: same value on old interpreters, and
    # Object#id no longer exists on newer ones.
    @lockname = "/tmp/ruby.kmutex.#{$$}.#{__id__}"
    File.open(@lockname, "w").close
    # Open File holding the flock while this object owns the lock, else nil.
    @lock = nil
  end

  # Acquires the lock.
  #
  # nonblock:: when false (default) keep retrying until the lock is obtained;
  #            when true make a single attempt.
  #
  # Returns true if the lock was obtained, false otherwise (only possible
  # when +nonblock+ is true).
  def lock(nonblock=false)
    file = File.open(@lockname)
    acquired = false
    begin
      loop do
        # Always ask with LOCK_NB and poll, because a plain blocking flock
        # syscall can block the whole process.  flock returns 0 on success
        # and false when the lock is held elsewhere.
        acquired = (file.flock(File::LOCK_EX | File::LOCK_NB) == 0)
        break if acquired || nonblock
        sleep rand
      end
    ensure
      # Never keep (or leak) a handle that does not actually hold the lock.
      file.close unless acquired
    end
    @lock = file if acquired
    acquired
  end

  # Releases the lock.  Raises RuntimeError ("Not Locked") if this object
  # does not currently hold it.
  def unlock
    raise "Not Locked" if @lock.nil?
    @lock.flock(File::LOCK_UN)
    @lock.close
    @lock = nil
  end

  # Returns true if the lock is held, either by this object or by anybody
  # else with the lock file open; false otherwise.
  def locked?
    return true unless @lock.nil?
    probe = File.open(@lockname)
    begin
      # Probe by briefly taking the lock: an operation of LOCK_NB alone is
      # not a valid flock request, it has to be combined with a lock type.
      if probe.flock(File::LOCK_EX | File::LOCK_NB) == 0
        probe.flock(File::LOCK_UN)
        false
      else
        true
      end
    ensure
      probe.close
    end
  end

  # Single non-blocking attempt to take the lock; returns true or false.
  def try_lock
    lock(true)
  end

  # Takes the lock (blocking), yields +args+ to the block, and releases the
  # lock again even if the block raises.  Returns the block's value.
  def synchronize(*args)
    self.lock
    begin
      yield(*args)
    ensure
      self.unlock
    end
  end

end

# Kernel Thread implementation -- use almost exactly as Ruby's native thread
# but without shared variables.
#
class KThread
  # Runs a block in a forked child process and hands its result (or the
  # exception it raised) back to the parent through a pipe using Marshal.
  # Variables are not shared with the parent: the block works on a copy.

  # One-byte tags the child writes in front of the marshalled payload.
  EXIT_RESULT    = "R"
  EXIT_EXCEPTION = "E"
  EXIT_CRASH     = "X"

  # Pids of the children started by this process that have not been reaped.
  @@threads = []

  # When the process exits, terminate any children that are still recorded.
  Kernel.at_exit {
    @@threads.each { |pid|
      begin
        Process.kill("TERM", pid)
      rescue Errno::ESRCH
        # The child is already gone; nothing left to terminate.
      end
    }
  }

  # Resets the per-process bookkeeping.  Called once at load time and again
  # inside every freshly forked child, so a child does not treat its
  # siblings as its own children (and does not kill them from at_exit).
  def KThread.new_child
    @@threads.clear
  end

  KThread.new_child

  # Forks a child that runs the given block with +args+ (splatted, as
  # Thread.new does) and reports the outcome through a pipe.
  private
  def initialize(*args)
    @finishing_lock = KMutex.new
    @res_read, res_write = IO.pipe
    @pid = Process.fork do

      # Close the parent's end of the pipe
      @res_read.close

      # Kill off other internal threads because the calling program should
      # be able to mix Threads and KThreads.  This used to be wrapped in
      # "Thread.critical { ... }", but Thread.critical is an attribute
      # reader that ignores a block (so the loop never ran) and it does not
      # exist at all on newer interpreters (so the child died here).
      Thread.list.each { |thr|
        next if thr == Thread.current
        begin
          thr.exit
          thr.join
        rescue Exception
          # Best effort: failing to stop one thread must not stop the child.
        end
      }

      # Make sure this thread has no idea of its KThread siblings
      KThread.new_child

      # Now run the Ruby code and write its results to the pipe
      begin
        # Marshal into a string first: if the result cannot be dumped the
        # exception is raised before anything was written, so the parent
        # never sees a result tag followed by an exception record.
        payload = Marshal.dump(yield(*args))
        res_write.write EXIT_RESULT
        res_write.write payload
      rescue Exception => ex
        print "#{ex.class}\n"
        res_write.write EXIT_EXCEPTION
        Marshal.dump(ex, res_write)
      end
    end

    # Close child's end of the pipe
    res_write.close

    @@threads << @pid
    # Filled in by check_finished: [tag, payload, [pid, status]]
    @result = nil
  end

  # Collects the child's outcome if it is available.
  #
  # block:: true to wait for the child to finish, false to only poll.
  #
  # Returns true once the child has finished and @result is filled in,
  # false if polling found nothing to read yet.  Raises RuntimeError if the
  # child sent an unknown tag.
  private
  def check_finished(block)
    @finishing_lock.synchronize do
      return true if @pid.nil?

      if !block
        sel = Kernel::select([@res_read], nil, [@res_read], 0)
        return false if sel.nil?
      end

      # Result element 1: Exit status: normal, exception or child died
      # (nil: the pipe was closed without anything having been written).
      code = @res_read.read(1)
      @result = [code]

      # Result element 2: Return object
      case code
      when EXIT_RESULT, EXIT_EXCEPTION
        @result << Marshal.load(@res_read)
      when nil
        @result << nil
      else
        raise "child returned '#{code}' type '#{code.class}'"
      end

      # Result element 3: Result from wait() system call
      @result << Process.waitpid2(@pid, 0)

      # Everything has been read; release the descriptor.
      @res_read.close

      @@threads.delete(@pid)
      @pid = nil

      return true
    end
  end

  # Waits for the child to finish.  Returns the block's value, re-raises
  # the exception the block raised, or returns nil if the child ended
  # without reporting anything.
  public
  def join
    check_finished(true)
    raise @result[1] if @result[0] == EXIT_EXCEPTION
    return @result[1]
  end

  # Returns "run" while the child has not reported yet, false afterwards.
  public
  def status
    # FIXME: sleeping?
    return check_finished(false) ? false : "run"
  end

  # Returns true while the child has not reported yet.
  public
  def alive?
    return !check_finished(false)
  end

end

class KSharedObject
  # A utility class which allows you to control a single instance
  # of an object from multiple kthreads, with access properly synchronized.
  # You can either use it like so:
  #
  #    KSharedObject.new(object)
  #
  # for an existing object, or to generate the object after the fork has taken
  # place, use either of:
  #
  #    KSharedObject.new(ClassName, :new, arg1, arg2)
  #    KSharedObject.new(DBI, :connect, *args)
  #    KSharedObject.new { DBI.connect(*args) }
  #
  # Method requests called on the KSharedObject will be passed to the instance
  # of the object, and results passed back.  This will happen through normal
  # Ruby marshalling, so beware.  But this is enough to share DBI handles
  # etc.  An exception raised by the shared object (or a result that cannot
  # be marshalled) is sent back and raised in the caller; the object keeps
  # serving further requests.
  #
  # Only methods that KSharedObject itself does not respond to are
  # forwarded, because forwarding is done through method_missing.

  # target:: the object to share, or the receiver of +meth+.
  # meth::   optional method name; when given, the shared object is
  #          target.send(meth, *args), evaluated inside the server process.
  # A block, when given, takes precedence: its value (evaluated inside the
  # server process) is the shared object.
  private
  def initialize(target = nil, meth = nil, *args)
    @lock = KMutex.new
    @qrd, @qwr = IO.pipe   # queries: clients write, server reads
    @ard, @awr = IO.pipe   # answers: server writes, clients read

    @thread = KThread.new {

      # The server only reads queries and writes answers.
      @qwr.close
      @ard.close

      obj = if block_given?
        yield
      elsif meth
        target.send(meth, *args)
      else
        target
      end

      loop {
        sym   = Marshal.restore(@qrd)
        dargs = Marshal.restore(@qrd)
        # Reply is [true, value] or [false, exception].  The dump is inside
        # the begin so that an unmarshalable value is reported as an error
        # instead of terminating the server.
        reply = begin
          Marshal.dump([true, obj.send(sym, *dargs)])
        rescue StandardError => ex
          Marshal.dump([false, ex])
        end
        @awr.write(reply)
      }
    }

    # The client side only writes queries and reads answers.
    @qrd.close
    @awr.close
  end

  # Forwards the call to the shared object.  The lock keeps the
  # query/answer exchange of one caller from interleaving with another's.
  # Returns the marshalled result, or raises the exception sent back.
  private
  def method_missing(sym, *args)
    ok, value = @lock.synchronize {
      @qwr.write(Marshal.dump(sym))
      @qwr.write(Marshal.dump(args))
      Marshal.restore(@ard)
    }
    raise value unless ok
    value
  end
end

In This Thread

Prev Next