做Ray Platform也快2年了,遇到过各种的问题,整理一些踩过的坑看一下。
先从我们自己最常用的Ray Data开始,看看最常见的OOM/OOD问题,这个问题很多时候都是和反压相关的。
说是Ray Data,不过这里的反压不止一层,大概包括下面几个地方:
- Ray Core Generator:针对Ray Generators的控制,防止后台生成的数据过多导致OOM/OOD。
 - Streaming Executor + Resource Allocator:
- 针对正在执行的任务,控制生成结果的速度,避免单个任务生成的数据过多导致OOM/OOD。
 - 针对单个Operator,控制提交任务的数量,避免在资源紧张时提交新任务。
 
 - Backpressure Policies: 其他关于任务提交的反压规则。
 
下面我们逐层分析这些机制的实现。
Ray Core Generator:对象数量反压
Ray Generator 类似Python Generator,用来作为迭代器进行遍历,但是和Python Generator有一个很大的不同在于:Ray Generator使用ObjectRefGenerator在后台持续执行。也就是说如果Ray Data的单个read_task需要读取一个很大的文件时,没法通过控制拉取任务产出的速度来控制任务的内存占用。(不管下游是否主动拉取,都会持续读取新的数据block。)
针对这个问题,Ray Generators支持手动配置一个threshold(_generator_backpressure_num_objects parameter)来对Generators进行反压。
核心逻辑在task_manager.cc中的HandleReportGeneratorItemReturns这个方法里面。这个函数逻辑比较复杂,里面还有比如乱序/幂等等问题的处理,我们只看反压状态的管理:
  // 请求的item的index
  int64_t item_index = request.item_index();
  // 生成器已生产的对象数量
  auto total_generated = stream_it->second.TotalNumObjectWritten();
  //已被消费的对象数量  
  auto total_consumed = stream_it->second.TotalNumObjectConsumed();
  // item已经被消费了,说明消费速度足够快,不用反压。
  if (stream_it->second.IsObjectConsumed(item_index)) {
    execution_signal_callback(Status::OK(), total_consumed);
    return false;
  }
  // Otherwise, follow the regular backpressure logic.
  // NOTE, here we check `item_index - last_consumed_index >= backpressure_threshold`,
  // instead of the number of unconsumed items, because we may receive the
  // `HandleReportGeneratorItemReturns` requests out of order.
  if (backpressure_threshold != -1 &&
      (item_index - stream_it->second.LastConsumedIndex()) >= backpressure_threshold) {
    RAY_LOG(DEBUG) << "Stream " << generator_id
                   << " is backpressured. total_generated: " << total_generated
                   << ". total_consumed: " << total_consumed
                   << ". threshold: " << backpressure_threshold;
    auto signal_it = ref_stream_execution_signal_callbacks_.find(generator_id);
    if (signal_it == ref_stream_execution_signal_callbacks_.end()) {
      execution_signal_callback(Status::NotFound("Stream is deleted."), -1);
    } else {
      signal_it->second.push_back(execution_signal_callback);
    }
  } else {
    // No need to backpressure.
    execution_signal_callback(Status::OK(), total_consumed);
  }
所以未消费对象数量达到阈值时,Ray Generator会暂停任务执行。
在Ray Data中,taskpool和actor pool都默认设置了_generator_backpressure_num_objects参数来控制数据的生成,以TaskPoolMapOperator为例:
        if (
            "_generator_backpressure_num_objects" not in dynamic_ray_remote_args
            and self.data_context._max_num_blocks_in_streaming_gen_buffer is not None
        ):
            # 2 objects for each block: the block and the block metadata.
            dynamic_ray_remote_args["_generator_backpressure_num_objects"] = (
                2 * self.data_context._max_num_blocks_in_streaming_gen_buffer
            )
Streaming Executor + Resource Allocator
虽然Ray Core提供了基础反压的接口,但是运行Ray Data任务的时候,还是有其他问题,其中最核心的问题就是是否需要消费上游算子生成的结果?
预算分配
Ray使用了预算预分配的方式,给Ray Data任务的每个operator都分配了一个预算,这个预算包括2部分:
reserved_for_op_outputs
- 为算子输出数据预留的内存空间。
 - 用来保证有足够的内存来存储算子的输出数据,防止所有预算都被pending task outputs占用。
 
