在上一章,我们粗略的介绍了 Ray 框架的组件和一些基本功能。在本章,我会用几个简单的Demo
来对 Ray 的功能和使用方法有一个初步的认识,我们以几个基本的例子来说明 Ray 框架的简单用法。
1. 将 Python的原生函数变成分布式计算
import ray
ray.init(address="local") # 可以省略address参数
@ray.remote
def squre(x):
return x * x
futures = [squre.remote(i) for i in range(4)]
print(ray.get(futures))
#output: [0, 1, 4, 9]
我们只需要在原始的 Python 函数上使用一个 @ray.remote
装饰器,即可让其支持分布式计算,即声明一个remote function
,这个是 Ray 的基本任务调度单元,它在定义后会被序列化,赋予唯一标识符等元信息,并存储在 GCS
中,这样整个集群都能看到这个函数的定义了。
同样,我们也可以使用这个装饰器修饰一个类,实际上这是声明了一个 Actor
类:
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
def get_count(self):
return self.count
counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
print(ray.get(counter.get_count.remote()))
# output: 2
在一个Actor
类上调用remote
方法后,Ray 会在一个远端的 worker process上异步的创建一个类实例,并返回一个ActorHandle
对象,可以将其看做是客户端对远端 Actor 对象的一个代理(proxy),它允许你后续通过这个代理来调用 Actor对象的方法。
而当我们在一个具体的方法上调用 remote
方法时,Ray 就会将这个任务提交到任务队列中,并由可用的 worker 开始异步执行。实际上其返回的只是一个ObjectRef
,并不是最终的结果,它类似异步编程中的 future
。
当我们调用 ray.get()
方法时,它会阻塞当前程序的执行,直到 ObjectRef
所引用的那个远端计算完成,并将其结果获取到调用该方法的进程中,如果已经可用,该方法会立即返回。
当然,我们也可以同时组合 Task
和 Actor
一起使用:
counter = Counter.remote()
counter.increment.remote()
counter.increment.remote()
counter.increment.remote()
output1 = counter.get_count.remote() # 3
output2 = squre.remote(output1) # 9
print(ray.get(output2))
2. 资源管理
在云原生的环境中,资源的需求根据配置文件定制的,而 Ray 给我们提供了强大的无感知资源分配和调度能力,用户只需要在 @ray.remote()
装饰器中传入需要的资源即可,其不仅仅支持 CPU、GPU 和 Memory,还能传入自定义资源。在 Ray 的调度中,所有资源都是逻辑资源而不是具体的物理资源,Ray 会根据 Task/Actor 的资源需求来进行调度,但并不会限制其对资源的真实使用,资源值可以为小数,且没有资源隔离。
我们可以举一个简单的例子进行说明,比如我们现在有一个节点,节点上有 16 核 CPU,那么,假设我们 @ray.remote(num_cpus=2)
,调度器会自行计算出可以起 8 个 Task/Actor,此时,每个 Task/Actor 都是一个独立的进程,Ray 并不会去管这个进程里面要开多少线程/用到少核,完全由你实现的代码决定。
Ray 允许把一个核拆成若干份,用小数来表达更细的配合,如@ray.remote(num_cpus=0.5)
。在调度层面上就能在同一台机器上编排更多的任务,但他仍然不会做隔离,拿到 0.5 个核的进程,运行时还是一个完整进程,照样能用满一个核,只是调度器是乐观的,认为其并不会超过节点容量。典型用途是 I/O-bound 任务或轻量级函数,用更小的 CPU 配额塞满调度器,减少排队时间。GPU也是同理,所以我们必须自己确保多进程的资源抢占问题。
@ray.remote(num_cpus=64, num_gpus=8)
def train_model(config):
pass
@ray.remote(num_cpus=16, num_gpus=2)
def evaluate_model(model):
pass
同样,Ray 实现了更灵活的环境指定,我们可以在某些需要特别定制环境的位置进行指定,实现环境的隔离,Ray 集群会在创建这个 Task/Actor 之前去准备该环境,然后将该 Task/Actor调度到该环境执行。
@ray.remote(runtime_env={"env":
{"CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7",
"CONDA_PREFIX": "/home/xxx/anaconda3/envs/pytorch"}})
def train_model(config):
pass
定制环境的粒度,以 Python 为例,有如下几个维度,一个是 Process 级别的隔离,第二个是 Virtual Env 级别的隔离,第三是 Conda 级别的隔离,第四是 Container 级别的隔离,隔离性依次增强,用户体验轻量级依次降低。
3. Ray Dataset
Ray 支持高效的 Dataset处理,我们在一台 64核CPU的服务器上进行测试,下面我们生成一个简单的数据集,并对其进行 map
和 filter
操作:
items = [{"name": str(i), "data": i} for i in range(10000)]
ds = ray.data.from_items(items)
ds.show(3)
"""
{'name': '0', 'data': 0}
{'name': '1', 'data': 1}
{'name': '2', 'data': 2}
"""
squares = ds.map(lambda x: {"squared": x["data"] ** 2})
evens = squares.filter(lambda row: row["squared"] % 2 == 0)
print(evens.count()) # 5000
我们创建了一个大小为 10000 的简单数据集,每条数据以字典的形式存在,使用 ray.data.from_items
方法进行读取,得到一个 ds
对象,使用 show
方法可以查看数据的前 n 条。
接下来,我们对其分别做map
操作(计算每个数据的平方并添加新的字段),以及过滤出所有平方为偶数的数据样本。
我们也可以使用链式调用合并上述操作:
even = (
ds
.map(lambda x: {"squared": x["data"] ** 2})
.filter(lambda row: row["squared"] % 2 == 0)
)
print(even.count())
我们再举一个鸢尾花数据集的处理例子,我们可以从从网上下载到iris.csv
格式的数据集,假设我们需要为这个数据集增加一个鸢尾花的面积字段,下面是整个处理的流程:
from typing import Dict
import numpy as np
import ray
import os
ray.init()
#读取数据集,支持磁盘文件,python对象以及云存储,比如s3等数据源
ds = ray.data.read_csv("./iris.csv")
# 定义数据处理函数
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
length = batch["petal.length"]
width = batch["petal.width"]
batch["petal.area"] = length * width
return batch
# 执行处理函数,得到处理后的数据对象
transformed_ds = ds.map_batches(compute_area)
# 打印处理后的数据
for batch in transformed_ds.iter_batches(batch_size=32):
print(batch)
# 存储数据,如果需要用local前缀,需要写出绝对路径,支持多种格式的输出
transformed_ds.write_json("iris/")
# transformed_ds.write_parquet("local:///home/staff/jiayining/SFT/iris/")
上面我们就完成了一个完整的数据处理流程,其中,map_batches
可以完成高效的分布式数据处理任务。
执行完之后,我们发现 iris 目录下面,为什么会出现 128 个输出呢? iris 数据集本身就只有150条数据而已,这个是因为,Ray 有其默认的分块数量和阈值,我们可以通过以下方法打印查看:
from ray.data import DataContext
ctx = DataContext.get_current()
# 输出块数量的起始值
print("read_op_min_num_blocks =", ctx.read_op_min_num_blocks)
# 最小块阈值(字节)
print("target_min_block_size =", ctx.target_min_block_size)
# 最大块阈值(字节)
print("target_max_block_size =", ctx.target_max_block_size)
我们会发现默认值:
read_op_min_num_blocks = 200
target_min_block_size = 1048576 # 1MB
target_max_block_size = 134217728 # 128MB
那似乎也不对,不是应该会被分成 200
个 block 吗?
实际上,它使用的四步启发式规则来确定最终的 block数,随后反推每块大约占用多少字节数:
- 先取 200(或用户自己指定的值)
- 如果
总大小 / 块数 < target_min_block_size
,就合并块,把块数减到ceil(总大小 / target_min_block_size)
- 如果
总大小 / 块数 > target_max_block_size
,就继续拆分块,把块数增到ceil(总大小 / target_max_block_size)
- 保证块数 ≥ 2 * CPU 个数,防止CPU空闲。
因为我们在 64 核 CPU 的服务器上运行,根据这些规则,所以最终输出了 128 个 json 文件。
我们有多种方法来解决这个问题,首先,我们可以在初始化时指定 CPU 个数来解决:
ray.init(num_cpus=2)
我们也可以通过在读取数据集的时候指定分块数来解决:
ds = ray.data.read_csv("./iris.csv", override_num_blocks=4)
我们也可以通过在存储前合并分区来解决:
transformed_ds = ds.map_batches(compute_area).repartition(4)
最后,我们也可以通过制定写入时的 min_rows_per_file
参数来解决:
transformed_ds.write_json("iris/", min_rows_per_file=200)
- 方法一:是对整个 Ray 应用的全局资源限制,影响所有阶段。通常不作为精细控制数据处理并行度或输出文件数的首选。
- 方法二:影响读取并行度和初始数据分块。对于控制初始并行度有效,但要注意其对单文件读取可能产生的额外开销。
- 方法三:重量级的全量 shuffle 操作。能精确控制后续操作的并行度和数据块数量,但代价高昂,尤其对于大规模数据。
- 方法四:写入时聚合,用于控制输出文件大小,避免小文件问题。开销远小于
repartition
,因为它不执行全局 shuffle。
通常,我们会使用这些策略:
- 让
ray.init()
自动检测或设置为机器的实际核心数以充分利用资源。 - 根据数据源的特性和大小,合理设置
override_num_blocks
或者让 Ray 自动推断。 - 在需要改变并行度或数据分布以优化某些计算密集型操作(如 join 或 group by)时,才谨慎使用
repartition
。 - 在写入数据时,使用
min_rows_per_file
或类似的参数(如 Parquet 的row_group_size_bytes
)来控制输出文件的大小和数量,以优化存储和后续查询性能。
总结
本章内容介绍了 Ray 的常用方法,以及 Ray Dataset的使用方法,在下一章中,我们会使用 Ray Dataset
、Pytorch
和 Ray Tune
在鸢尾花数据集上训练一个简单的多分类神经网络。
写的好 会写就多写点