从零开始的 Ray 框架之旅(二)

在上一章,我们粗略的介绍了 Ray 框架的组件和一些基本功能。在本章,我会用几个简单的Demo来对 Ray 的功能和使用方法有一个初步的认识,我们以几个基本的例子来说明 Ray 框架的简单用法。

1. 将 Python的原生函数变成分布式计算

import ray
ray.init(address="local") # 可以省略address参数

@ray.remote
def squre(x):
    return x * x

futures = [squre.remote(i) for i in range(4)]

print(ray.get(futures))

#output: [0, 1, 4, 9]

我们只需要在原始的 Python 函数上使用一个 @ray.remote 装饰器,即可让其支持分布式计算,即声明一个remote function,这个是 Ray 的基本任务调度单元,它在定义后会被序列化,赋予唯一标识符等元信息,并存储在 GCS 中,这样整个集群都能看到这个函数的定义了。

同样,我们也可以使用这个装饰器修饰一个类,实际上这是声明了一个 Actor 类:

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

    def get_count(self):
        return self.count

counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
print(ray.get(counter.get_count.remote()))

# output: 2

在一个Actor类上调用remote方法后,Ray 会在一个远端的 worker process上异步的创建一个类实例,并返回一个ActorHandle对象,可以将其看做是客户端对远端 Actor 对象的一个代理(proxy),它允许你后续通过这个代理来调用 Actor对象的方法。
而当我们在一个具体的方法上调用 remote 方法时,Ray 就会将这个任务提交到任务队列中,并由可用的 worker 开始异步执行。实际上其返回的只是一个ObjectRef,并不是最终的结果,它类似异步编程中的 future
当我们调用 ray.get() 方法时,它会阻塞当前程序的执行,直到 ObjectRef 所引用的那个远端计算完成,并将其结果获取到调用该方法的进程中,如果已经可用,该方法会立即返回。

当然,我们也可以同时组合 TaskActor 一起使用:

counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
counter.increment.remote()

output1 = counter.get_count.remote() # 3
output2 = squre.remote(output1) # 9
print(ray.get(output2))

2. 资源管理

在云原生的环境中,资源的需求根据配置文件定制的,而 Ray 给我们提供了强大的无感知资源分配和调度能力,用户只需要在 @ray.remote() 装饰器中传入需要的资源即可,其不仅仅支持 CPU、GPU 和 Memory,还能传入自定义资源。在 Ray 的调度中,所有资源都是逻辑资源而不是具体的物理资源,Ray 会根据 Task/Actor 的资源需求来进行调度,但并不会限制其对资源的真实使用,资源值可以为小数,且没有资源隔离。
我们可以举一个简单的例子进行说明,比如我们现在有一个节点,节点上有 16 核 CPU,那么,假设我们 @ray.remote(num_cpus=2),调度器会自行计算出可以起 8 个 Task/Actor,此时,每个 Task/Actor 都是一个独立的进程,Ray 并不会去管这个进程里面要开多少线程/用到少核,完全由你实现的代码决定。
Ray 允许把一个核拆成若干份,用小数来表达更细的配合,如@ray.remote(num_cpus=0.5)。在调度层面上就能在同一台机器上编排更多的任务,但他仍然不会做隔离,拿到 0.5 个核的进程,运行时还是一个完整进程,照样能用满一个核,只是调度器是乐观的,认为其并不会超过节点容量。典型用途是 I/O-bound 任务或轻量级函数,用更小的 CPU 配额塞满调度器,减少排队时间。GPU也是同理,所以我们必须自己确保多进程的资源抢占问题。

@ray.remote(num_cpus=64, num_gpus=8)
def train_model(config):
    pass

@ray.remote(num_cpus=16, num_gpus=2)
def evaluate_model(model):
    pass

同样,Ray 实现了更灵活的环境指定,我们可以在某些需要特别定制环境的位置进行指定,实现环境的隔离,Ray 集群会在创建这个 Task/Actor 之前去准备该环境,然后将该 Task/Actor调度到该环境执行。

@ray.remote(runtime_env={"env": 
                         {"CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7",
                          "CONDA_PREFIX": "/home/xxx/anaconda3/envs/pytorch"}})
def train_model(config):
    pass

定制环境的粒度,以 Python 为例,有如下几个维度,一个是 Process 级别的隔离,第二个是 Virtual Env 级别的隔离,第三是 Conda 级别的隔离,第四是 Container 级别的隔离,隔离性依次增强,用户体验轻量级依次降低。
image.png

3. Ray Dataset

Ray 支持高效的 Dataset处理,我们在一台 64核CPU的服务器上进行测试,下面我们生成一个简单的数据集,并对其进行 mapfilter操作:

items = [{"name": str(i), "data": i} for i in range(10000)]
ds = ray.data.from_items(items)
ds.show(3)

"""
{'name': '0', 'data': 0} 
{'name': '1', 'data': 1} 
{'name': '2', 'data': 2} 
"""
squares = ds.map(lambda x: {"squared": x["data"] ** 2})
evens   = squares.filter(lambda row: row["squared"] % 2 == 0)
print(evens.count()) # 5000

