[ruby-core:120906] [Ruby master Feature#21121] Ractor channels

Issue #21121 has been reported by luke-gru (Luke Gruber).

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121

* Author: luke-gru (Luke Gruber)
* Status: Open
----------------------------------------
# Motivation:

It would be nice to be able to `Ractor.yield` in a non-blocking way. Right now a `Ractor.yield` blocks until another ractor calls `r.take` on the yielding ractor. This is bad in the following scenario:

```ruby
main = Ractor.current
rs = 10.times.map do
  Ractor.new(main) do |m|
    ret = []
    loop do
      # do a bunch of work that takes a while, then:
      begin
        obj = m.take
      rescue Ractor::ClosedError
      end
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  Ractor.yield(i) # this will block until some ractor calls take on us, but it could be a while if there is processing before the `take` call.
end
main.close_outgoing

# Ideally, we could do some work in main ractor that takes some time while the other ractors do their processing. But we're blocking right now
# during all the calls to `Ractor.yield`.

# Finally, get the results
while rs.any?
  r, obj = Ractor.select(*rs)
  $stderr.puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end
```

I'd like other ractors to be able to do work in a "fire and forget" kind of way. It isn't possible right now due to the limitations of `Ractor.yield` and `Ractor#take`. What we need is some kind of `yield` buffer so the yielder doesn't block. I propose that Ractor channels would allow this type of programming to work.

## Example using channels:

```ruby
chan = Ractor::Channel.new # We could specify buffer size like Go or it could be dynamically growable
rs = 10.times.map do
  Ractor.new(chan) do |c|
    ret = []
    loop do
      obj, _closed = c.receive # returns `[nil, true]` if `c` is closed and its buffer is empty. It blocks if the buffer is empty and `c` is not closed.
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  chan.send(i) # non-blocking, fills a buffer and only wakes a receiver if there is one
end
chan.close

# Do some processing while ractors do their work.

# Then, collect the results
while rs.any?
  r, obj = Ractor.select(*rs)
  puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end
```

With an API similar to this, we could have "fire and forget" processing with ractors.

--
https://bugs.ruby-lang.org/
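The semantics proposed above (a `send` that never blocks, a `receive` that blocks while the channel is open and empty, and `[nil, true]` once it is closed and drained) can be tried out between threads today, because `Thread::Queue` already behaves this way. The following is only a sketch under that assumption: `SketchChannel` is a hypothetical name, it is not the proposed `Ractor::Channel`, and it does not work across ractors because a `Thread::Queue` cannot be shared between them. It also assumes `nil` is never sent as a value.

```ruby
# Hypothetical thread-only stand-in for the proposed Ractor::Channel.
class SketchChannel
  def initialize
    @queue = Thread::Queue.new # unbounded, so pushes never block
  end

  # Mirrors the proposed `chan.send`; like Ractor#send, this shadows Object#send.
  def send(obj)
    @queue.push(obj)
    self
  end

  # Returns [obj, false] for a buffered object, or [nil, true] once the
  # channel is closed and its buffer is empty. Blocks while open and empty.
  def receive
    obj = @queue.pop # returns nil when the queue is closed and empty
    [obj, obj.nil? && @queue.closed?]
  end

  def close
    @queue.close
  end
end

chan = SketchChannel.new

# Three consumer threads drain the channel until it reports closed.
workers = 3.times.map do
  Thread.new do
    received = []
    loop do
      obj, closed = chan.receive
      break if closed
      received << obj
    end
    received
  end
end

10.times { |i| chan.send(i) } # returns immediately, even if no one is receiving
chan.close

results = workers.map(&:value) # one array of received items per worker
```

How the items are split between the workers depends on thread scheduling, so only the combined contents of `results` are deterministic.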

Issue #21121 has been updated by tenderlovemaking (Aaron Patterson).

I think adding a channel object would be very helpful. I'm looking through shared data structures in Rails, trying to understand what we need to change for Ractor safety. One popular data structure is a Concurrent::Map, and there are various instances used as caches. In order to implement a similar data structure with Ractors I end up writing my own channel:

```ruby
class Ractor::Channel
  def self.new
    Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end
  end
end

class Map
  def initialize
    @r = Ractor.new {
      cache = { "int" => "integer", "bool" => "boolean" }

      loop do
        channel, key = Ractor.receive
        val = cache[key] ||= "not found #{key}"
        # Send back to requester
        channel.send val
      end
    }
    freeze
  end

  def [] key
    c = Ractor::Channel.new
    @r.send([c, key])
    c.take
  end
end

map = Map.new
p map["int"]
```

I would really like it if channels were built-in.

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121#change-111791

* Author: luke-gru (Luke Gruber)
* Status: Open
----------------------------------------

Issue #21121 has been updated by luke-gru (Luke Gruber).

I made a PoC branch here: https://github.com/luke-gru/ruby/commits/ractor_channels but it's not totally ready yet.

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121#change-111826

* Author: luke-gru (Luke Gruber)
* Status: Open
----------------------------------------

Issue #21121 has been updated by shan (Shannon Skipper).

This seems really handy. I wonder if it would be worth considering having an optional self-closing block variant?

``` ruby
Ractor::Channel.new do |channel|
  # Automatically ensure `channel.close_incoming` and `channel.close_outgoing` on block close.
end
```

An example with a block (not implementing the `buffer` argument) might look something like:

``` ruby
class Ractor
  module Channel
    def self.new(&block)
      channel = Ractor.new do
        loop do
          Ractor.yield Ractor.receive
        end
      end

      if block_given?
        message = block.call(channel)
        channel.close_incoming
        channel.close_outgoing
        return message
      end

      channel
    end
  end
end

class Map
  def initialize
    @reactor = Ractor.new do
      cache = {}

      loop do
        channel, payload = Ractor.receive
        channel << cache.public_send(*payload)
      end
    end

    freeze
  end

  def []=(key, value)
    pipe __method__, key, value
  end

  %i[[] delete key? value?].each do |meth|
    define_method(meth) { |key| pipe meth, key }
  end

  %i[clear empty? keys length size to_a to_h values].each do |meth|
    define_method(meth) { pipe meth }
  end

  private

  def pipe(*payload)
    Ractor::Channel.new do |channel|
      @reactor << [channel, payload]
      channel.take
    end
  end
end
```

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121#change-111990

* Author: luke-gru (Luke Gruber)
* Status: Open
----------------------------------------

Issue #21121 has been updated by luke-gru (Luke Gruber).

Ruby's internals use this same trick to simplify getting `require` calls to work in non-main ractors. The code is here: https://github.com/ruby/ruby/blob/master/ractor.c#L4009. It simply starts a new thread on the main ractor, calls the require there and then sends a message over that pseudo-channel to inform the calling ractor that it's done. I don't know if that was where you got the idea or great minds just think alike :)

Obviously it's a bit of a hack and is less performant than if there were proper channels in the language, but it gets the job done for now.

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121#change-112265

* Author: luke-gru (Luke Gruber)
* Status: Open
----------------------------------------
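The delegation pattern described for `require` (a non-main ractor asks the main ractor to do some work on a separate thread, then waits to be told it is done) can be sketched roughly as follows. This is not the code in `ractor.c`: instead of a pseudo-channel ractor, the sketch replies through the requester's own incoming queue using only `Ractor#send` and `Ractor.receive`. The `:request` and `:done` tags are hypothetical, and `String#upcase` stands in for the real work.

```ruby
main = Ractor.current

worker = Ractor.new(main) do |m|
  # Ask the main ractor to do the work on our behalf, then block for the reply.
  m.send([:request, Ractor.current, "hello"])
  reply = Ractor.receive
  # Report the outcome back to main so it knows the round trip finished.
  m.send([:done, reply])
end

result = nil
loop do
  tag, *payload = Ractor.receive
  case tag
  when :request
    requester, arg = payload
    # Do the work on a separate thread of the main ractor, then notify the
    # requesting ractor through its incoming queue.
    Thread.new { requester.send(arg.upcase) }.join
  when :done
    result = payload.first
    break
  end
end
```

Replying through the requester's default queue keeps the sketch short, but it means the reply can be confused with any other message sent to that ractor, which is one reason a dedicated channel object is attractive.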

Issue #21121 has been updated by matz (Yukihiro Matsumoto).

Status changed from Open to Closed

Superseded by #21266

----------------------------------------
Feature #21121: Ractor channels
https://bugs.ruby-lang.org/issues/21121#change-112710

* Author: luke-gru (Luke Gruber)
* Status: Closed
----------------------------------------
participants (4)

* luke-gru (Luke Gruber)
* matz (Yukihiro Matsumoto)
* shan (Shannon Skipper)
* tenderlovemaking (Aaron Patterson)