[#25936] [Bug:1.9] [rubygems] $LOAD_PATH includes bin directory — Nobuyoshi Nakada <nobu@...>

Hi,

10 messages 2009/10/05

[#25943] Disabling tainting — Tony Arcieri <tony@...>

Would it make sense to have a flag passed to the interpreter on startup that

16 messages 2009/10/05

[#26028] [Bug #2189] Math.atanh(1) & Math.atanh(-1) should not raise an error — Marc-Andre Lafortune <redmine@...>

Bug #2189: Math.atanh(1) & Math.atanh(-1) should not raise an error

14 messages 2009/10/10

[#26222] [Bug #2250] IO::for_fd() objects' finalization dangerously closes underlying fds — Mike Pomraning <redmine@...>

Bug #2250: IO::for_fd() objects' finalization dangerously closes underlying fds

11 messages 2009/10/22

[#26244] [Bug #2258] Kernel#require inside rb_require() inside rb_protect() inside SysV context fails — Suraj Kurapati <redmine@...>

Bug #2258: Kernel#require inside rb_require() inside rb_protect() inside SysV context fails

24 messages 2009/10/22

[#26361] [Feature #2294] [PATCH] ruby_bind_stack() to embed Ruby in coroutine — Suraj Kurapati <redmine@...>

Feature #2294: [PATCH] ruby_bind_stack() to embed Ruby in coroutine

42 messages 2009/10/27

[#26371] [Bug #2295] segmentation faults — tomer doron <redmine@...>

Bug #2295: segmentation faults

16 messages 2009/10/27

[ruby-core:26250] Re: A challenge: Enumerator#next in JRuby

From: Ken Bloom <kbloom@...>
Date: 2009-10-23 13:47:46 UTC
List: ruby-core #26250
I'm a bit late in entering this discussion of how to implement 
Enumerators using a thread on the JVM, but I wrote this code for Groovy a 
couple years ago, focused specifically on ensuring that the thread can be 
terminated and the generator garbage collected when it's no longer 
referenced in the main thread. The trick is the judicious use of
WeakRefs.


import java.util.concurrent.*;
import java.lang.ref.*;
import groovy.lang.Closure;
import java.util.*;

public class Generator<T> implements Iterator<T>, Iterable<T>{
   Semaphore availSemaphore=new Semaphore(0);
   Semaphore emptySemaphore=new Semaphore(1);

   //the thread can push one value at at time into pushedValue
   T pushedValue=null;
   
   //pull value moves it from pushedValue to pulledValue
   //until it is released by next()
   T pulledValue=null;
   boolean hasPulledValue=false;

   Thread internalThread;

   public Generator(Closure closure){
      internalThread=new GeneratorThread<T>(this,closure);
      internalThread.setDaemon(true);
      internalThread.start();
   }

   private void pullValue(){
      availSemaphore.acquireUninterruptibly();
      pulledValue=pushedValue;
      pushedValue=null;
      hasPulledValue=true;
      emptySemaphore.release();
   }

   public boolean hasNext(){
      if (!hasPulledValue)
	 pullValue();
      return emptySemaphore.availablePermits() != 2;
   }

   public T next(){
      if (!hasNext())
	 throw new NoSuchElementException("Closure has no more values");
      T retval=pulledValue;
      hasPulledValue=false;
      return retval;
   }

   public void remove(){
      throw new UnsupportedOperationException(
	 "Remove is not supported on generators");
   }

   public Iterator<T> iterator(){
      return this;
   }

   public void finalize(){
      internalThread.interrupt();
   }

   static class GeneratorThread<T> extends Thread{
      WeakReference<Generator<T>> generatorRef;
      Closure closure;
      public GeneratorThread(Generator<T> generator, Closure cl){
	 generatorRef=new WeakReference<Generator<T>>(generator);
	 closure=cl;
      }

      public void run(){
	 closure.call(new SaveClosure<T>(this));
	 Generator generator=generatorRef.get();

	 //NOTE: when the closure completes, pullValue() will block forever
	 //waiting for more available data. This release() allows it to
	 //get in one last time, and read a variable indicating that the
	 //thread has died and isn't producing any more data. one final
	 //pullValue() run will have emptySemaphore==1 and
	 //availSemaphore==1, and it will make emptySemaphore==2 thus
	 //indicating that the thread has died
	 if (generator!=null){
	    generator.availSemaphore.release();
	 }
	 //NOTE: if the generator has been garbage collected, we don't care
	 //about letting the generator pull a termination condition.
      }
   }

   static class SaveClosure<T> extends Closure{
      WeakReference<Generator<T>> generatorRef;
      Semaphore emptySemaphore;
      Semaphore availSemaphore;
      public SaveClosure(GeneratorThread<T> gt){
	 super(gt,null);
	 generatorRef=gt.generatorRef;
	 Generator<T> generator=generatorRef.get();
	 if (generator!=null){
	    emptySemaphore=generator.emptySemaphore;
	    availSemaphore=generator.availSemaphore;
	 }else{
	    throw new GeneratorDisposedException();
	 }
      }

      public void doCall(T value){
	 try{
	    emptySemaphore.acquire();
	 }catch(InterruptedException e){
	    throw new GeneratorDisposedException();
	 }
	 Generator<T> generator=generatorRef.get();
	 if (generator!=null){
	    generator.pushedValue=value;
	 }else{
	    throw new GeneratorDisposedException();
	 }
	 availSemaphore.release();
      }
   }

   /**
    * A GeneratorDisposedException is used to terminate the thread
    * that was generating values, once the Generator has been garbage
    * collected.
    */
   static public class GeneratorDisposedException extends RuntimeException{
   }
}


-- 
Chanoch (Ken) Bloom. PhD candidate. Linguistic Cognition Laboratory.
Department of Computer Science. Illinois Institute of Technology.
http://www.iit.edu/~kbloom1/


In This Thread

Prev Next