从零开始的 Ray 框架之旅(二)

在上一章,我们粗略的介绍了 Ray 框架的组件和一些基本功能。在本章,我会用几个简单的Demo来对 Ray 的功能和使用方法有一个初步的认识,我们以几个基本的例子来说明 Ray 框架的简单用法。

1. 将 Python的原生函数变成分布式计算

import ray
ray.init(address="local") # 可以省略address参数

@ray.remote
def squre(x):
    return x * x

futures = [squre.remote(i) for i in range(4)]

print(ray.get(futures))

#output: [0, 1, 4, 9]

我们只需要在原始的 Python 函数上使用一个 @ray.remote 装饰器,即可让其支持分布式计算,即声明一个remote function,这个是 Ray 的基本任务调度单元,它在定义后会被序列化,赋予唯一标识符等元信息,并存储在 GCS 中,这样整个集群都能看到这个函数的定义了。

同样,我们也可以使用这个装饰器修饰一个类,实际上这是声明了一个 Actor 类:

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

    def get_count(self):
        return self.count

counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
print(ray.get(counter.get_count.remote()))

# output: 2

在一个Actor类上调用remote方法后,Ray 会在一个远端的 worker process上异步的创建一个类实例,并返回一个ActorHandle对象,可以将其看做是客户端对远端 Actor 对象的一个代理(proxy),它允许你后续通过这个代理来调用 Actor对象的方法。
而当我们在一个具体的方法上调用 remote 方法时,Ray 就会将这个任务提交到任务队列中,并由可用的 worker 开始异步执行。实际上其返回的只是一个ObjectRef,并不是最终的结果,它类似异步编程中的 future
当我们调用 ray.get() 方法时,它会阻塞当前程序的执行,直到 ObjectRef 所引用的那个远端计算完成,并将其结果获取到调用该方法的进程中,如果已经可用,该方法会立即返回。

当然,我们也可以同时组合 TaskActor 一起使用:

counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
counter.increment.remote()

output1 = counter.get_count.remote() # 3
output2 = squre.remote(output1) # 9
print(ray.get(output2))

2. 资源管理

在云原生的环境中,资源的需求根据配置文件定制的,而 Ray 给我们提供了强大的无感知资源分配和调度能力,用户只需要在 @ray.remote() 装饰器中传入需要的资源即可,其不仅仅支持 CPU、GPU 和 Memory,还能传入自定义资源。在 Ray 的调度中,所有资源都是逻辑资源而不是具体的物理资源,Ray 会根据 Task/Actor 的资源需求来进行调度,但并不会限制其对资源的真实使用,资源值可以为小数,且没有资源隔离。
我们可以举一个简单的例子进行说明,比如我们现在有一个节点,节点上有 16 核 CPU,那么,假设我们 @ray.remote(num_cpus=2),调度器会自行计算出可以起 8 个 Task/Actor,此时,每个 Task/Actor 都是一个独立的进程,Ray 并不会去管这个进程里面要开多少线程/用到少核,完全由你实现的代码决定。
Ray 允许把一个核拆成若干份,用小数来表达更细的配合,如@ray.remote(num_cpus=0.5)。在调度层面上就能在同一台机器上编排更多的任务,但他仍然不会做隔离,拿到 0.5 个核的进程,运行时还是一个完整进程,照样能用满一个核,只是调度器是乐观的,认为其并不会超过节点容量。典型用途是 I/O-bound 任务或轻量级函数,用更小的 CPU 配额塞满调度器,减少排队时间。GPU也是同理,所以我们必须自己确保多进程的资源抢占问题。

@ray.remote(num_cpus=64, num_gpus=8)
def train_model(config):
    pass

@ray.remote(num_cpus=16, num_gpus=2)
def evaluate_model(model):
    pass

同样,Ray 实现了更灵活的环境指定,我们可以在某些需要特别定制环境的位置进行指定,实现环境的隔离,Ray 集群会在创建这个 Task/Actor 之前去准备该环境,然后将该 Task/Actor调度到该环境执行。

@ray.remote(runtime_env={"env": 
                         {"CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7",
                          "CONDA_PREFIX": "/home/xxx/anaconda3/envs/pytorch"}})
def train_model(config):
    pass

定制环境的粒度,以 Python 为例,有如下几个维度,一个是 Process 级别的隔离,第二个是 Virtual Env 级别的隔离,第三是 Conda 级别的隔离,第四是 Container 级别的隔离,隔离性依次增强,用户体验轻量级依次降低。
image.png

3. Ray Dataset

Ray 支持高效的 Dataset处理,我们在一台 64核CPU的服务器上进行测试,下面我们生成一个简单的数据集,并对其进行 mapfilter操作:

items = [{"name": str(i), "data": i} for i in range(10000)]
ds = ray.data.from_items(items)
ds.show(3)

"""
{'name': '0', 'data': 0} 
{'name': '1', 'data': 1} 
{'name': '2', 'data': 2} 
"""
squares = ds.map(lambda x: {"squared": x["data"] ** 2})
evens   = squares.filter(lambda row: row["squared"] % 2 == 0)
print(evens.count()) # 5000

我们创建了一个大小为 10000 的简单数据集,每条数据以字典的形式存在,使用 ray.data.from_items 方法进行读取,得到一个 ds 对象,使用 show 方法可以查看数据的前 n 条。
接下来,我们对其分别做map操作(计算每个数据的平方并添加新的字段),以及过滤出所有平方为偶数的数据样本。
我们也可以使用链式调用合并上述操作:

