[ruby-core:124895] [Ruby Feature#21930] Add Ractor#empty? method to check for pending messages without blocking
Issue #21930 has been reported by synacker (Mikhail Milovidov). ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930 * Author: synacker (Mikhail Milovidov) * Status: Open ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ``` require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by synacker (Mikhail Milovidov). Added pr: https://github.com/ruby/ruby/pull/16277 ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116555 * Author: synacker (Mikhail Milovidov) * Status: Open ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by nobu (Nobuyoshi Nakada). Status changed from Open to Feedback This sounds like leading to a typical TOC/TOU problem. As for your example, why does `Worker` ractor handle both of `main_task` and dispatch alone, instead of launching each ractors? ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116556 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by synacker (Mikhail Milovidov). nobu (Nobuyoshi Nakada) wrote in #note-3:
This sounds like leading to a typical TOC/TOU problem.
I appreciate the concern about a potential TOC/TOU (Time‑of‑Check to Time‑of‑Use) issue, but I believe it doesn’t apply in this specific case. Consider the following pattern: ```ruby messages = [] while !Ractor.current.empty? messages << Ractor.receive end process_batch(messages) if messages.any? ``` In this code: * The «check» (empty?) and the «use» (receive) are tightly coupled in a loop. * Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration. * The batch simply grows by one more message — no data is lost, and no invalid state is entered. * This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot. Thus, ```Ractor#empty?``` doesn’t introduce a new race condition — it enables a safe and efficient polling mechanism that’s already common in concurrent systems (e.g., event loops). nobu (Nobuyoshi Nakada) wrote in #note-3:
As for your example, why does `Worker` ractor handle both of `main_task` and dispatch alone, instead of launching each ractors?
You asked why the Worker Ractor handles both main_task and dispatch logic instead of launching a separate Ractor per task. Here’s why creating one Ractor per task is impractical: 1. Key drawbacks of one‑Ractor‑per‑task: * High overhead. Creating a Ractor is significantly more expensive than sending a message or scheduling a Fiber. For 10 000 tasks, spawning 10 000 Ractors would cause massive memory and scheduling overhead. * Resource exhaustion. The OS and Ruby VM have limits on concurrent threads/processes. Unbounded Ractor creation risks crashing the system or exhausting system resources. * Complex coordination. Managing 10 000+ Ractors (joining, error handling, monitoring, logging) becomes a complex task in itself, adding significant operational burden. 2. Why the Worker Ractor acts as a managed pool: The current design uses a bounded number of Ractors (typically one per CPU core) and leverages Fibers for cooperative multitasking within each Ractor. Specifically: * It receives a stream of commands (TimeCommand objects) via the Ractor message queue. * It uses the Async gem to run their task method concurrently within the same Ractor, using Fibers to achieve lightweight concurrency. This approach follows a well‑established architectural pattern for high‑load systems: * Create a fixed number of Ractors, typically matching the number of CPU cores (or a small multiple of it), to avoid OS/VM resource exhaustion. * Within each Ractor, use cooperative multitasking (Fibers, event loops, coroutines) to handle many concurrent operations efficiently. * Use ```Ractor#empty?``` as a scheduler hint — to batch work and yield control when idle — not as a security or state‑decision primitive. This pattern balances high concurrency with bounded resource usage, making it suitable for production workloads. Thank you again for the thoughtful questions — they help clarify the design rationale. Let me know if you’d like me to elaborate on any point! ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116557 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by synacker (Mikhail Milovidov). nobu (Nobuyoshi Nakada) wrote in #note-3:
As for your example, why does `Worker` ractor handle both of `main_task` and dispatch alone, instead of launching each ractors?
I compared two strategies for handling concurrent tasks: 1. run1: Ractors + fibers 2. run2: only ractors Here’s the benchmark code that executed 10000 tasks using both strategies and measured real execution time:: ```ruby require 'async' require 'benchmark' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class StopCommand end class Worker attr_reader :iterations def initialize(iterations: 1000) @iterations = iterations end def run1 ractor = Ractor.new do loop do run = true Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end if command.is_a?(StopCommand) break end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? commands.each { |command| in_queue.enqueue(command) } end if commands.any? { |cmd| cmd.is_a?(StopCommand) } run = false in_queue.enqueue(StopCommand.new) break end end end end break unless run end end @iterations.times do |i| ractor.send(TimeCommand.new(i + 1)) end ractor.send(StopCommand.new) ractor.join end def run2 ractor = Ractor.new do ractors = [] loop do command = Ractor.receive if command.is_a?(StopCommand) break else ractors << Ractor.new(command) do |cmd| cmd.task end end end ractors.each(&:join) end @iterations.times do |i| ractor.send(TimeCommand.new(i + 1)) end ractor.send(StopCommand.new) ractor.join end end worker = Worker.new(iterations: 10000) p [:fibers, Benchmark.realtime { worker.run1 }.round(2)] p [:ractors, Benchmark.realtime { worker.run2 }.round(2)] ``` **Output:** Benchmark result: ``` [:fibers, 6.29] [:ractors, 58.73] ``` The results show a nearly 10‑fold performance difference. This benchmark demonstrates why the Worker Ractor uses a pooled design with Fibers instead of spawning a Ractor per task. But, for merging ractors with fibers need to add ```Ractor#empty?``` method. ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116573 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by ufuk (Ufuk Kayserilioglu). @synack Which version of Ruby are you testing with? Can you please send your `ruby -v` output for the benchmark results? ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116574 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by synacker (Mikhail Milovidov). ufuk (Ufuk Kayserilioglu) wrote in #note-6:
@synacker Which version of Ruby are you testing with? Can you please send your `ruby -v` output for the benchmark results?
This is a ruby version from my pr (https://github.com/ruby/ruby/pull/16277) for this feature, because I used new ```Ractor#empty?``` method in ```run1``` method: ```bash ruby 4.1.0dev (2026-03-02T21:05:21Z feature-21930 805cf8c2d2) +PRISM [x86_64-linux] ``` ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116576 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by nobu (Nobuyoshi Nakada). synacker (Mikhail Milovidov) wrote in #note-4:
In this code:
* The «check» (empty?) and the «use» (receive) are tightly coupled in a loop. * Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration. * The batch simply grows by one more message — no data is lost, and no invalid state is entered. * This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot.
It would work for **your code**, but may not for general purposes. And if `main_task` finished immediately and no command is coming, it will be a busy loop. I guess what you want is non-blocking (and maybe bulk) read, right? ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116586 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by byroot (Jean Boussier). I don't have a string opinion on whether `empty?` is useful, that being said it's present on `Thread::Queue` and I support trying to mirror the API as much as possible. But `empty?` alone isn't that helpful because of TOC/TOU problem as mentioned, so it only make sense if we also get a non blocking pop/push like `Thread::Queue` has. I think @etienne and @jhawthorn were looking into that recently? ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116588 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
Issue #21930 has been updated by synacker (Mikhail Milovidov). nobu (Nobuyoshi Nakada) wrote in #note-8:
synacker (Mikhail Milovidov) wrote in #note-4:
In this code:
* The «check» (empty?) and the «use» (receive) are tightly coupled in a loop. * Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration. * The batch simply grows by one more message — no data is lost, and no invalid state is entered. * This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot.
It would work for **your code**, but may not for general purposes. And if `main_task` finished immediately and no command is coming, it will be a busy loop. I guess what you want is non-blocking (and maybe bulk) read, right?
Thank you for the feedback. Ractor#empty? isn’t a niche fix - it’s a general‑purpose primitive for efficient schedulers and Ractor‑Fiber integration. The code doesn’t cause a busy loop because ```Ractor.receive``` blocks when the queue is empty. This method enables non‑blocking batching, complementing my other PR (https://bugs.ruby-lang.org/issues/21869) to improve the Ractor API for using it with cooperative multitasking. ---------------------------------------- Feature #21930: Add Ractor#empty? method to check for pending messages without blocking https://bugs.ruby-lang.org/issues/21930#change-116589 * Author: synacker (Mikhail Milovidov) * Status: Feedback ---------------------------------------- **Summary** In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API **Motivation** The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations: * It blocks the current thread until a message arrives. * It doesn’t offer a non‑blocking way to check the message queue. * This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems). As a result, developers must either: * Accept thread blocking (hurting responsiveness). * Implement complex workarounds with timeouts or auxiliary queues. **Proposed solution** Add Ractor#empty? to the Ractor API. The method should: * Return true if there are no pending messages in the Ractor’s main queue. * Return false if there is at least one message available for processing. * Not block the calling thread under any circumstances. * Be safe to call from any Ractor (including the current one). **Demonstration code** Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem: ```ruby require 'async' class TimeCommand attr_reader :id def initialize(id) @id = id end def task 1.upto(3) do |i| sleep(1) puts "[cmd #{@id}] step #{i} @ #{Time.now}" end end end class Worker def initialize @ractor = Ractor.new do loop do Sync do |task| in_queue = Async::Queue.new queue_task = task.async do |subtask| while command = in_queue.dequeue subtask.async do |child_task| command.task end end end task.async(transient: true) do |main_task| loop do commands = [] if queue_task.children? || !in_queue.empty? main_task.yield commands.append Ractor.receive while !Ractor.current.empty? else commands.append Ractor.receive end unless commands.empty? puts "Worker received batch of #{commands.size} commands." commands.each { |command| in_queue.enqueue(command) } end end end end end end end def send(command) @ractor.send(command, move: true) end def wait @ractor.join end end worker = Worker.new 1000.times do |i| 100.times do |j| worker.send TimeCommand.new(i * 10 + j) end sleep(1) end worker.wait ``` **Key observations:** With Ractor#empty?, developers can: * Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally. * Avoid thread blocking when checking for incoming messages. * Batch process messages efficiently (collect all pending messages in one go). * Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available. **Benefits** * Enables better integration with modern Ruby concurrency tools. * Reduces need for complex workarounds. * Improves performance in message‑driven architectures. * Maintains Ractor’s thread‑safety guarantees. -- https://bugs.ruby-lang.org/
participants (4)
-
byroot (Jean Boussier) -
nobu (Nobuyoshi Nakada) -
synacker (Mikhail Milovidov) -
ufuk (Ufuk Kayserilioglu)