# LifecycleManager

## Overview

`LifecycleManager` maintains information about each shuffle for the application:

- All active shuffle ids
- `Worker`s that are serving each shuffle, and what `PartitionLocation`s are on each `Worker`
- Status of each shuffle, i.e. not committed, committing, committed, data lost, expired
- The newest `PartitionLocation` with the largest epoch of each partition id
- User identifier for this application
`LifecycleManager` also exchanges control messages with `ShuffleClient` and the Celeborn `Master`. Typically, it receives the following requests from `ShuffleClient`:

- RegisterShuffle
- Revive/PartitionSplit
- MapperEnd/StageEnd
- GetReducerFileGroup

To handle these requests, `LifecycleManager` sends requests to `Master` and `Worker`s:

- Heartbeat to `Master`
- RequestSlots to `Master`
- UnregisterShuffle to `Master`
- ReserveSlots to `Worker`
- CommitFiles to `Worker`
- DestroyWorkerSlots to `Worker`
## RegisterShuffle

As described in PushData, `ShuffleClient` sends RegisterShuffle to `LifecycleManager` lazily, so many concurrent requests may arrive at `LifecycleManager` at once. To ensure that only one request per shuffle is handled, `LifecycleManager` parks subsequent requests in a set and lets only the first request through. When the first request finishes, `LifecycleManager` responds to all cached requests.
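The request deduplication described above can be sketched as follows. This is a minimal, hypothetical Java sketch; `RegisterDeduper` and its method names are illustrative and not Celeborn's actual classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: only the first RegisterShuffle per shuffle id triggers
// registration; concurrent requests are parked and answered when it finishes.
class RegisterDeduper {
    // shuffleId -> callbacks waiting for the in-flight registration
    private final Map<Integer, List<Consumer<String>>> pending = new HashMap<>();

    /** Returns true if the caller should actually perform the registration. */
    synchronized boolean tryAcquire(int shuffleId, Consumer<String> callback) {
        List<Consumer<String>> waiters = pending.get(shuffleId);
        if (waiters != null) {          // a request is already in flight: park
            waiters.add(callback);
            return false;
        }
        List<Consumer<String>> fresh = new ArrayList<>();
        fresh.add(callback);
        pending.put(shuffleId, fresh);
        return true;                    // first request: let it through
    }

    /** Called when registration finishes; responds to all parked requests. */
    synchronized void complete(int shuffleId, String result) {
        List<Consumer<String>> waiters = pending.remove(shuffleId);
        if (waiters != null) {
            for (Consumer<String> cb : waiters) cb.accept(result);
        }
    }
}
```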
The process of handling RegisterShuffle is as follows:

1. `LifecycleManager` sends RequestSlots to `Master`, and `Master` allocates slots for the shuffle, as described Here.
2. Upon receiving the slot allocation result, `LifecycleManager` sends ReserveSlots to all allocated `Worker`s in parallel. Each `Worker` then selects a disk and initializes for each `PartitionLocation`, see Here.
3. After all related `Worker`s have successfully reserved slots, `LifecycleManager` stores the shuffle information in memory and responds to all pending and future requests.
## Revive/PartitionSplit

Celeborn handles push data failure with a so-called Revive mechanism, see Here. Revive is similar to PartitionSplit: both ask `LifecycleManager` for a new epoch of a `PartitionLocation` for future data pushing.

Upon receiving Revive/PartitionSplit, `LifecycleManager` first checks whether it already has a newer epoch locally; if so, it simply responds with the newer one. If not, like when handling RegisterShuffle, it parks subsequent requests for the same partition id in a set and lets only the first one through.

Unlike RegisterShuffle, `LifecycleManager` does not send RequestSlots to `Master` to ask for new `Worker`s. Instead, it randomly picks `Worker`s from its local `Worker` list, excluding the failing ones. This design avoids sending too many RPCs to `Master`.

`LifecycleManager` then sends ReserveSlots to the picked `Worker`s. On success, it responds with the new `PartitionLocation`s to the `ShuffleClient`s.
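The local epoch check can be sketched as below. This is a hypothetical sketch (`EpochTracker` is an illustrative name, not Celeborn's class); in the real system a new epoch is only handed out after ReserveSlots succeeds on the newly picked `Worker`s:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the Revive/PartitionSplit epoch check: if a newer
// epoch than the client's already exists locally, answer with it directly;
// otherwise allocate the next epoch for the partition.
class EpochTracker {
    // partitionId -> latest known epoch
    private final Map<Integer, Integer> latestEpoch = new HashMap<>();

    /** Handles a revive request carrying the client's current epoch. */
    synchronized int revive(int partitionId, int clientEpoch) {
        int latest = latestEpoch.getOrDefault(partitionId, 0);
        if (latest > clientEpoch) {
            return latest;              // newer epoch exists: respond directly
        }
        int next = clientEpoch + 1;     // otherwise create the next epoch
        latestEpoch.put(partitionId, next);
        return next;
    }
}
```

This mirrors why concurrent Revive requests for the same partition collapse into one: all but the first observe the newer epoch and return immediately.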
## MapperEnd/StageEnd

Celeborn needs to know when the shuffle write stage ends in order to persist shuffle data, check whether any data was lost, and prepare for shuffle read. Many compute engines do not signal such an event (for example, Spark's ShuffleManager has no such API), so Celeborn has to recognize it itself.

To achieve this, Celeborn requires `ShuffleClient` to specify the number of map tasks in the RegisterShuffle request, and to send a MapperEnd request to `LifecycleManager` when a map task succeeds. When a MapperEnd has been received for every map id, `LifecycleManager` knows that the shuffle write stage has ended, and sends CommitFiles to the related `Worker`s.

For many compute engines, a map task may launch multiple attempts (i.e. speculative execution), and the engine chooses one of them as the successful attempt. However, there is no way for Celeborn to know which attempt was chosen. Instead, `LifecycleManager` records the first attempt that sends MapperEnd as the successful one for each map task, and ignores the other attempts. This is correct because compute engines guarantee that all attempts of a map task generate the same output data.
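The first-attempt-wins rule and the stage-end detection can be sketched together. `MapperEndTracker` is a hypothetical name for illustration:

```java
// Hypothetical sketch of MapperEnd tracking: the first attempt to report
// MapperEnd for a map id wins; the write stage ends once every map id
// has reported.
class MapperEndTracker {
    private final int[] successAttempt;   // -1 = map task not yet finished
    private int finished = 0;

    MapperEndTracker(int numMappers) {
        successAttempt = new int[numMappers];
        java.util.Arrays.fill(successAttempt, -1);
    }

    /** Returns true when this MapperEnd completes the whole write stage. */
    synchronized boolean mapperEnd(int mapId, int attemptId) {
        if (successAttempt[mapId] != -1) return false; // later attempt: ignore
        successAttempt[mapId] = attemptId;             // first attempt wins
        finished++;
        return finished == successAttempt.length;      // stage end reached?
    }

    synchronized int winningAttempt(int mapId) { return successAttempt[mapId]; }
}
```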
Upon receiving CommitFiles, `Worker`s flush buffered data to files and respond with the succeeded and failed `PartitionLocation`s to `LifecycleManager`, see Here. `LifecycleManager` then checks whether any `PartitionLocation` has lost both its primary and replica data (marking the data lost if so), and stores the information in memory.
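The data-lost check reduces to a set intersection: a partition's data is lost only if the commit failed on both the primary and the replica side. A minimal sketch (hypothetical `CommitChecker`, partitions identified by id):

```java
// Hypothetical sketch of the post-CommitFiles data-lost check: a partition
// loses data only when both its primary and its replica commit failed.
class CommitChecker {
    static java.util.Set<Integer> dataLost(
            java.util.Set<Integer> failedPrimaries,
            java.util.Set<Integer> failedReplicas) {
        java.util.Set<Integer> lost = new java.util.HashSet<>(failedPrimaries);
        lost.retainAll(failedReplicas);   // lost = failed on both sides
        return lost;
    }
}
```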
## GetReducerFileGroup

A reduce task asks `LifecycleManager` for the `PartitionLocation`s of each partition id in order to read data. To reduce the number of RPCs, `ShuffleClient` asks for the mapping from all partition ids to their `PartitionLocation`s in a single GetReducerFileGroup request and caches it in memory. Upon receiving the request, `LifecycleManager` responds with the cached mapping or indicates that data was lost.
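The client-side caching behavior can be sketched as below (hypothetical `FileGroupCache`; locations are simplified to strings, and `rpcCount` exists only to make the single-RPC property visible):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of client-side caching for GetReducerFileGroup: the
// full partitionId -> locations mapping is fetched once per shuffle and
// reused by every lookup, so only one RPC is issued per shuffle.
class FileGroupCache {
    // shuffleId -> (partitionId -> location names)
    private final Map<Integer, Map<Integer, List<String>>> cache =
        new ConcurrentHashMap<>();
    private final Function<Integer, Map<Integer, List<String>>> fetcher;
    int rpcCount = 0;   // visible for the example only

    FileGroupCache(Function<Integer, Map<Integer, List<String>>> fetcher) {
        this.fetcher = fetcher;
    }

    List<String> locationsFor(int shuffleId, int partitionId) {
        Map<Integer, List<String>> group = cache.computeIfAbsent(
            shuffleId, id -> { rpcCount++; return fetcher.apply(id); });
        return group.get(partitionId);
    }
}
```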
## Heartbeat to Master

`LifecycleManager` periodically sends heartbeats to `Master`, piggybacking the following information:

- Bytes and files written by the application, used to calculate the estimated partition size, see Here
- The list of `Worker`s whose status `LifecycleManager` wants `Master` to report
## UnregisterShuffle

When the compute engine tells Celeborn that a shuffle is complete (e.g. through unregisterShuffle in Spark), `LifecycleManager` first checks and waits for the write stage to end, then puts the shuffle id into an unregistered set. After some expiration time, it removes the id and sends UnregisterShuffle to `Master` for cleanup, see Here.
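The delayed-cleanup behavior can be sketched as a time-stamped set. This is a hypothetical sketch (`UnregisterQueue` is an illustrative name); time is passed in explicitly so the expiry logic is easy to follow:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of delayed shuffle unregistration: an id sits in an
// "unregistered" set for an expiry window before UnregisterShuffle would be
// sent to Master, so straggler requests for the shuffle can still be served.
class UnregisterQueue {
    private final long expireMs;
    private final Map<Integer, Long> unregisteredAt = new HashMap<>();

    UnregisterQueue(long expireMs) { this.expireMs = expireMs; }

    void markUnregistered(int shuffleId, long nowMs) {
        unregisteredAt.put(shuffleId, nowMs);
    }

    /** Returns ids whose expiry window has passed and removes them. */
    List<Integer> expired(long nowMs) {
        List<Integer> ready = new ArrayList<>();
        unregisteredAt.entrySet().removeIf(e -> {
            if (nowMs - e.getValue() >= expireMs) {
                ready.add(e.getKey());   // ready for UnregisterShuffle
                return true;
            }
            return false;
        });
        return ready;
    }
}
```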
## DestroyWorkerSlots

Normally, `Worker`s clean up the resources for `PartitionLocation`s after being notified that the shuffle is unregistered. In some abnormal cases, `LifecycleManager` sends DestroyWorkerSlots for early cleanup; for example, if some `Worker`s fail to reserve slots, `LifecycleManager` tells the `Worker`s that successfully reserved slots to release them.
## Batch RPCs

Some RPCs occur at high frequency, for example Revive/PartitionSplit, CommitFiles, and DestroyWorkerSlots. To reduce the number of RPCs, `LifecycleManager` batches RPCs of the same kind and periodically checks and sends them through a dedicated thread.

Users can enable and tune batched RPCs through the following configs: `celeborn.client.shuffle.batch*`
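The batching pattern can be sketched as a queue plus a periodic flusher. This is a hypothetical sketch (`RpcBatcher` is illustrative); `flush()` is invoked manually here, whereas the real system triggers it from a dedicated timer thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of RPC batching: requests of the same kind are queued,
// and a periodic flush sends all queued requests as one batched RPC.
class RpcBatcher<T> {
    private final List<T> queue = new ArrayList<>();
    private final Consumer<List<T>> sender;   // sends one batched RPC

    RpcBatcher(Consumer<List<T>> sender) { this.sender = sender; }

    synchronized void submit(T request) { queue.add(request); }

    synchronized void flush() {
        if (queue.isEmpty()) return;          // nothing to send this tick
        sender.accept(new ArrayList<>(queue));
        queue.clear();
    }
}
```

A flush on an empty queue sends nothing, so the periodic thread is cheap when traffic is low.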