_op_reserved和_op_budgets
_op_reserved:每个算子的预留资源。_op_budgets: 根据实际情况算出来的,算子可以使用的资源,大致上op_budgets[op] = max(_op_reserved[op] - 当前使用量, 0) + 分配的共享资源
预算分配的逻辑在resource_manager.py里,整个逻辑大概包括:
- 把整个object store分为reserved资源(
op_total_reserved)和shared资源(_total_shared)两部分。 - 给每个算子分配一个初始的budget(
op_total_reserved)。 - 把budget分成2份:
reserved_for_op_outputs和_op_reserved - 根据算子实际使用的内存情况,计算每个算子剩余的budget数量。(从
_op_reserved得到_op_budgets)。 - 把共享资源按需分配到各个算子的
_op_budgets。 - 特殊算子处理:对materializing算子如AllToAllOperator不做任何限制。
 
单个Task生成速度的控制
有了budget以后,就可以对Ray Data中的每个算子进行反压了,先看正在执行的Ray Generator Task的反压:
# ...
 # 对有结果产生的任务,计算还可以输出的bytes,控制任务输出。
 for task in ready_tasks:
    bytes_read = task.on_data_ready(
        max_bytes_to_read_per_op.get(state, None)
    )
    if state in max_bytes_to_read_per_op:
        max_bytes_to_read_per_op[state] -= bytes_read
# ...
其中on_data_ready会从Ray Generator消费数据,并且一旦消费的数据量达到预算限制就会停止消费:
def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
    """当数据准备就绪时的回调"""
    bytes_read = 0
    while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
        try:
            block_ref = self._streaming_gen._next_sync(0)
            if block_ref.is_nil():
                break
        except StopIteration:
            self._task_done_callback(None)
            break
        
        # 处理数据块并累计读取字节数
        # bytes_read += process_block(block_ref)
    return bytes_read
预算的限制则来自max_task_output_bytes_to_read,计算逻辑就是分配的资源减去使用的资源。
    def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
        # ...
        res = self._op_budgets[op].object_store_memory
        # Add the remaining of `_reserved_for_op_outputs`.
        op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
        res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0)
        if math.isinf(res):
            return None
        # corner case的处理,略。        
        return res        
这样就控制了每个task的Generator的消费速度,防止任何单个操作符占用过多内存。
Task提交速度的控制
除了限制单个任务的消费,Ray Data还会控制任务的提交,即在算子budget不足时停止提交该算子的任务。
这块逻辑比较简单,由streaming executor的select_operator_to_run方法控制
    ops = []
    for op, state in topology.items():
        assert resource_manager.op_resource_allocator_enabled(), topology
        under_resource_limits = (
            resource_manager.op_resource_allocator.can_submit_new_task(op)
        )
        in_backpressure = not under_resource_limits or any(
            not p.can_add_input(op) for p in backpressure_policies
        )
其中can_submit_new_task就是在判断是否有足够的资源可以提交新的任务。
    def can_submit_new_task(self, op: PhysicalOperator) -> bool:
        if op not in self._op_budgets:
            return True
        budget = self._op_budgets[op]
        res = op.incremental_resource_usage().satisfies_limit(budget)
        return res
Backpressure Policies: 其他关于任务提交的反压规则。
最后一个Backpressure Policies其实就是前面select_operator_to_run方法里提到的backpressure_policies了:
回顾一下:
        in_backpressure = not under_resource_limits or any(
            not p.can_add_input(op) for p in backpressure_policies
        )
这里目前其实只有一个策略,就是并发度的控制策略,没什么好说的,就是看一下正在运行的任务数量是否达到设置的并发上限。
class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
    """A backpressure policy that caps the concurrency of each operator.
    The policy will limit the number of concurrently running tasks based on its
    concurrency cap parameter.
    NOTE: Only support setting concurrency cap for `TaskPoolMapOperator` for now.
    TODO(chengsu): Consolidate with actor scaling logic of `ActorPoolMapOperator`.
    """
    # .....
    def can_add_input(self, op: "PhysicalOperator") -> bool:
        return op.metrics.num_tasks_running < self._concurrency_caps[op]
Last modified on 2025-07-05