我们创建了一个大小为 10000 的简单数据集,每条数据以字典的形式存在,使用 ray.data.from_items 方法进行读取,得到一个 ds 对象,使用 show 方法可以查看数据的前 n 条。
接下来,我们对其分别做map操作(计算每个数据的平方并添加新的字段),以及过滤出所有平方为偶数的数据样本。
我们也可以使用链式调用合并上述操作:

even = (
    ds
    .map(lambda x: {"squared": x["data"] ** 2})
    .filter(lambda row: row["squared"] % 2 == 0)
)
print(even.count())

我们再举一个鸢尾花数据集的处理例子,我们可以从从网上下载到iris.csv格式的数据集,假设我们需要为这个数据集增加一个鸢尾花的面积字段,下面是整个处理的流程:

from typing import Dict
import numpy as np
import ray
import os

ray.init()

#读取数据集,支持磁盘文件,python对象以及云存储,比如s3等数据源
ds = ray.data.read_csv("./iris.csv")

# 定义数据处理函数
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal.length"]
    width = batch["petal.width"]
    batch["petal.area"] = length * width
    return batch

# 执行处理函数,得到处理后的数据对象
transformed_ds = ds.map_batches(compute_area)

# 打印处理后的数据
for batch in transformed_ds.iter_batches(batch_size=32):
    print(batch)

# 存储数据,如果需要用local前缀,需要写出绝对路径,支持多种格式的输出
transformed_ds.write_json("iris/")
# transformed_ds.write_parquet("local:///home/staff/jiayining/SFT/iris/")

上面我们就完成了一个完整的数据处理流程,其中,map_batches可以完成高效的分布式数据处理任务。
执行完之后,我们发现 iris 目录下面,为什么会出现 128 个输出呢? iris 数据集本身就只有150条数据而已,这个是因为,Ray 有其默认的分块数量和阈值,我们可以通过以下方法打印查看:

from ray.data import DataContext

ctx = DataContext.get_current()

# 输出块数量的起始值
print("read_op_min_num_blocks =", ctx.read_op_min_num_blocks)   
# 最小块阈值(字节)
print("target_min_block_size  =", ctx.target_min_block_size)    
# 最大块阈值(字节)
print("target_max_block_size  =", ctx.target_max_block_size)    

我们会发现默认值:

read_op_min_num_blocks = 200 
target_min_block_size = 1048576 # 1MB
target_max_block_size = 134217728 # 128MB

那似乎也不对,不是应该会被分成 200 个 block 吗?
实际上,它使用的四步启发式规则来确定最终的 block数,随后反推每块大约占用多少字节数:

  1. 先取 200(或用户自己指定的值)
  2. 如果 总大小 / 块数 < target_min_block_size,就合并块,把块数减到 ceil(总大小 / target_min_block_size)
  3. 如果 总大小 / 块数 > target_max_block_size,就继续拆分块,把块数增到 ceil(总大小 / target_max_block_size)
  4. 保证块数 ≥ 2 * CPU 个数,防止CPU空闲。

因为我们在 64 核 CPU 的服务器上运行,根据这些规则,所以最终输出了 128 个 json 文件。
我们有多种方法来解决这个问题,首先,我们可以在初始化时指定 CPU 个数来解决:

ray.init(num_cpus=2)

我们也可以通过在读取数据集的时候指定分块数来解决:

ds = ray.data.read_csv("./iris.csv", override_num_blocks=4)

我们也可以通过在存储前合并分区来解决:

transformed_ds = ds.map_batches(compute_area).repartition(4)

最后,我们也可以通过制定写入时的 min_rows_per_file参数来解决:

transformed_ds.write_json("iris/", min_rows_per_file=200)
  • 方法一:是对整个 Ray 应用的全局资源限制,影响所有阶段。通常不作为精细控制数据处理并行度或输出文件数的首选。
  • 方法二:影响读取并行度和初始数据分块。对于控制初始并行度有效,但要注意其对单文件读取可能产生的额外开销。
  • 方法三:重量级的全量 shuffle 操作。能精确控制后续操作的并行度和数据块数量,但代价高昂,尤其对于大规模数据。
  • 方法四:写入时聚合,用于控制输出文件大小,避免小文件问题。开销远小于 repartition,因为它不执行全局 shuffle。

通常,我们会使用这些策略:

  1. ray.init() 自动检测或设置为机器的实际核心数以充分利用资源。
  2. 根据数据源的特性和大小,合理设置 override_num_blocks 或者让 Ray 自动推断。
  3. 在需要改变并行度或数据分布以优化某些计算密集型操作(如 join 或 group by)时,才谨慎使用 repartition
  4. 在写入数据时,使用 min_rows_per_file 或类似的参数(如 Parquet 的 row_group_size_bytes)来控制输出文件的大小和数量,以优化存储和后续查询性能。

总结

本章内容介绍了 Ray 的常用方法,以及 Ray Dataset的使用方法,在下一章中,我们会使用 Ray DatasetPytorchRay Tune 在鸢尾花数据集上训练一个简单的多分类神经网络。

参考文献

  1. Getting Started with Distributed Machine Learning with PyTorch and Ray
  2. Ray Github Repo
  3. Ray Docs
  4. Ray: 大模型时代的AI计算基础设施
评论区
头像
    头像
    JayNing
      

    写的好 会写就多写点