[ruby-core:124711] [Ruby Feature#21869] Add receive_all Method to Ractor API for Message Batching
Issue #21869 has been reported by synacker (Mikhail Milovidov).

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869

* Author: synacker (Mikhail Milovidov)
* Status: Open
----------------------------------------
**Summary**

The Ractor API provides an excellent mechanism for inter‑thread communication, but it currently lacks a built‑in message batching technique. I propose adding a receive_all method to enable batch processing of messages, which can significantly improve performance in high‑load scenarios.

**Motivation**

In distributed queued systems, processing messages one‑by‑one (as with the current receive method) can introduce unnecessary overhead. Batch processing allows:

* Reduced context‑switching overhead.
* More efficient I/O operations (e.g., fewer file writes).
* Better throughput in high‑concurrency environments.

**Proposed Solution**

Add a receive_all method to the Ractor API that:

* Returns all available messages in the Ractor’s mailbox at once (as an array).

**Demonstration Code**

Below is a benchmark comparing individual receive vs. batch receive_all:

``` ruby
require 'benchmark'

class RactorsTest
  def initialize(count)
    @count = count
    @ractor1 = Ractor.new(count, 'output1.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          message = receive
          file.write("Ractor 1 received message: #{message}\n")
          file.flush
          count -= 1
        end
      end
    end

    @ractor2 = Ractor.new(count, 'output2.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          messages = receive_all
          messages.each do |message|
            file.write("Ractor 2 received message: #{message}\n")
          end
          count -= messages.length
          file.flush
        end
      end
    end
  end

  def run1
    @count.times do |i|
      @ractor1.send("Message #{i + 1}")
    end
    @ractor1.join
  end

  def run2
    @count.times do |i|
      @ractor2.send("Message #{i + 1}")
    end
    @ractor2.join
  end
end

records = 1_000_000
test = RactorsTest.new(records)

p [:once, Benchmark.realtime { test.run1 }.round(2)]
p [:all, Benchmark.realtime { test.run2 }.round(2)]
```

**Benchmark Results**

On my system, receive_all shows ~4x improvement over individual receive:

**Key Observations:**

* Ractor1 (using receive): Processes each message individually, resulting in frequent I/O calls.
* Ractor2 (using receive_all): Processes all queued messages at once, minimizing I/O overhead

-- 
https://bugs.ruby-lang.org/
Issue #21869 has been updated by synacker (Mikhail Milovidov).

PR on GitHub: https://github.com/ruby/ruby/pull/16105

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116308
Issue #21869 has been updated by ko1 (Koichi Sasada).

Does it block when the queue is empty, or does it return nil? (I think the example expects blocking)

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116357
Issue #21869 has been updated by Eregon (Benoit Daloze).

synacker (Mikhail Milovidov) wrote:
> More efficient I/O operations (e.g., fewer file writes).

Is it? In your example you call `file.write` for each message in both cases. But you also call `file.flush` after each `file.write` in ractor1 and only once per batch in ractor2.

Could you benchmark without the `file.flush` calls? I suspect the difference is much smaller then.

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby. One could also flush after N messages/bytes in ractor1.

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116363
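The "flush after N messages" alternative mentioned above could look roughly like the sketch below. It is not code from the issue: the helper name `write_with_periodic_flush` and the `flush_every` keyword are hypothetical, and the messages are taken from any Enumerable rather than from `receive`, so that the flushing logic can be shown in isolation.

``` ruby
# Hypothetical helper (not part of Ruby or of the proposal): writes every
# message, but only flushes once per `flush_every` messages, plus one final
# flush for a partial last group.
def write_with_periodic_flush(io, messages, flush_every: 100)
  pending = 0 # messages written since the last flush

  messages.each do |message|
    io.write("received message: #{message}\n")
    pending += 1
    next if pending < flush_every

    io.flush
    pending = 0
  end

  # Make sure a trailing partial group is flushed as well.
  io.flush if pending.positive?
end
```

With 250 messages and `flush_every: 100`, this issues three flushes instead of 250. Unlike a batch taken from the mailbox, the group size here is fixed and does not adapt to how many messages happen to be queued.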
Issue #21869 has been updated by synacker (Mikhail Milovidov).

ko1 (Koichi Sasada) wrote in #note-2:
> Does it block when the queue is empty or returns []? (I think the example expects blocking)

Yes, it blocks if the queue is empty. The method `receive_all` accepts a limit parameter:

1. `limit > 0`: collects up to `limit` messages (may return fewer if fewer are queued). Blocks if the queue is empty.
2. `limit == 0`: returns an empty array immediately (no blocking).
3. `limit < 0` or `nil` (default): returns all messages from the queue, or blocks if the queue is empty.

Eregon (Benoit Daloze) wrote in #note-3:
> I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.

The example demonstrates how you can collect messages during long I/O operations and batch them together to reduce the number of subsequent I/O calls. The `file.flush` call simulates a long I/O operation; it could equally represent a database call or something similar.

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116366
Issue #21869 has been updated by synacker (Mikhail Milovidov).

Eregon (Benoit Daloze) wrote in #note-3:
> Is it? In your example you call `file.write` for each message in both cases.
> But you also call `file.flush` after each `file.write` in ractor1 and only once per batch in ractor2.

This is also a realistic scenario. To guarantee messages are saved to the file, you'd normally need to call `flush` after each message, but that's inefficient when processing messages one at a time.

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116367
Issue #21869 has been updated by byroot (Jean Boussier).

> I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.

Not specific to Ractor, but I relatively often had a similar use case with `Thread::Queue`.

When implementing instrumentation libraries (e.g. `statsd` or `opentelemetry`), what you generally want to do is push as much work as possible to a background thread (possibly a ractor in the future), but you also want this background thread to serialize and send packets in batches.

In such a context, being able to pop `1..N` elements from the queue would be convenient. Currently with a queue it's semi-doable by first doing a blocking pop, followed by a bounded number of non-blocking ones.

----------------------------------------
Feature #21869: Add receive_all Method to Ractor API for Message Batching
https://bugs.ruby-lang.org/issues/21869#change-116631

* Author: synacker (Mikhail Milovidov)
* Status: Open
* Assignee: ractor
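The `Thread::Queue` workaround described above (one blocking pop, then a bounded number of non-blocking pops) might be sketched as follows. The helper name `pop_batch` and its `max` parameter are hypothetical; only `Thread::Queue#pop`, `#closed?` and the `ThreadError` raised by a non-blocking pop on an empty queue are existing Ruby behaviour.

``` ruby
# Hypothetical helper (not part of Ruby): returns between 1 and `max`
# elements, blocking only while the queue is completely empty.
def pop_batch(queue, max)
  # Blocking pop: waits for the first element. On a closed, empty queue
  # it returns nil instead of blocking.
  first = queue.pop
  # Caveat of this sketch: a nil that was really pushed just before the
  # queue was closed is indistinguishable from "closed and empty".
  return [] if first.nil? && queue.closed?

  batch = [first]
  while batch.size < max
    begin
      # Non-blocking pop: raises ThreadError once the queue is empty.
      batch << queue.pop(true)
    rescue ThreadError
      break
    end
  end
  batch
end

queue = Thread::Queue.new
5.times { |i| queue << i }
first_batch = pop_batch(queue, 3)   # takes at most 3 elements
second_batch = pop_batch(queue, 10) # takes whatever is left
```

Each element still costs one queue operation here, which is why this only approximates what a single batched receive could do.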
participants (4)

* byroot (Jean Boussier)
* Eregon (Benoit Daloze)
* ko1 (Koichi Sasada)
* synacker (Mikhail Milovidov)