Ray 实战:节点级 Actor Pool、模型预热与节点调度

一、问题背景

在上一篇《Ray 应用场景一例》中,我介绍了如何使用 Ray 内置的 node:IP 资源,把任务绑定到指定节点。那篇文章要解决的核心问题是:当任务依赖节点本地文件或节点本地环境时,如何确保后续步骤落到正确的机器上执行。

但在当前项目里,这还不够。

这个项目的执行链路大致如下:

  1. 先把音频或视频下载到某个节点的本地磁盘
  2. 再使用该节点上的 GPU 模型做说话人分离、ASR 和视频分析
  3. 最后生成报告并回调业务系统

如果继续沿用第一篇文章里的思路,节点绑定本身依然是成立的,但会进一步暴露出两个新问题:

  1. 每次 GPU 任务启动时都要重新加载模型,初始化成本高
  2. 调用方需要自己处理节点绑定、Actor 选择和空闲资源调度,工程上比较粗糙

所以,本文并不是要推翻上一篇文章里的方案,而是在那个思路之上继续往前走一步:在“节点绑定”已经成立的前提下,引入节点级 Actor Pool 做模型预热,并通过 NodeAffinitySchedulingStrategy 实现更稳定的节点级资源调度。

需要先说明一点:本文中的 Pipeline 设计来自一个具体项目实现,不是 Ray 的固定范式。文中提到的某些约束,例如 step_d 固定在 Head 节点执行、step_c 不做节点绑定,都是当前项目的工程选择,而不是 Actor Pool 方案本身的必然要求。

可以把两篇文章的关系简单理解为:

方案 主要解决的问题 模型预热 调度灵活性 实现复杂度
node:IP 资源绑定 任务如何落到正确节点 ⚠️ 可做但通常会退回任务级初始化 ⭐⭐ 简单
节点级 Actor Pool + NodeAffinity 节点绑定基础上的模型复用与节点内调度 ✅ 预加载复用 ⭐⭐⭐⭐ 中等

二、方案目标

这套方案要解决的,不再只是“如何把任务丢给 Ray”这么简单,而是在上一篇文章已经解决“节点绑定”问题的基础上,同时满足下面几个工程目标:

  • 模型预热:GPU 模型在 Actor 初始化阶段加载,后续任务直接复用
  • 数据本地性:下载到本地的媒体文件,尽量在同一节点完成后续 GPU 分析
  • 节点级隔离:每个节点独立维护自己的 GPU 执行单元,不互相干扰
  • 调用方简化:调用方只需要知道“把任务交给哪个节点”,不需要自己挑 Actor
  • 满载可排队:节点上的 GPU Actor 忙碌时,新任务进入等待,而不是调用方自己重试

核心思路可以概括为三点:

  1. Actor 预热:在 Actor 初始化时加载模型,Actor 持久存活,跨任务复用
  2. 节点级 Actor Pool:每个节点独立管理自己的热 Actor 池,避免跨节点搬运大文件
  3. 零资源 Manager:用一个几乎只负责调度的 Manager Actor 管理本节点 GPU Actor

三、整体设计

1、为什么不用现成的 ray.util.actor_pool

Ray 提供了 ray.util.actor_pool,但官方文档明确指出它不是线程安全的。这并不意味着它不能用,而是说在当前这种多任务共享同一个调度器的场景下,直接拿来做节点内资源分发并不稳妥。

更关键的是,这个项目里的 GPU 执行单元需要同时具备以下特性:

特性 说明
模型预热 Actor 初始化时加载 Sherpa(ASR)和 MiniCPM(视频分析)模型
持久存活 lifetime="detached",Actor 不随单次任务结束而销毁
节点绑定 通过 NodeAffinitySchedulingStrategy 确保 Actor 固定在指定节点
排队机制 满载时新请求自动等待,无需调用方处理

换句话说,这里需要的不是一个简单的“Actor 列表”,而是一个能长期运行、具备节点亲和性和排队能力的节点级热池。

2、为什么这里不优先用 Placement Group

看到这里,一个很自然的问题是:这类“多个步骤尽量落到同一节点”的需求,是否也可以用 Ray Placement Group 来实现?

答案是:可以解决一部分问题,但不完全适合当前这个场景。

Placement Group 更擅长做的是成组资源预留。例如:

  • 需要为一组 Task/Actor 一次性申请 CPU + GPU + 内存 资源
  • 希望这些资源通过 PACKSTRICT_PACKSPREAD 等策略集中或分散放置
  • 希望一组任务在调度时共享同一份资源预留结果

