Integrating Celeborn
Overview
The core components of Celeborn, i.e. Master, Worker, and Client, are all engine-agnostic. Developers can
integrate Celeborn with various engines or applications by using or extending Celeborn's Client, as is done by the
officially supported plugins for Spark/Flink/MapReduce.
This article briefly describes an example of integrating Celeborn into a simple distributed application using
Celeborn Client.
Background
Say we have a distributed application that has two phases:
- A write phase, in which parallel tasks write data to some data service; each record is classified into some logical id, say a partition id.
- A read phase, in which parallel tasks read data from the data service; each task reads data from a specified partition id.
Suppose the application has a failover mechanism, so it is acceptable for the application to rerun tasks when some data is lost.
Say the developers of this application are searching for a suitable data service and happen upon this article.
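The "classify each record into some partition id" step in the write phase can be as simple as hash partitioning. A minimal sketch (the helper class, its name, and the record keys are illustrative assumptions, not part of Celeborn's API):

```java
// Hypothetical helper: assign a record key to one of numPartitions partition ids.
public class Partitioner {
    public static int partitionId(Object key, int numPartitions) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> partition " + partitionId(key, numPartitions));
        }
    }
}
```

Any deterministic mapping from record to partition id works; the point is that all records with the same partition id end up readable together in the read phase.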
Step One: Setup Celeborn Cluster
First, you need an available Celeborn Cluster. Refer to QuickStart to set up a simple cluster in a single node, or Deploy to set up a multi-node cluster, standalone or on K8s.
Step Two: Create LifecycleManager
As described in Client, the Client is separated into the LifecycleManager, which is a singleton
throughout an application, and the ShuffleClient, which can have multiple instances.
Step two is to create a LifecycleManager instance, using the following API:
class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
- appUniqueId is the application id. The Celeborn cluster stores, serves, and cleans up data at the granularity of (application id, shuffle id).
- conf is an object of CelebornConf. The only required configuration is the address of the Celeborn Master. For a thorough description of configs, refer to Configuration.
The example Java code to create a LifecycleManager instance is as follows:
CelebornConf celebornConf = new CelebornConf();
celebornConf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");
LifecycleManager lm = new LifecycleManager("myApp", celebornConf);
The LifecycleManager object automatically starts the necessary services after creation, so there is no need to call
other APIs to initialize it. You can get the LifecycleManager's address after creating it, which is needed to
create a ShuffleClient.
String host = lm.getHost();
int port = lm.getPort();
Step Three: Create ShuffleClient
With LifecycleManager's host and port, you can create ShuffleClient using the following API:
public static ShuffleClient get(
String appUniqueId,
String host,
int port,
CelebornConf conf,
UserIdentifier userIdentifier)
- appUniqueId is the application id, same as above.
- host is the host of LifecycleManager.
- port is the port of LifecycleManager.
- conf is an object of CelebornConf; it is safe to pass an empty object.
- userIdentifier specifies the user identity; it is safe to pass null.
You can create a ShuffleClient object using the following code:
ShuffleClient shuffleClient =
ShuffleClient.get(
"myApp",
<LifecycleManager Host>,
<LifecycleManager Port>,
new CelebornConf(),
null);
This method returns a singleton ShuffleClientImpl object, and using it this way is recommended because ShuffleClientImpl
maintains state and reuses resources across all shuffles. To make it work, you have to ensure that the
LifecycleManager's host and port are reachable.
In practice, one ShuffleClient instance is created in each Executor process of Spark, or in each TaskManager
process of Flink.
Step Four: Push Data
You can then push data through ShuffleClient with pushData, as in
the following:
int bytesWritten =
shuffleClient.pushData(
shuffleId,
mapId,
attemptId,
partitionId,
data,
0,
length,
numMappers,
numPartitions);
Each call of pushData passes a byte array containing data for the same partition id. In addition to specifying the
shuffleId, mapId, and attemptId that the data belongs to, ShuffleClient should also specify the number of mappers and the
number of partitions for Lazy Register.
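Before calling pushData, a map task typically serializes records into a per-partition byte buffer and pushes each buffer with the matching partition id. A minimal buffering sketch (the class, record format, and the commented-out pushData call are illustrative assumptions; only the byte-array handling is concrete):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition write buffer feeding pushData.
public class PushBuffer {
    private final Map<Integer, ByteArrayOutputStream> buffers = new HashMap<>();

    // Append one record's bytes to the buffer for its partition id.
    public void add(int partitionId, String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        buffers
            .computeIfAbsent(partitionId, id -> new ByteArrayOutputStream())
            .write(bytes, 0, bytes.length);
    }

    // Remove and return the accumulated bytes for a partition.
    public byte[] drain(int partitionId) {
        ByteArrayOutputStream buf = buffers.remove(partitionId);
        byte[] data = buf == null ? new byte[0] : buf.toByteArray();
        // With a real client you would now push the whole buffer, e.g.:
        // shuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
        //     data, 0, data.length, numMappers, numPartitions);
        return data;
    }
}
```

Batching records per partition before pushing amortizes the per-call overhead of pushData; the offset and length arguments let you push a slice of a larger reusable buffer.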
After the map task finishes, ShuffleClient should call mapperEnd to tell LifecycleManager that the map task
finishes pushing its data:
public abstract void mapperEnd(
int shuffleId,
int mapId,
int attemptId,
int numMappers)
- shuffleId: shuffle id of the current task
- mapId: map id of the current task
- attemptId: attempt id of the current task
- numMappers: number of map ids in this shuffle
Step Five: Read Data
After all tasks have successfully called mapperEnd, you can start reading data from some partition id, using the
readPartition API, as in the following code:
InputStream inputStream = shuffleClient.readPartition(
shuffleId,
partitionId,
attemptId,
startMapIndex,
endMapIndex);
int b = inputStream.read();
For simplicity, to read all the data from a partition, you can pass 0 and Integer.MAX_VALUE as startMapIndex
and endMapIndex. This method creates an InputStream over the data and guarantees no data loss and no
duplicate reads; otherwise an exception is thrown.
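Reading one byte at a time is only for illustration; in practice a read task drains the stream into a buffer. A sketch, using a ByteArrayInputStream as a stand-in for the stream returned by readPartition:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadPartitionExample {
    // Drain an InputStream fully into a byte array, 8 KiB at a time.
    public static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for shuffleClient.readPartition(...).
        InputStream inputStream = new ByteArrayInputStream("partition data".getBytes());
        byte[] all = readAll(inputStream);
        System.out.println(new String(all)); // prints "partition data"
    }
}
```

The same loop works unchanged on the stream from readPartition, since it is an ordinary java.io.InputStream.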
Step Six: Clean Up
After the shuffle finishes, you can call LifecycleManager.unregisterShuffle to clean up resources related to the
shuffle:
lm.unregisterShuffle(0);
It's safe not to call unregisterShuffle, because the Celeborn Master detects application completion via heartbeat
timeout and will clean up the resources in the cluster by itself.