Skip to content

LifecycleManager

Overview

LifecycleManager maintains information of each shuffle for the application:

  • All active shuffle ids
  • Workers that are serving each shuffle, and what PartitionLocations are on each Worker
  • Status of each shuffle, i.e. not committed, committing, committed, data lost, expired
  • The newest PartitionLocation with the largest epoch of each partition id
  • User identifier for this application

Also, LifecycleManager handles control messages with ShuffleClient and Celeborn Master, typically, it receives requests from ShuffleClient:

  • RegisterShuffle
  • Revive/PartitionSplit
  • MapperEnd/StageEnd
  • GetReducerFileGroup

to handle the requests, LifecycleManager will send requests to Master and Workers:

  • Heartbeat to Master
  • RequestSlots to Master
  • UnregisterShuffle to Master
  • ReserveSlots to Worker
  • CommitFiles to Worker
  • DestroyWorkerSlots to Worker

RegisterShuffle

As described in PushData, ShuffleClient lazily send RegisterShuffle to LifecycleManager, so many concurrent requests will be sent to LifecycleManager.

To ensure only one request for each shuffle is handled, LifecycleManager puts tail requests in a set and only let go the first request. When the first request finishes, LifecycleManager responds to all cached requests.

The process of handling RegisterShuffle is as follows:

LifecycleManager sends RequestSlots to Master, Master allocates slots for the shuffle, as Here describes.

Upon receiving slots allocation result, LifecycleManager sends ReserveSlots to all Workerss allocated in parallel. Workers then select a disk and initialize for each PartitionLocation, see Here.

After all related Workers successfully reserved slots, LifecycleManager stores the shuffle information in memory and responds to all pending and future requests.

Revive/PartitionSplit

Celeborn handles push data failure in a so-called Revive mechanism, see Here. Similar to Split, they both asks LifecycleManager for a new epoch of PartitionLocation for future data pushing.

Upon receiving Revive/PartitionSplit, LifecycleManager first checks whether it has a newer epoch locally, if so it just responds the newer one. If not, like handling RegisterShuffle, it puts tail requests for the same partition id in a set and only let go the first one.

Unlike RegisterShuffle, LifecycleManager does not send RequestSlots to Master to ask for new Workers. Instead, it randomly picks Workers from local Worker list, excluding the failing ones. This design is to avoid too many RPCs to Master.

Then LifecycleManager sends ReserveSlots to the picked Workers. When success, it responds the new PartitionLocations to ShuffleClients.

MapperEnd/StageEnd

Celeborn needs to known when shuffle write stage ends to persist shuffle data, check if any data lost, and prepare for shuffle read. Many compute engines do not signal such event (for example, Spark's ShuffleManager does not have such API), Celeborn has to recognize that itself.

To achieve this, Celeborn requires ShuffleClient to specify the number of map tasks in RegisterShuffle request, and send MapperEnd request to LifecycleManager when a map task succeeds. When MapperEnd are received for every map id, LifecycleManager knows that the shuffle write stage ends, and sends CommitFiles to related Workers.

For many compute engines, a map task may launch multiple attempts (i.e. speculative execution), and the engine chooses one of them as successful attempt. However, there is no way for Celeborn to know about the chosen attempt. Instead, LifecycleManager records the first attempt sending MapperEnd as the success one for each map task, and ignores other attempts. This is correct because compute engines guarantee that all attempts for a map task generate the same output data.

Upon receiving CommitFiles, Workers flush buffered data to files and responds the succeeded and failed PartitionLocations to LifecycleManager, see Here. LifecycleManager then checks if any of PartitionLocation loses both primary and replica data (mark data lost if so), and stores the information in memory.

GetReducerFileGroup

Reduce task asks LifecycleManager for PartitionLocations of each partition id to read data. To reduce the number of RPCs, ShuffleClient asks for the mapping from all partition ids to their PartitionLocations and caches in memory, through GetReducerFileGroup request

Upon receiving the request, LifecycleManager responds the cached mapping or indicates data lost.

Heartbeat to Master

LifecycleManager periodically sends heartbeat to Master, piggybacking the following information:

  • Bytes and files written by the application, used to calculate estimated partition size, see Here
  • Worker list that LifecycleManager wants Master to tell status

UnregisterShuffle

When compute engines tells Celeborn that some shuffle is complete (i.e. through unregisterShuffle for Spark), LifecycleManager first checks and waits for write stage end, then put the shuffle id into unregistered set, after some expire time it removes the id and sends UnregisterShuffle to Master for cleanup, see Here

DestroyWorkerSlots

Normally, Workers cleanup resources for PartitionLocations after notified shuffle unregistered. In some abnormal cases, Master will send DestroyWorkerSlots to early cleanup, for example if some Workers fail to reserve slots, LifecycleManager will tell the successfully reserved Workers to release the slots.

Batch RPCs

Some RPCs are of high frequent, for example Revive/PartitionSplit, CommitFiles, DestroyWorkerSlots. To reduce the number of RPCs, LifecycleManager batches the same kind of RPCs and periodically checks and sends to Master through a dedicated thread.

Users can enable and tune batch RPC through the following configs: celeborn.client.shuffle.batch*