Configuration Guide
This documentation contains Celeborn configuration details and a tuning guide.
Important Configurations
Environment Variables
CELEBORN_WORKER_MEMORY=4g
CELEBORN_WORKER_OFFHEAP_MEMORY=24g
Celeborn workers tend to improve performance by using off-heap buffers. Off-heap memory requirement can be estimated as below:
numDirs = `celeborn.worker.storage.dirs` # the amount of directory will be used by Celeborn storage
bufferSize = `celeborn.worker.flusher.buffer.size` # the amount of memory will be used by a single flush buffer
off-heap-memory = bufferSize * estimatedTasks * 2 + network memory
For example, if an Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB. The necessary off-heap memory is 10 GiB.
Network memory will be consumed when netty reads from a TPC channel, there will need some extra
memory. Empirically, Celeborn worker off-heap memory should be set to (numDirs * bufferSize * 1.2)
.
All Configurations
Client
Key | Default | Description | Since |
---|---|---|---|
celeborn.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.2.0 |
celeborn.client.maxRetries | 15 | Max retry times for client to connect master endpoint | 0.2.0 |
celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.2.0 |
celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 |
celeborn.fetch.timeout | 120s | Timeout for a task to fetch chunk. | 0.2.0 |
celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]* , e.g. clb1:9097,clb2:9098,clb3:9099 . If the port is omitted, 9097 will be used. |
0.2.0 |
celeborn.push.buffer.initial.size | 8k | 0.2.0 | |
celeborn.push.buffer.max.size | 64k | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to 64KiB * 2000 = 125MiB heap memory. |
0.2.0 |
celeborn.push.data.timeout | 120s | Timeout for a task to push data rpc message. | 0.2.0 |
celeborn.push.limit.inFlight.sleepInterval | 50ms | Sleep interval when check netty in-flight requests to be done. | 0.2.0 |
celeborn.push.limit.inFlight.timeout | 240s | Timeout for netty in-flight requests to be done. | 0.2.0 |
celeborn.push.maxReqsInFlight | 32 | Amount of Netty in-flight requests. The maximum memory is celeborn.push.maxReqsInFlight * celeborn.push.buffer.max.size * compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib |
0.2.0 |
celeborn.push.queue.capacity | 512 | Push buffer queue size for a task. The maximum memory is celeborn.push.buffer.max.size * celeborn.push.queue.capacity , default: 64KiB * 512 = 32MiB |
0.2.0 |
celeborn.push.replicate.enabled | true | When true, Celeborn worker will replicate shuffle data to another Celeborn worker asynchronously to ensure the pushed shuffle data won't be lost after the node failure. | 0.2.0 |
celeborn.push.retry.threads | 8 | Thread number to process shuffle re-send push data requests. | 0.2.0 |
celeborn.push.sortMemory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. | 0.2.0 |
celeborn.push.splitPartition.threads | 8 | Thread number to process shuffle split request in shuffle client. | 0.2.0 |
celeborn.push.stageEnd.timeout | <undefined> | Timeout for waiting StageEnd. Default value should be celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1) . |
0.2.0 |
celeborn.rpc.cache.concurrencyLevel | 32 | The number of write locks to update rpc cache. | 0.2.0 |
celeborn.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.2.0 |
celeborn.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.2.0 |
celeborn.rpc.getReducerFileGroup.askTimeout | <undefined> | Timeout for ask operations during get reducer file group. Default value should be celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1 + 1) . |
0.2.0 |
celeborn.rpc.maxParallelism | 1024 | Max parallelism of client on sending RPC requests. | 0.2.0 |
celeborn.rpc.registerShuffle.askTimeout | <undefined> | Timeout for ask operations during register shuffle. Default value should be celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1 + 1) . |
0.2.0 |
celeborn.rpc.requestCommitFiles.maxRetries | 2 | Max retry times for requestCommitFiles RPC. | 1.0.0 |
celeborn.rpc.requestPartition.askTimeout | <undefined> | Timeout for ask operations during request change partition location, such as revive or split partition. Default value should be celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1) . |
0.2.0 |
celeborn.shuffle.batchHandleChangePartition.enabled | false | When true, LifecycleManager will handle change partition request in batch. Otherwise, LifecycleManager will process the requests one by one | 0.2.0 |
celeborn.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.2.0 |
celeborn.shuffle.batchHandleChangePartition.threads | 8 | Threads number for LifecycleManager to handle change partition request in batch. | 0.2.0 |
celeborn.shuffle.batchHandleCommitPartition.enabled | false | When true, LifecycleManager will handle commit partition request in batch. Otherwise, LifecycleManager won't commit partition before stage end | 0.2.0 |
celeborn.shuffle.batchHandleCommitPartition.interval | 5s | Interval for LifecycleManager to schedule handling commit partition requests in batch. | 0.2.0 |
celeborn.shuffle.batchHandleCommitPartition.threads | 8 | Threads number for LifecycleManager to handle commit partition request in batch. | 0.2.0 |
celeborn.shuffle.chuck.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
celeborn.shuffle.compression.codec | LZ4 | The codec used to compress shuffle data. By default, Celeborn provides two codecs: lz4 and zstd . |
0.2.0 |
celeborn.shuffle.compression.zstd.level | 1 | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | 0.2.0 |
celeborn.shuffle.expired.checkInterval | 60s | Interval for client to check expired shuffles. | 0.2.0 |
celeborn.shuffle.forceFallback.enabled | false | Whether force fallback shuffle to Spark's default. | 0.2.0 |
celeborn.shuffle.forceFallback.numPartitionsThreshold | 500000 | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.2.0 |
celeborn.shuffle.manager.port | 0 | Port used by the LifecycleManager on the Driver. | 0.2.0 |
celeborn.shuffle.partition.type | REDUCE | Type of shuffle's partition. | 0.2.0 |
celeborn.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold. | 0.2.0 |
celeborn.shuffle.partitionSplit.threshold | 1G | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.2.0 |
celeborn.shuffle.rangeReadFilter.enabled | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 |
celeborn.shuffle.register.maxRetries | 3 | Max retry times for client to register shuffle. | 0.2.0 |
celeborn.shuffle.register.retryWait | 3s | Wait time before next retry if register shuffle failed. | 0.2.0 |
celeborn.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.2.0 |
celeborn.slots.reserve.maxRetries | 3 | Max retry times for client to reserve slots. | 0.2.0 |
celeborn.slots.reserve.retryWait | 3s | Wait time before next retry if reserve slots failed. | 0.2.0 |
celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
celeborn.test.fetchFailure | false | Whether to test fetch chunk failure | 0.2.0 |
celeborn.test.retryCommitFiles | false | Fail commitFile request for test | 0.2.0 |
celeborn.worker.excluded.checkInterval | 30s | Interval for client to refresh excluded worker list. | 0.2.0 |
celeborn.worker.excluded.expireTimeout | 600s | Timeout time for LifecycleManager to clear reserved excluded worker. | 0.2.0 |
Columnar Shuffle
Key | Default | Description | Since |
---|---|---|---|
celeborn.columnar.offHeap.enabled | false | Whether to use off heap columnar vector. | 0.2.0 |
celeborn.columnar.shuffle.batch.size | 10000 | Vector batch size for columnar shuffle. | 0.2.0 |
celeborn.columnar.shuffle.codegen.enabled | false | Whether to use codegen for columnar-based shuffle. | 0.2.0 |
celeborn.columnar.shuffle.enabled | false | Whether to enable columnar-based shuffle. | 0.2.0 |
celeborn.columnar.shuffle.encoding.dictionary.enabled | false | Whether to use dictionary encoding for columnar-based shuffle data. | 0.2.0 |
celeborn.columnar.shuffle.encoding.dictionary.maxFactor | 0.3 | Max factor for dictionary size. The max dictionary size is min(32.0 KB, celeborn.columnar.shuffle.batch.size * celeborn.columnar.shuffle.encoding.dictionary.maxFactor) . |
0.2.0 |
Master
Key | Default | Description | Since |
---|---|---|---|
celeborn.application.heartbeat.timeout | 120s | Application heartbeat timeout. | 0.2.0 |
celeborn.ha.enabled | false | When true, master nodes run as Raft cluster mode. | 0.2.0 |
celeborn.ha.master.node.<id>.host | <required> | Host to bind of master node |
0.2.0 |
celeborn.ha.master.node.<id>.port | 9097 | Port to bind of master node |
0.2.0 |
celeborn.ha.master.node.<id>.ratis.port | 9872 | Ratis port to bind of master node |
0.2.0 |
celeborn.ha.master.ratis.raft.rpc.type | netty | RPC type for Ratis, available options: netty, grpc. | 0.2.0 |
celeborn.ha.master.ratis.raft.server.storage.dir | /tmp/ratis | 0.2.0 | |
celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 |
celeborn.master.metrics.prometheus.host | 0.0.0.0 | Master's Prometheus host. | 0.2.0 |
celeborn.master.metrics.prometheus.port | 9098 | Master's Prometheus port. | 0.2.0 |
celeborn.master.port | 9097 | Port for master to bind. | 0.2.0 |
celeborn.metrics.app.topDiskUsage.count | 50 | Size for top items about top disk usage applications list. | 0.2.0 |
celeborn.metrics.app.topDiskUsage.interval | 10min | Time length for a window about top disk usage application list. | 0.2.0 |
celeborn.metrics.app.topDiskUsage.windowSize | 24 | Window size about top disk usage application list. | 0.2.0 |
celeborn.metrics.capacity | 4096 | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 |
celeborn.metrics.collectPerfCritical.enabled | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 |
celeborn.metrics.enabled | true | When true, enable metrics system. | 0.2.0 |
celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size of timer metric. | 0.2.0 |
celeborn.shuffle.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.2.0 |
celeborn.shuffle.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.2.0 |
celeborn.shuffle.initialEstimatedPartitionSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.2.0 |
celeborn.slots.assign.extraSlots | 2 | Extra slots number when master assign slots. | 0.2.0 |
celeborn.slots.assign.loadAware.diskGroupGradient | 0.1 | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.2.0 |
celeborn.slots.assign.loadAware.fetchTimeWeight | 1.0 | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.2.1 |
celeborn.slots.assign.loadAware.flushTimeWeight | 0.0 | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.2.1 |
celeborn.slots.assign.loadAware.numDiskGroups | 5 | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.2.0 |
celeborn.slots.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. | 0.2.0 |
celeborn.worker.heartbeat.timeout | 120s | Worker heartbeat timeout. | 0.2.0 |
Worker
Key | Default | Description | Since |
---|---|---|---|
celeborn.client.maxRetries | 15 | Max retry times for client to connect master endpoint | 0.2.0 |
celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]* , e.g. clb1:9097,clb2:9098,clb3:9099 . If the port is omitted, 9097 will be used. |
0.2.0 |
celeborn.metrics.capacity | 4096 | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 |
celeborn.metrics.collectPerfCritical.enabled | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 |
celeborn.metrics.enabled | true | When true, enable metrics system. | 0.2.0 |
celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size of timer metric. | 0.2.0 |
celeborn.shuffle.chuck.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
celeborn.shuffle.minPartitionSizeToEstimate | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.2.0 |
celeborn.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.2.0 |
celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
celeborn.test.pushdataTimeout | false | Wheter to test pushdata timeout | 0.2.0 |
celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
celeborn.worker.commit.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. | 0.2.0 |
celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.1 | Max ratio of direct memory to store shuffle data | 0.2.0 |
celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct memory for read buffer | 0.2.0 |
celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 |
celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. | 0.2.0 |
celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is less than this limit, worker will resume. | 0.2.0 |
celeborn.worker.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.2.0 |
celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.2.0 |
celeborn.worker.diskTime.slidingWindow.size | 20 | The size of sliding windows used to calculate statistics about flushed time and count. | 0.2.1 |
celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to handle client fetch data. The default threads number is 16. | 0.2.0 |
celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 |
celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single flusher. | 0.2.0 |
celeborn.worker.flusher.hdd.threads | 1 | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 |
celeborn.worker.flusher.hdfs.threads | 4 | Flusher's thread count used for write data to HDFS. | 0.2.0 |
celeborn.worker.flusher.shutdownTimeout | 3s | Timeout for a flusher to shutdown. | 0.2.0 |
celeborn.worker.flusher.ssd.threads | 8 | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 |
celeborn.worker.flusher.threads | 2 | Flusher's thread count per disk for unkown-type disks. | 0.2.0 |
celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown | 0.2.0 |
celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 |
celeborn.worker.graceful.shutdown.enabled | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 |
celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s | The wait time of waiting for sorting partition files during worker graceful shutdown. | 0.2.0 |
celeborn.worker.graceful.shutdown.recoverPath | <tmp>/recover | The path to store levelDB. | 0.2.0 |
celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 |
celeborn.worker.heartbeat.timeout | 120s | Worker heartbeat timeout. | 0.2.0 |
celeborn.worker.memory.checkInterval | 10ms | Interval of worker direct memory checking. | 0.2.0 |
celeborn.worker.memory.reportInterval | 10s | Interval of worker direct memory tracker reporting to log. | 0.2.0 |
celeborn.worker.metrics.prometheus.host | 0.0.0.0 | Worker's Prometheus host. | 0.2.0 |
celeborn.worker.metrics.prometheus.port | 9096 | Worker's Prometheus port. | 0.2.0 |
celeborn.worker.monitor.disk.checkInterval | 60s | Intervals between device monitor to check disk. | 0.2.0 |
celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |
celeborn.worker.monitor.disk.enabled | true | When true, worker will monitor device and report to master. | 0.2.0 |
celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 |
celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir have not been operated during le duration time, will mark this application as expired. | 0.2.0 |
celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 |
celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 |
celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 |
celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 |
celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 |
celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 |
celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker to replicate shuffle data. The default threads number is 16. | 0.2.0 |
celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 |
celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 |
celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 |
celeborn.worker.shuffle.commit.timeout | <value of celeborn.rpc.askTimeout> | Timeout for a Celeborn worker to commit files of a shuffle. | 0.2.0 |
celeborn.worker.storage.baseDir.number | 16 | How many directories will be used if celeborn.worker.storage.dirs is not set. The directory name is a combination of celeborn.worker.storage.baseDir.prefix and from one(inclusive) to celeborn.worker.storage.baseDir.number (inclusive) step by one. |
0.2.0 |
celeborn.worker.storage.baseDir.prefix | /mnt/disk | Base directory for Celeborn worker to write if celeborn.worker.storage.dirs is not set. |
0.2.0 |
celeborn.worker.storage.dirs | <undefined> | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=] |
0.2.0 |
celeborn.worker.workingDir | hadoop/rss-worker/shuffle_data | Worker's working dir path name. | 0.2.0 |
celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 |
celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 |
Quota
Key | Default | Description | Since |
---|---|---|---|
celeborn.quota.configuration.path | <undefined> | Quota configuration file path. The file format should be yaml. Quota configuration file template can be found under conf directory. | 0.2.0 |
celeborn.quota.enabled | true | When true, before registering shuffle, LifecycleManager should check if current user have enough quota space, if cluster don't have enough quota space for current user, fallback to Spark's default shuffle | 0.2.0 |
celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider class name. Default class is org.apache.celeborn.common.identity.DefaultIdentityProvider , return org.apache.celeborn.common.identity.UserIdentifier with default tenant id and username from org.apache.hadoop.security.UserGroupInformation . |
0.2.0 |
celeborn.quota.manager | org.apache.celeborn.common.quota.DefaultQuotaManager | QuotaManger class name. Default class is org.apache.celeborn.common.quota.DefaultQuotaManager . |
0.2.0 |
Network
Key | Default | Description | Since |
---|---|---|---|
celeborn.<module>.decoder.mode | default | Netty TransportFrameDecoder implementation, available options: default, supplier. | |
celeborn.<module>.io.backLog | 0 | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. | |
celeborn.<module>.io.clientThreads | 0 | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. | |
celeborn.<module>.io.connectTimeout | <value of celeborn.network.connect.timeout> | Socket connect timeout. | |
celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | Connection active timeout. | |
celeborn.<module>.io.enableVerboseMetrics | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | |
celeborn.<module>.io.lazyFD | true | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. | |
celeborn.<module>.io.maxRetries | 3 | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. | |
celeborn.<module>.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. | |
celeborn.<module>.io.numConnectionsPerPeer | 2 | Number of concurrent connections between two nodes. | |
celeborn.<module>.io.preferDirectBufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. | |
celeborn.<module>.io.receiveBuffer | 0b | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. | 0.2.0 |
celeborn.<module>.io.retryWait | 5s | Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. | 0.2.0 |
celeborn.<module>.io.sendBuffer | 0b | Send buffer size (SO_SNDBUF). | 0.2.0 |
celeborn.<module>.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. | |
celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 |
celeborn.network.timeout | 240s | Default timeout for network operations. | 0.2.0 |
celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max retry times. | 0.2.0 |
celeborn.rpc.askTimeout | <value of celeborn.network.timeout> | Timeout for RPC ask operations. | 0.2.0 |
celeborn.rpc.connect.threads | 64 | 0.2.0 | |
celeborn.rpc.haClient.askTimeout | <value of celeborn.network.timeout> | Timeout for HA client RPC ask operations. | 0.2.0 |
celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 |
celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see celeborn.shuffle.io.maxRetries and celeborn.shuffle.io.retryWait ), if those limits are reached the task will fail with fetch failure. |
0.2.0 |
Metrics
Key | Default | Description | Since |
---|---|---|---|
celeborn.master.metrics.prometheus.host | 0.0.0.0 | Master's Prometheus host. | 0.2.0 |
celeborn.master.metrics.prometheus.port | 9098 | Master's Prometheus port. | 0.2.0 |
celeborn.metrics.capacity | 4096 | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 |
celeborn.metrics.collectPerfCritical.enabled | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 |
celeborn.metrics.enabled | true | When true, enable metrics system. | 0.2.0 |
celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size of timer metric. | 0.2.0 |
celeborn.worker.metrics.prometheus.host | 0.0.0.0 | Worker's Prometheus host. | 0.2.0 |
celeborn.worker.metrics.prometheus.port | 9096 | Worker's Prometheus port. | 0.2.0 |
metrics.properties
*.sink.csv.class=org.apache.celeborn.common.metrics.sink.CsvSink
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
Environment Variables
Recommend configuring in conf/celeborn-env.sh
.
Key | Default | Description |
---|---|---|
CELEBORN_HOME |
$(cd "`dirname "$0"`"/..; pwd) |
|
CELEBORN_CONF_DIR |
${CELEBORN_CONF_DIR:-"${CELEBORN_HOME}/conf"} |
|
CELEBORN_MASTER_MEMORY |
1 GB | |
CELEBORN_WORKER_MEMORY |
1 GB | |
CELEBORN_WORKER_OFFHEAP_MEMORY |
1 GB | |
CELEBORN_MASTER_JAVA_OPTS |
||
CELEBORN_WORKER_JAVA_OPTS |
||
CELEBORN_PID_DIR |
${CELEBORN_HOME}/pids |
|
CELEBORN_LOG_DIR |
${CELEBORN_HOME}/logs |
|
CELEBORN_SSH_OPTS |
-o StrictHostKeyChecking=no |
|
CELEBORN_SLEEP |
Tuning
Assume we have a cluster described as below: 5 Celeborn Workers with 20 GB off-heap memory and 10 disks. As we need to reserve 20% off-heap memory for netty, so we could assume 16 GB off-heap memory can be used for flush buffers.
If spark.celeborn.push.buffer.size
is 64 KB, we can have in-flight requests up to 1310720.
If you have 8192 mapper tasks, you could set spark.celeborn.push.maxReqsInFlight=160
to gain performance improvements.
If celeborn.worker.flush.buffer.size
is 256 KB, we can have total slots up to 327680 slots.
Worker Recover Status After Restart
ShuffleClient
records the shuffle partition location's host, service port, and filename,
to support workers recovering reading existing shuffle data after worker restart,
during worker shutdown, workers should store the meta about reading shuffle partition files in LevelDB,
and restore the meta after restarting workers, also workers should keep a stable service port to support
ShuffleClient
retry reading data. Users should set celeborn.worker.graceful.shutdown.enabled
to true
and
set below service port with stable port to support worker recover status.
celeborn.worker.rpc.port
celeborn.worker.fetch.port
celeborn.worker.push.port
celeborn.worker.replicate.port