Celeborn Architecture
This article introduces high level Apache Celeborn™ Architecture. For more detailed description of each module/process, please refer to dedicated articles.
Why Celeborn
In distributed compute engines, data exchange between compute nodes is common but expensive. The cost comes from the disk and network inefficiency (M * N between Mappers and Reducers) in traditional shuffle frame, as following:
Besides inefficiency, traditional shuffle framework requires large local storage in compute node to store shuffle data, thus blocks the adoption of disaggregated architecture.
Apache Celeborn solves the problems by reorganizing shuffle data in a more efficient way, and storing the data in a separate service. The high level architecture of Celeborn is as follows:
Components
Celeborn has three primary components: Master, Worker, and Client.
- Master manages Celeborn cluster and achieves high availability(HA) based on Raft.
- Worker processes read-write requests.
- Client writes/reads data to/from Celeborn cluster, and manages shuffle metadata for the application.
In most distributed compute engines, there are typically two roles: one role for application lifecycle management
and task orchestration, i.e. Driver
in Spark and JobMaster
for Flink; the other role for executing tasks, i.e.
Executor
in Spark and TaskManager
for Flink.
Similarly, Celeborn Client is also divided into two roles: LifecycleManager
for control plane, responsible for
managing all shuffle metadata for the application; and ShuffleClient
for data plane, responsible for write/read
data to/from Workers.
LifecycleManager
resides in Driver
or JobMaster
, one instance in each application; ShuffleClient
resides in
each Executor
or TaskManager
, one instance in each process of Executor
/TaskManager
.
Shuffle Lifecycle
A typical lifecycle of a shuffle with Celeborn is as follows:
- Client sends
RegisterShuffle
to Master. Master allocates slots among Workers and responds to Client. - Client sends
ReserveSlots
to Workers. Workers reserve slots for the shuffle and responds to Client. - Clients push data to allocated Workers. Data of the same
partitionId
are pushed to the same logicalPartitionLocation
. - After all Clients finishes pushing data, Client sends
CommitFiles
to each Worker. Workers commit data for the shuffle then respond to Client. - Clients send
OpenStream
to Workers for each partition split file to prepare for reading. - Clients send
ChunkFetchRequest
to Workers to read chunks. - After Client finishes reading data, Client sends
UnregisterShuffle
to Master to release resources.
Data Reorganization
Celeborn improves disk and network efficiency through data reorganization. Typically, Celeborn stores all shuffle data
with the same partitionId
in a logical PartitionLocation
.
In normal cases each PartitionLocation
corresponds to a single file. When a reducer requires for the partition's data,
it just needs one network connection and sequentially read the coarse grained file.
In abnormal cases, such as when the file grows too large, or push data fails, Celeborn spawns a new split of the
PartitionLocation
, and future data within the partition will be pushed to the new split.
LifecycleManager
keeps the split information and tells reducer to read from all splits of the PartitionLocation
to guarantee no data is lost.
Data Storage
Celeborn stores shuffle data in configurable multiple layers, i.e. Memory
, Local Disks
, Distributed File System
,
and Object Store
. Users can specify any combination of the layers on each Worker.
Currently, Celeborn only supports Local Disks
and HDFS
. Supporting for other storage systems are under working.
Compute Engine Integration
Celeborn's primary components(i.e. Master, Worker, Client) are engine irrelevant. The Client APIs are extensible and easy to implement plugins for various engines.
Currently, Celeborn officially supports Spark(both Spark 2.x and Spark 3.x), Flink(1.14/1.15/1.16/1.17/1.18/1.19), and Gluten. Also, developers are integrating Celeborn with other engines, for example MR3.
Celeborn community is also working on integrating Celeborn with other engines.
Graceful Shutdown
In order not to impact running applications when upgrading Celeborn Cluster, Celeborn implements Graceful Upgrade.
When graceful shutdown is turned on, upon shutdown, Celeborn will do the following things:
- Master will not allocate slots on the Worker.
- Worker will inform Clients to split.
- Client will send
CommitFiles
to the Worker.
Then the Worker waits until all PartitionLocation
flushes data to persistent storage, stores states in local RocksDB or LevelDB(deprecated),
then stops itself. The process is typically within one minute.
For more details, please refer to Rolling upgrade