Fault Tolerant

This article describes the detailed design of Celeborn's fault tolerance.

In addition to data replication, which handles Worker loss, Celeborn tries to handle exceptions during shuffle as much as possible, especially the following:

  • When PushData/PushMergedData fails
  • When fetching a chunk fails
  • When a disk is unhealthy or reaching its space limit

This article is based on ReducePartition.

Handle PushData Failure

The detailed description of push data can be found in PushData. Push data can fail for various reasons, e.g. high CPU load, network fluctuation, JVM GC, or Worker loss.

Celeborn does not eagerly conclude that a Worker is lost when push data fails. Instead, it treats the Worker as temporarily unavailable and asks for another (pair of) PartitionLocation(s) on different Worker(s) to continue pushing. The process is called Revive:

[Figure: Revive]
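
The retry-on-another-location behavior described above can be sketched roughly as follows. This is a minimal illustration, not Celeborn's actual code: the `push` and `revive` callbacks, the `epoch` field, and the `max_revives` bound are all hypothetical names introduced for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionLocation:
    partition_id: int
    epoch: int    # hypothetical counter, bumped each time the location is revived
    worker: str


class PushFailed(Exception):
    """Raised by the (hypothetical) push callback when a push does not succeed."""


def push_with_revive(batch, location, push, revive, max_revives=3):
    """Push `batch`; on failure, ask for a new location and continue there.

    `push(batch, location)` and `revive(location)` are assumed callbacks;
    `max_revives` is an assumed bound, not a documented Celeborn setting.
    """
    revives = 0
    while True:
        try:
            push(batch, location)
            return location
        except PushFailed:
            if revives == max_revives:
                raise
            # Treat the Worker as temporarily unavailable rather than lost:
            # request a new location on a different Worker and retry there.
            location = revive(location)
            revives += 1
```

The point of the sketch is that a failed push changes only the destination of subsequent pushes; no conclusion is drawn about the health of the original Worker.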

Handling PushMergedData failure is similar but more complex. Currently, PushMergedData works in an all-or-nothing fashion, meaning that either all data batches in the request succeed or all of them fail. Partial success is not supported yet.

Upon PushMergedData failure, ShuffleClient first unpacks the request and revives every data batch. Note that all data batches in the original PushMergedData had the same primary and replica (if any) destination, whereas after reviving, the new PartitionLocations can spread across multiple Workers.

ShuffleClient then groups the new PartitionLocations in the same way as before, resulting in multiple PushMergedData requests, and sends them to their destinations.
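
The unpack, revive, and regroup steps can be illustrated with a short sketch. It assumes a hypothetical `revive(partition_id)` callback that returns the new `(primary, replica)` Worker pair for a partition; the data shapes are invented for the example.

```python
from collections import defaultdict


def regroup_after_revive(batches, revive):
    """Regroup the batches of one failed PushMergedData by their new destination.

    batches: list of (partition_id, payload) taken from the failed request.
    revive:  hypothetical callback, partition_id -> (primary_worker, replica_worker).
    Returns {(primary, replica): [(partition_id, payload), ...]}, where each
    entry corresponds to one new PushMergedData request.
    """
    groups = defaultdict(list)
    for partition_id, payload in batches:
        destination = revive(partition_id)
        groups[destination].append((partition_id, payload))
    return dict(groups)
```

One failed request can therefore fan out into several new requests, one per distinct destination pair.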

Celeborn detects data loss when processing CommitFiles (see Worker). Celeborn considers that there is no DataLost if and only if every PartitionLocation has successfully committed at least one replica (if replication is turned off, there is only one replica for each PartitionLocation).
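
The "at least one committed replica per PartitionLocation" rule amounts to a simple predicate. The sketch below uses an invented input shape (a mapping from a location identifier to per-replica commit results) purely for illustration.

```python
def has_data_loss(commit_results):
    """Return True if some PartitionLocation has no successfully committed replica.

    commit_results: {location_id: {replica_name: committed_ok}} (assumed shape).
    With replication off, each inner dict has a single entry.
    """
    return any(
        not any(replicas.values())
        for replicas in commit_results.values()
    )
```

For example, a location whose primary failed to commit but whose replica succeeded does not count as lost, while a location whose only replica failed does.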

When a Worker is down, all PartitionLocations on the Worker will be revived, causing a flood of Revive RPCs to LifecycleManager. To alleviate this, ShuffleClient batches Revive requests before sending them to LifecycleManager:

[Figure: BatchRevive]
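
A minimal sketch of the batching idea follows. The class name, the `send_batch` callback, and the deduplication of repeated requests for the same partition are assumptions made for the example; in a real client, `flush` would presumably be driven by a timer or a size threshold.

```python
import threading


class ReviveBatcher:
    """Collects revive requests and sends them as a single batch (hypothetical)."""

    def __init__(self, send_batch):
        self._send_batch = send_batch  # assumed callback: list of partition ids -> None
        self._pending = set()          # a set, so repeated requests collapse (assumption)
        self._lock = threading.Lock()

    def request(self, partition_id):
        """Record that `partition_id` needs a new location."""
        with self._lock:
            self._pending.add(partition_id)

    def flush(self):
        """Send all pending requests in one call; return how many were sent."""
        with self._lock:
            batch, self._pending = sorted(self._pending), set()
        if batch:
            self._send_batch(batch)
        return len(batch)
```

With this shape, many failing pushes produce one RPC per flush instead of one RPC per PartitionLocation.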

Handle Fetch Failure

As ReducePartition describes, a data file consists of chunks, and ShuffleClient asks for one chunk at a time.

ShuffleClient defines the maximum number of retries for each replica (defaults to 3). When fetching a chunk fails, ShuffleClient tries another replica (if replication is off, it retries the same one).

If the maximum number of retries is exceeded, ShuffleClient gives up retrying and throws an exception.
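
The retry policy can be sketched as below. The sketch assumes that replicas are tried in alternation and that the overall budget is the per-replica limit multiplied by the number of replicas; the `fetch` callback and all names are hypothetical.

```python
class FetchFailed(Exception):
    """Raised by the (hypothetical) fetch callback when a chunk cannot be read."""


def fetch_chunk(chunk_index, replicas, fetch, max_retries_per_replica=3):
    """Fetch one chunk, switching replica after each failure.

    replicas: list of replica handles; a single entry when replication is off,
              in which case the same replica is simply retried.
    fetch:    assumed callback, (replica, chunk_index) -> bytes.
    """
    total_attempts = max_retries_per_replica * len(replicas)
    last_error = None
    for attempt in range(total_attempts):
        replica = replicas[attempt % len(replicas)]
        try:
            return fetch(replica, chunk_index)
        except FetchFailed as error:
            last_error = error
    raise FetchFailed(
        f"chunk {chunk_index} failed after {total_attempts} attempts"
    ) from last_error
```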

Disk Check

Worker periodically checks disk health and usage. When a health check fails, Worker isolates the disk and will not allocate slots on it until it becomes healthy again.

Similarly, if the usable space drops below a threshold (defaults to 5GiB), Worker will not allocate slots on the disk. In addition, to avoid running out of space, Worker triggers HARD_SPLIT for all PartitionLocations on the disk to stop their files from growing.
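
The two checks and their consequences can be summarized in a small sketch. The status names and function names are invented for illustration; only the 5GiB default and the two reactions (no slot allocation, HARD_SPLIT on low space) come from the description above.

```python
from enum import Enum

GIB = 1024 ** 3


class DiskStatus(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"    # health check failed
    HIGH_USAGE = "high_usage"  # usable space below the threshold


def check_disk(health_ok, usable_bytes, reserve_bytes=5 * GIB):
    """Classify a disk from one periodic check (hypothetical helper)."""
    if not health_ok:
        return DiskStatus.UNHEALTHY
    if usable_bytes < reserve_bytes:
        return DiskStatus.HIGH_USAGE
    return DiskStatus.HEALTHY


def can_allocate_slots(status):
    """Slots are allocated only on disks that pass both checks."""
    return status is DiskStatus.HEALTHY


def should_hard_split(status):
    """Low space triggers HARD_SPLIT for the PartitionLocations on the disk."""
    return status is DiskStatus.HIGH_USAGE
```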

Exactly Once

It can happen that a Worker successfully receives and writes a data batch but fails to send the ACK to ShuffleClient, or that the primary successfully receives and writes a data batch but the replica fails. Also, different task attempts (e.g. under speculative execution) can push the same data more than once.

In short, the same data batch can be duplicated across PartitionLocation splits. To guarantee exactly-once semantics, Celeborn ensures that no data is lost and no data is read twice:

  • For each data batch, ShuffleClient adds a (Map Id, Attempt Id, Batch Id) header, in which Batch Id is a unique id for the data batch in the map attempt
  • LifecycleManager keeps all PartitionLocations with the same partition id
  • For each PartitionLocation split, at least one replica is successfully committed before shuffle read
  • LifecycleManager records the successful task attempt for each map id, and only data from that attempt is read for the map id
  • ShuffleClient discards data batches with a batch id that it has already read
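
The read-side filtering in the last two points can be sketched as follows. The tuple layout mirrors the (Map Id, Attempt Id, Batch Id) header described above, but the function and its inputs are hypothetical and stand in for whatever the real reader iterates over.

```python
def read_partition(batches, successful_attempt):
    """Yield each payload exactly once, from the successful attempt only.

    batches:            iterable of (map_id, attempt_id, batch_id, payload),
                        possibly containing duplicates across splits.
    successful_attempt: {map_id: attempt_id}, as recorded by LifecycleManager
                        (assumed shape).
    """
    seen = set()
    for map_id, attempt_id, batch_id, payload in batches:
        if successful_attempt.get(map_id) != attempt_id:
            continue  # data from a failed or superseded attempt
        key = (map_id, batch_id)
        if key in seen:
            continue  # the same batch was already read from another split
        seen.add(key)
        yield payload
```

Because the attempt is already fixed per map id, tracking `(map_id, batch_id)` pairs is enough to detect a repeated batch in this sketch.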