even = (
    ds
    .map(lambda x: {"squared": x["data"] ** 2})
    .filter(lambda row: row["squared"] % 2 == 0)
)
print(even.count())

我们再举一个鸢尾花数据集的处理例子,我们可以从从网上下载到iris.csv格式的数据集,假设我们需要为这个数据集增加一个鸢尾花的面积字段,下面是整个处理的流程:

from typing import Dict
import numpy as np
import ray
import os

ray.init()

#读取数据集,支持磁盘文件,python对象以及云存储,比如s3等数据源
ds = ray.data.read_csv("./iris.csv")

# 定义数据处理函数
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal.length"]
    width = batch["petal.width"]
    batch["petal.area"] = length * width
    return batch

# 执行处理函数,得到处理后的数据对象
transformed_ds = ds.map_batches(compute_area)

# 打印处理后的数据
for batch in transformed_ds.iter_batches(batch_size=32):
    print(batch)

# 存储数据,如果需要用local前缀,需要写出绝对路径,支持多种格式的输出
transformed_ds.write_json("iris/")
# transformed_ds.write_parquet("local:///home/staff/jiayining/SFT/iris/")

上面我们就完成了一个完整的数据处理流程,其中,map_batches可以完成高效的分布式数据处理任务。
执行完之后,我们发现 iris 目录下面,为什么会出现 128 个输出呢? iris 数据集本身就只有150条数据而已,这个是因为,Ray 有其默认的分块数量和阈值,我们可以通过以下方法打印查看:

from ray.data import DataContext

ctx = DataContext.get_current()

# 输出块数量的起始值
print("read_op_min_num_blocks =", ctx.read_op_min_num_blocks)   
# 最小块阈值(字节)
print("target_min_block_size  =", ctx.target_min_block_size)    
# 最大块阈值(字节)
print("target_max_block_size  =", ctx.target_max_block_size)    

我们会发现默认值:

read_op_min_num_blocks = 200 
target_min_block_size = 1048576 # 1MB
target_max_block_size = 134217728 # 128MB

那似乎也不对,不是应该会被分成 200 个 block 吗?
实际上,它使用的四步启发式规则来确定最终的 block数,随后反推每块大约占用多少字节数:

  1. 先取 200(或用户自己指定的值)
  2. 如果 总大小 / 块数 < target_min_block_size,就合并块,把块数减到 ceil(总大小 / target_min_block_size)
  3. 如果 总大小 / 块数 > target_max_block_size,就继续拆分块,把块数增到 ceil(总大小 / target_max_block_size)
  4. 保证块数 ≥ 2 * CPU 个数,防止CPU空闲。

因为我们在 64 核 CPU 的服务器上运行,根据这些规则,所以最终输出了 128 个 json 文件。
我们有多种方法来解决这个问题,首先,我们可以在初始化时指定 CPU 个数来解决:

ray.init(num_cpus=2)

我们也可以通过在读取数据集的时候指定分块数来解决:

ds = ray.data.read_csv("./iris.csv", override_num_blocks=4)

我们也可以通过在存储前合并分区来解决:

transformed_ds = ds.map_batches(compute_area).repartition(4)

最后,我们也可以通过制定写入时的 min_rows_per_file参数来解决:

transformed_ds.write_json("iris/", min_rows_per_file=200)
  • 方法一:是对整个 Ray 应用的全局资源限制,影响所有阶段。通常不作为精细控制数据处理并行度或输出文件数的首选。
  • 方法二:影响读取并行度和初始数据分块。对于控制初始并行度有效,但要注意其对单文件读取可能产生的额外开销。
  • 方法三:重量级的全量 shuffle 操作。能精确控制后续操作的并行度和数据块数量,但代价高昂,尤其对于大规模数据。
  • 方法四:写入时聚合,用于控制输出文件大小,避免小文件问题。开销远小于 repartition,因为它不执行全局 shuffle。

通常,我们会使用这些策略:

  1. ray.init() 自动检测或设置为机器的实际核心数以充分利用资源。
  2. 根据数据源的特性和大小,合理设置 override_num_blocks 或者让 Ray 自动推断。
  3. 在需要改变并行度或数据分布以优化某些计算密集型操作(如 join 或 group by)时,才谨慎使用 repartition
  4. 在写入数据时,使用 min_rows_per_file 或类似的参数(如 Parquet 的 row_group_size_bytes)来控制输出文件的大小和数量,以优化存储和后续查询性能。

总结

本章内容介绍了 Ray 的常用方法,以及 Ray Dataset的使用方法,在下一章中,我们会使用 Ray DatasetPytorchRay Tune 在鸢尾花数据集上训练一个简单的多分类神经网络。

参考文献

  1. Getting Started with Distributed Machine Learning with PyTorch and Ray
  2. Ray Github Repo
  3. Ray Docs
  4. Ray: 大模型时代的AI计算基础设施
评论区
头像
    头像
    znpvuvncpn
      

    2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
    新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
    新车首发,新的一年,只带想赚米的人coinsrore.com
    新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
    做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
    新车上路,只带前10个人coinsrore.com
    新盘首开 新盘首开 征召客户!!!coinsrore.com
    新项目准备上线,寻找志同道合的合作伙伴coinsrore.com
    新车即将上线 真正的项目,期待你的参与coinsrore.com
    新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
    新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com

    头像
    JayNing
      

    写的好 会写就多写点