如果只是想表达“这一组任务最好放在同一节点”或者“这一组 Actor 需要一起占住若干资源”,Placement Group 是合适的。

但在本文这个场景里,真正的目标并不只是“同节点”,而是同时满足下面几件事:

  1. step_a 先落到某个节点并把媒体文件下载到本地
  2. step_b 再回到这个已经确定的节点
  3. 该节点上最好已经有预热完成的长期存活 GPU Actor
  4. 多个请求到来时,能够在该节点内部自动排队并复用这些 Actor

这时,Placement Group 就会显得不那么贴手。

一方面,如果为每个请求单独创建一个 Placement Group,确实可以把一组步骤尽量绑定到同一节点,但代价也很明显:

  • 更像是“为单次请求临时占一组资源”,而不是“复用一个长期存在的节点级 GPU 池”
  • 模型预热收益不明显,容易退回到“每组任务自己拉起一套执行单元”
  • 使用 STRICT_PACK 一类策略时,更容易出现资源碎片和排队放大

另一方面,如果尝试用 Placement Group + 长期存活 Actor 来做常驻池,也不是完全不行,但实现上通常会变复杂:

  • 你仍然需要自己维护“哪个节点上有哪些热 Actor”
  • 你仍然需要解决“空闲 Actor 选择”和“满载等待”问题
  • 最后很容易演化成和本文类似的节点级池,只是中间多包了一层 Placement Group

这里还需要区分一个容易混淆的点:RayActor 本身当然也会占用资源,例如一个 Actor 声明 num_gpus=1, num_cpus=1,创建成功后就会长期占住这一份资源。从结果上看,这确实也带有某种“资源打包”的味道。

但它和 Placement Group 仍然不是同一种东西。

  • Placement Group 更像“先预留一组资源,再把任务或 Actor 放进去”
  • RayActor 更像“直接创建一个长期存活的执行单元,由这个执行单元自己占住资源”

也可以换个角度理解:

  • Placement Group 关注的是资源集合的预留与放置
  • 节点级 Actor Pool 关注的是长期执行单元的复用与调度

因此,本文并不是说 Actor 方案“不占资源”或者“不是资源组织方式”,而是说它的抽象层级和使用目标与 Placement Group 不同。

换句话说,在这个场景里:

  • Placement Group 更像资源预留工具
  • 节点级 Actor Pool 更像长期运行的节点内执行池

两者不是互斥关系,但关注点不同。

如果你的目标是:

  • 给单次任务或单批任务临时预留一组资源
  • 明确要求一组 Task/Actor 成组调度
  • 不太关心长期模型预热和节点内空闲池复用

那么 Placement Group 很合适。

如果你的目标是:

  • 下载先决定数据落点
  • 后续 GPU 步骤回到该节点
  • 节点上有长期存活且已经预热的 Actor
  • 多个请求共享这个节点内的热池

那么本文这种“节点级 Actor Pool + NodeAffinity”的方式通常更自然。

一句话总结:Placement Group 更适合“为一组任务预留一组资源”;节点级 Actor Pool 更适合“长期复用一批已经预热好的执行单元”。

3、为什么要引入 Manager Actor

既然没有优先选择 Placement Group,接下来的问题就是:如果直接创建多个 GPU Actor,是否已经足够?

答案通常也是否定的,因为调用方仍然需要自己处理这些问题:

  1. 当前节点上有多少个 Actor
  2. 这些 Actor 分别叫什么名字
  3. 哪个 Actor 当前空闲
  4. 所有 Actor 都忙时该怎么等待

引入 Manager Actor 之后,调用方只需要关心“把任务交给这个节点”,不再需要感知底层有多少个 Actor、哪个 Actor 当前空闲。例如:

1
2
manager = ray.get_actor(f"RayActorPoolManagerActor_{node_id}")
result = manager.run.remote(...) # 内部自动调度到空闲 Actor

这样一来,调用方和资源调度逻辑就解耦了。

4、节点级 Actor Pool 长什么样

每个节点运行一个 Manager Actor,名字中包含 node_id。Manager Actor 下面挂着多个 GPU Actor,每个 Actor 独占一个 GPU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌────────────────────────────────────────────────────────────────────────────┐
│ Ray Cluster │
│ │
│ ┌─────────────────────────────────┐ ┌─────────────────────────────────┐ │
│ │ Head Node │ │ Worker Node │ │
│ │ │ │ │ │
│ │ RayActorPoolManagerActor │ │ RayActorPoolManagerActor │ │
│ │ _{head_node_id} │ │ _{worker_node_id} │ │
│ │ ↓ │ │ ↓ │ │
│ │ ┌──────┬──────┬──────┬──────┐ │ │ ┌──────┬──────┬──────┬──────┐ │ │
│ │ │Actor │Actor │Actor │Actor │ │ │ │Actor │Actor │Actor │Actor │ │ │
│ │ │_0 │_1 │_2 │_3 │ │ │ │_0 │_1 │_2 │_3 │ │ │
│ │ └──────┴──────┴──────┴──────┘ │ │ └──────┴──────┴──────┴──────┘ │ │
│ │ 4 GPUs │ │ 4 GPUs │ │
│ └─────────────────────────────────┘ └─────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘

这样做有三个直接收益:

  • 数据本地性:视频下载后保存在节点本地,GPU 分析直接读取,无需跨节点传输
  • 节点隔离:每个节点独立管理 GPU 资源,不互相干扰
  • 弹性扩展:新增 Worker 只需加入集群并初始化一次热池

四、调度链路

这里的关键不是“把所有步骤都绑定到同一节点”,而是只把真正依赖本地文件的步骤绑定回下载节点对应的热池。

先看一张简化后的时序图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sequenceDiagram
participant P as Pipeline
participant A as step_a
participant M as Manager Actor
participant B as RayActor(step_b)
participant L as step_c
participant D as step_d

P->>A: 提交任务
A->>A: 下载音频/视频到本地
A-->>P: 返回 node_id
P->>M: 根据 node_id 获取对应节点的热池
M->>B: 分配一个空闲 GPU Actor
B->>B: 说话人分离 / ASR / 视频分析
B-->>P: step_b 完成
P->>L: 调用 LLM
L-->>P: 返回 model_output
P->>D: 生成报告并回调

当前项目里,核心链路是这样的:

1
step_a → 返回 node_id → 根据 node_id 找到 Manager → Manager.run() → step_b

对应的含义是:

  • step_a 负责下载音频或视频,并返回它实际运行的 node_id
  • step_b 依赖 step_a 下载下来的本地文件,因此必须回到同一节点执行
  • step_c 调用 LLM,不再依赖本地媒体文件
  • step_d 生成报告和成功回调,当前实现固定在 Head 节点执行

也就是说,这套设计真正想保证的是:

下载发生在哪个节点,后续依赖本地媒体文件的 GPU 分析就跟到哪个节点。

调用代码非常直接:

1
2
3
4
5
6
7
8
9
# step_a 返回运行节点的 node_id
step_a_result = ray.get(step_a.remote(task_payload))

# 根据 node_id 找到对应节点的热池
manager_name = f"RayActorPoolManagerActor_{step_a_result}"
manager = ray.get_actor(manager_name, namespace="global")

# step_b 自动调度到 step_a 所在节点的空闲 GPU Actor
step_b_result = manager.run.remote(task_payload, step_a_result)

五、代码实现

1、Manager Actor

先看零资源的 Manager Actor,它负责维护当前节点上的热池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(num_cpus=0, num_gpus=0)
class RayActorPoolManagerActor:
def __init__(self, node_id, actor_count):
# ray.util.actor_pool 不是线程安全的,故使用 asyncio.Queue
self.idle_actors = asyncio.Queue(maxsize=actor_count)
for i in range(actor_count):
name = f"Actor_{node_id}_{i}"
actor = RayActor.options(
name=name,
namespace="global",
get_if_exists=True,
lifetime="detached",
scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=False),
).remote()
self.idle_actors.put_nowait(actor)

async def run(self, task_payload: dict, previous_result):
actor = await self.idle_actors.get() # 没空闲 Actor 时自动阻塞等待
try:
# 真正的并行:多个 run.remote 同时在不同的 Actor 上运行
result = await actor.run.remote(task_payload, previous_result)
return result
finally:
self.idle_actors.put_nowait(actor) # 任务完成,归还 Actor

这段代码有几个关键点:

  • num_cpus=0, num_gpus=0:不占用计算资源,纯调度角色
  • asyncio.Queue:用于异步 Manager Actor 内部管理空闲 Actor;这里的重点是协程级排队,不是通用线程安全语义
  • NodeAffinitySchedulingStrategy(soft=False):硬绑定到指定节点
  • lifetime="detached":Actor 持久存活,跨任务复用

这里最值得注意的一点是:Manager 本身并不做 GPU 计算,它只是负责“当前节点有哪些空闲 Actor”这件事。

一句话总结:Manager Actor 的作用不是计算,而是把“节点内谁空闲、谁接单”这件事集中管理起来。

2、GPU Worker Actor

接着看真正执行 GPU 任务的 RayActor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@ray.remote(num_gpus=1, num_cpus=1)
class RayActor:
def __init__(self):
# 预加载模型,实现预热
from runtime.speech_engine import SpeechEngine
from runtime.vision_engine import VisionEngine
import config

self.speech_engine = SpeechEngine(config.models_dir)
self.vision_engine = VisionEngine(config.models_dir)

def run(self, task_payload: dict, previous_result):
task_request = TaskRequest(**task_payload)
runner = PipelineRunner(task_request)

runner.run_step_3_diarization(self.speech_engine)
runner.run_step_4_asr(self.speech_engine)
runner.run_step_5_video_analysis(self.vision_engine)

return "success"

这里的重点是:

  • num_gpus=1:每个 Actor 独占一个 GPU
  • 模型在 __init__ 中加载,后续任务直接复用
  • Actor 数量通常等于节点 GPU 数量,不超卖资源

这一步解决的核心问题其实很朴素:把“每次任务都加载模型”变成“节点启动后加载一次模型”。

3、初始化节点级 Actor Pool

每个节点启动后执行一次初始化逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def create_ray_actor_pool_manager(total_num_gpus=None):
try:
with ray.init(address="auto"):
node_id = ray.get_runtime_context().get_node_id()

# 自动检测本节点 GPU 数量
if total_num_gpus is None:
for node in ray.nodes():
if node["NodeID"] == node_id:
total_num_gpus = int(node.get("Resources", {}).get("GPU", 0))
break
if total_num_gpus < 1:
return

actor_count = total_num_gpus
name = f"RayActorPoolManagerActor_{node_id}"

RayActorPoolManagerActor.options(
name=name,
namespace="global",
get_if_exists=True,
lifetime="detached",
max_concurrency=1000,
scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=False),
).remote(node_id, actor_count)
except Exception as e:
print(f"❌ Failed to create Ray Actor Pool Manager: {e}")

这一步的重点是:

  • get_if_exists=True:重复初始化时优先复用同名 Actor,避免重复创建
  • max_concurrency=1000:允许 Manager Actor 同时接收大量 run() 调用;真正的等待逻辑由 idle_actors 队列完成
  • Manager 名字包含 node_id,从而实现节点级隔离

这里还有一个工程上的注意点:由于使用了 lifetime="detached"get_if_exists=True,部署新代码时要考虑旧 Actor 的生命周期管理,否则有机会复用到旧版本的 detached Actor。

4、Pipeline 调度

Pipeline 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@ray.remote(max_retries=0, num_gpus=0, scheduling_strategy="SPREAD", resources={
"step_a_slot": 1
})
def step_a(task_payload: dict):
runner = build_runner(task_payload)
runner.clear_cache()
asyncio.run(runner.run_step_1_download())
runner.run_step_2_download()
return ray.get_runtime_context().get_node_id()


@ray.remote(max_retries=0, num_gpus=0, resources={
"step_d_slot": 1
})
def step_d(task_payload: dict, step_c_result):
runner = build_runner(task_payload)
model_output = runner.run_step_7_generate_artifact()
requests.post(webhook_url, json=callback_data, timeout=30)


def start_pipeline(task_payload: dict):
step_a_result = ray.get(step_a.remote(task_payload))

manager_name = f"RayActorPoolManagerActor_{step_a_result}"
manager = ray.get_actor(manager_name, namespace="global")

step_b_result = manager.run.remote(task_payload, step_a_result)
step_c_result = step_c.remote(task_payload, step_b_result)
step_d_result = step_d.remote(task_payload, step_c_result)

ray.get(step_d_result)

这段调度逻辑背后的设计意图是:

  • step_aSPREAD 尽量把下载任务分散到各节点
  • step_b 显式回到 step_a 所在节点执行
  • step_c 在当前实现中没有额外节点亲和性约束
  • step_d 依赖 step_d_slot,因此会固定在 Head 节点执行
  • 最后的 ray.get(step_d_result) 用于显式抛出异常,让上层 Celery 统一重试

六、集群启动方式

Head 节点:

1
2
3
4
5
6
ray start --head --port=6379 --dashboard-port=8265 \
--num-cpus=16 --num-gpus=4 \
--resources='{"step_a_slot": 4, "step_d_slot": 1000}' \
--disable-usage-stats

python ray_actor.py

Worker 节点:

1
2
3
4
5
ray start --address='192.168.10.117:6379' \
--num-cpus=16 --num-gpus=4 \
--resources='{"step_a_slot": 4}'

python ray_actor.py

这里的资源设计有两个重点:

  • step_a_slot 控制每个节点可并发的下载任务数量
  • step_d_slot 只在 Head 节点上配置,因此 step_d 会固定在 Head 上执行

当前项目中的“成功回调”也是在 step_d 中发出;但进度回调并不受这个约束,它们会从各自实际执行步骤所在的节点发出。

七、运行效果

Actor 初始化日志:

1
2
3
4
(RayActor pid=82717) ✅ Actor initialized on node 38ad38 (127.0.0.1), GPU: [0]
(RayActor pid=82718) ✅ Actor initialized on node 38ad38 (127.0.0.1), GPU: [1]
(RayActor pid=82719) ✅ Actor initialized on node 38ad38 (127.0.0.1), GPU: [2]
(RayActor pid=82720) ✅ Actor initialized on node 38ad38 (127.0.0.1), GPU: [3]

任务执行日志如下。这里可以看到:step_b 复用了已经预热好的 GPU Actor,因此不会重复加载模型。

1
2
3
4
(step_a pid=70715) [job_0] `step_a` Running on node 38ad38 127.0.0.1, GPUs: []
(RayActor pid=82717) [Step B/job_0] Running on node 38ad38 (127.0.0.1), GPUs: [0]
(step_c pid=70715) [job_0] `step_c` Running on node 38ad38 127.0.0.1, GPUs: []
(step_d pid=70715) [job_0] `step_d` Running on node 1ab2cd 192.168.10.117, GPUs: []

这里再强调三点:

  • step_b 会被显式绑定到 step_a 所在节点
  • step_d 会因为 step_d_slot 资源约束落到 Head 节点
  • step_c 在当前实现中没有额外节点亲和性约束,可能运行在任意满足资源条件的节点。上面的日志只是一次实际运行示例,不代表 step_c 必然与 step_astep_b 在同一节点

八、和 node:IP、Placement Group 方案怎么选

特性 node:IP 资源 Placement Group 节点级 Actor Pool + NodeAffinity
数据本地性
节点绑定表达能力 ✅ 直接 ⚠️ 间接 ✅ 直接
成组资源预留
模型预热 ⚠️ 可做但通常会退回任务级初始化 ⚠️ 可做但不自然 ✅ 初始化时加载一次
Actor 复用 ⚠️ 可做但实现较绕 ✅ 持久存活
排队机制 Task @ray.remote 资源分配 基于 bundle 资源可用性等待调度 asyncio.Queue + RayActor 资源分配
适合单次批处理 ⚠️ 偏长期服务
适合长期节点热池 ⚠️ 一般
实现复杂度 简单 中等 中等

如果你的场景只是偶尔执行一次,模型初始化成本也不高,那么第一篇文章里的 node:IP 方案已经够用。

如果你的重点是“为一组任务临时预留并打包调度资源”,那么 Placement Group 会更合适。

但如果你的服务具有以下特征:

  • 任务高频触发
  • 模型加载耗时且模型相对固定
  • 任务依赖节点本地文件或本地缓存
  • 希望把“节点绑定”和“空闲 Actor 调度”从调用方剥离出去

那么本文这一套“节点级 Actor Pool + NodeAffinity”方案,会是更稳妥的工程化升级。

九、结语

这套方案的价值,不在于“把第一篇文章推翻重写”,也不在于“把 Ray 用得更复杂”,而在于在原有节点绑定思路上,把两个现实问题继续处理干净:

  1. 模型初始化成本高,不能每次任务都重新加载
  2. 任务依赖节点本地文件,不能随意跨节点漂移

一旦把这两个问题放在一起看,引入节点级 Actor Pool 就会变得非常自然:

  • step_a 决定数据落点
  • NodeAffinitySchedulingStrategy 保证 step_b 回到正确节点
  • 用 detached Actor 持续复用预热后的 GPU 模型
  • 用 Manager Actor 把节点内调度逻辑从调用方剥离出去

对于高频、长期运行、依赖本地数据的 GPU 服务来说,这种方式比单纯的资源绑定更接近生产环境需要的形态。

参考资料