From: marcandre-ruby-core@...
Date: 2020-12-19T15:30:51+00:00
Subject: [ruby-core:101542] [Ruby master Feature#17378] Ractor#receive with filtering like other actor langauge

Issue #17378 has been updated by marcandre (Marc-Andre Lafortune).

Eregon (Benoit Daloze) wrote in #note-10:
> Removing eagerly seems like it will inherently be problematic if there are multiple Thread calling `Ractor.receive {}`.

It is imperative that a message is not processed twice, though. You can change "removing" to "reserving" if that helps. The effect is the same.

> Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.

Not a bug. Ordering cannot be made deterministic with multiple threads & filtering:

Message B is sent for thread B. Independently, message A-1 and then A-2 are sent to be consumed by thread A.

Thread A calls `receive_if`, B is reserved and yielded.
Thread B calls `receive_if`, A-1 is reserved and yielded.
Thread C calls `receive_if`, A-2 is reserved and yielded.
Threads A and C reject their messages => message A-2 is yielded to and accepted by A, ahead of A-1.

I believe this is unavoidable and not a real issue. If order is important, it will be up to the programmers to design their protocol accordingly, for example by passing a `not_before_id` parameter (that can be used in filtering).

That being said, replace "Re-adding" by "putting back in its place" if it helps. Order would be maintained more often, but that cannot always be done.

----------------------------------------
Feature #17378: Ractor#receive with filtering like other actor langauge
https://bugs.ruby-lang.org/issues/17378#change-89324

* Author: ko1 (Koichi Sasada)
* Status: Closed
* Priority: Normal
----------------------------------------
In discussion with @marcandre, we found a good extension for `Ractor.receive` with a block.

```ruby
Ractor.receive do |msg|
  if condition?(msg) # placeholder predicate: does msg match the condition?
    true
  else
    false
  end
end
```

This block iterates over the incoming queue's values, and each value is passed in `msg`.
If the passed value is one you want to process, the block should return true and the value will be removed from the queue.
Otherwise (returning a falsy value), the value remains in the queue, and other `Ractor.receive` calls can access it again.

If there are no more values in the queue, the interpreter sleeps until new values are received.

```ruby
r = Ractor.new do |;r|
  loop do
    r, msg = Ractor.receive
    r << msg
    r << 1
    r << 2
    r << 3
  end
end

r.send([Ractor.current, :ping])
# incoming queue: [:ping, 1, 2, 3] (3 is at the last)

Ractor.receive #=> :ping # without a block
# incoming queue: [1, 2, 3]

p Ractor.receive{|msg| msg == 2}
#=> 2
# incoming queue: [1, 3]

begin
  # exception in the block doesn't touch the queue
  p Ractor.receive{|msg| raise "err"}
rescue => e
  p e.message #=> "err"
end
# incoming queue: [1, 3]

p Ractor.receive #=> 1
# incoming queue: [3]
p Ractor.receive #=> 3
```

With multiple servers, we can write:

```ruby
morning_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good morning #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end

evening_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good evening #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end

morning_server << [Ractor.current, 'ko1']
morning_server << [Ractor.current, 'ko2']
morning_server << [Ractor.current, 'ko3']

evening_server << [Ractor.current, 'ko1']
evening_server << [Ractor.current, 'ko2']
evening_server << [Ractor.current, 'ko3']

def receive r
  Ractor.receive{|(from, msg)|
    r == from
  }[1]
end

p receive(morning_server) #=> "Good morning ko1"
p receive(evening_server) #=> "Good evening ko1"
p receive(morning_server) #=> "Good morning ko2"
p receive(evening_server) #=> "Good evening ko2"
p receive(morning_server) #=> "Good morning ko3"
p receive(evening_server) #=> "Good evening ko3"
```

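The queue semantics in the issue description (a truthy block result removes the message, a falsy one leaves it in place, and an exception leaves the queue untouched) can be modeled without Ractors. What follows is only a rough single-threaded sketch under that reading: `FilteredMailbox`, `receive_if` and `pending` are hypothetical names used for illustration, not the Ractor implementation, and the model returns `nil` where a real Ractor would sleep until a new message arrives.

```ruby
# Hypothetical single-threaded model of a filtered incoming queue.
# It only illustrates the semantics described above; it is NOT how Ractor works internally.
class FilteredMailbox
  def initialize
    @queue = []
  end

  # Append a message; returns self so sends can be chained.
  def <<(msg)
    @queue << msg
    self
  end

  # Scan messages oldest-first and remove the first one the block accepts.
  # Rejected messages keep their position. If the block raises, nothing has
  # been removed yet, so the queue is unchanged.
  # Assumption: returns nil when nothing matches (a real Ractor would block).
  def receive_if
    @queue.each_with_index do |msg, i|
      if yield(msg)
        @queue.delete_at(i)
        return msg
      end
    end
    nil
  end

  # Snapshot of the messages still waiting.
  def pending
    @queue.dup
  end
end

box = FilteredMailbox.new
box << 1 << 2 << 3

p box.receive_if { |m| m == 2 } #=> 2
p box.pending                   #=> [1, 3]

begin
  box.receive_if { |_m| raise "err" }
rescue => e
  p e.message                   #=> "err"
end
p box.pending                   #=> [1, 3]

p box.receive_if { true }       #=> 1
p box.pending                   #=> [3]
```

With several threads filtering concurrently, a message being examined would additionally have to be reserved so that no other thread sees it, which is where the ordering caveat in the reply above comes from; this sketch does not attempt to model that.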
https://bugs.ruby-lang.org/ Unsubscribe: