Configuration Guide

This documentation contains Celeborn configuration details and a tuning guide.

Important Configurations

Environment Variables

  • CELEBORN_WORKER_MEMORY=4g
  • CELEBORN_WORKER_OFFHEAP_MEMORY=24g

Celeborn workers improve performance by using off-heap buffers. The off-heap memory requirement can be estimated as follows:

numDirs = `celeborn.worker.storage.dirs`             # the number of directories used by Celeborn storage
bufferSize = `celeborn.worker.flusher.buffer.size`   # the memory used by a single flush buffer
off-heap-memory = bufferSize * estimatedTasks * 2 + network memory

For example, if a Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB, the necessary off-heap memory is 10 GiB.

Network memory is consumed when Netty reads from a TCP channel, so some extra memory is needed on top of the flush buffers. Empirically, Celeborn worker off-heap memory should be set to (numDirs * bufferSize * 1.2).
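
As a rough illustration of the settings above (the paths, directory count, and sizes are placeholders, not recommendations), a worker with 10 storage directories and the default 256 KiB flush buffer might be configured along these lines, with environment variables in conf/celeborn-env.sh and properties in conf/celeborn-defaults.conf:

# conf/celeborn-env.sh
CELEBORN_WORKER_MEMORY=4g
CELEBORN_WORKER_OFFHEAP_MEMORY=24g   # sized according to the estimate above

# conf/celeborn-defaults.conf
celeborn.worker.flusher.buffer.size 256k
# one directory per disk, continued for all 10 disks
celeborn.worker.storage.dirs /mnt/disk1/celeborn,/mnt/disk2/celeborn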

All Configurations

Master

Key Default isDynamic Description Since Deprecated
celeborn.cluster.name default false Celeborn cluster name. 0.5.0
celeborn.dynamicConfig.refresh.interval 120s false Interval for refreshing the corresponding dynamic config periodically. 0.4.0
celeborn.dynamicConfig.store.backend <undefined> false Store backend for dynamic config service. Available options: FS, DB. If not provided, it means that dynamic configuration is disabled. 0.4.0
celeborn.dynamicConfig.store.db.fetch.pageSize 1000 false The page size for db store to query configurations. 0.5.0
celeborn.dynamicConfig.store.db.hikari.connectionTimeout 30s false The connection timeout that a client will wait for a connection from the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.driverClassName false The jdbc driver class name of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.idleTimeout 600s false The idle timeout that a connection is allowed to sit idle in the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.jdbcUrl false The jdbc url of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.maxLifetime 1800s false The maximum lifetime of a connection in the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize 2 false The maximum pool size of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.password false The password of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.username false The username of db store backend. 0.5.0
celeborn.dynamicConfig.store.fs.path <undefined> false The path of dynamic config file for fs store backend. The file format should be yaml. The default path is ${CELEBORN_CONF_DIR}/dynamicConfig.yaml. 0.5.0
celeborn.internal.port.enabled false false Whether to create an internal port on Masters/Workers for inter-Master/Worker communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. 0.5.0
celeborn.logConf.enabled false false When true, log the CelebornConf for debugging purposes. 0.5.0
celeborn.master.estimatedPartitionSize.initialSize 64mb false Initial partition size for estimation, it will change according to runtime stats. 0.3.0 celeborn.shuffle.initialEstimatedPartitionSize
celeborn.master.estimatedPartitionSize.maxSize <undefined> false Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. 0.4.1
celeborn.master.estimatedPartitionSize.minSize 8mb false Partition sizes smaller than this value are ignored when estimating partition size. 0.3.0 celeborn.shuffle.minPartitionSizeToEstimate
celeborn.master.estimatedPartitionSize.update.initialDelay 5min false Initial delay time before start updating partition size for estimation. 0.3.0 celeborn.shuffle.estimatedPartitionSize.update.initialDelay
celeborn.master.estimatedPartitionSize.update.interval 10min false Interval of updating partition size for estimation. 0.3.0 celeborn.shuffle.estimatedPartitionSize.update.interval
celeborn.master.hdfs.expireDirs.timeout 1h false The timeout for expired dirs to be deleted on HDFS. 0.3.0
celeborn.master.heartbeat.application.timeout 300s false Application heartbeat timeout. 0.3.0 celeborn.application.heartbeat.timeout
celeborn.master.heartbeat.worker.timeout 120s false Worker heartbeat timeout. 0.3.0 celeborn.worker.heartbeat.timeout
celeborn.master.host <localhost> false Hostname for master to bind. 0.2.0
celeborn.master.http.host <localhost> false Master's http host. 0.4.0 celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host
celeborn.master.http.idleTimeout 30s false Master http server idle timeout. 0.5.0
celeborn.master.http.maxWorkerThreads 200 false Maximum number of threads in the master http worker thread pool. 0.5.0
celeborn.master.http.port 9098 false Master's http port. 0.4.0 celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port
celeborn.master.http.stopTimeout 5s false Master http server stop timeout. 0.5.0
celeborn.master.internal.port 8097 false Internal port on the master where both workers and other master nodes connect. 0.5.0
celeborn.master.port 9097 false Port for master to bind. 0.2.0
celeborn.master.rackResolver.refresh.interval 30s false Interval for refreshing the node rack information periodically. 0.5.0
celeborn.master.send.applicationMeta.threads 8 false Number of threads used by the Master to send ApplicationMeta to Workers. 0.5.0
celeborn.master.slot.assign.extraSlots 2 false Number of extra slots when the master assigns slots. 0.3.0 celeborn.slots.assign.extraSlots
celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1 false This value controls how much more workload will be placed on a faster disk group than on a slower group. 0.3.0 celeborn.slots.assign.loadAware.diskGroupGradient
celeborn.master.slot.assign.loadAware.fetchTimeWeight 1.0 false Weight of average fetch time when calculating ordering in the load-aware assignment strategy. 0.3.0 celeborn.slots.assign.loadAware.fetchTimeWeight
celeborn.master.slot.assign.loadAware.flushTimeWeight 0.0 false Weight of average flush time when calculating ordering in the load-aware assignment strategy. 0.3.0 celeborn.slots.assign.loadAware.flushTimeWeight
celeborn.master.slot.assign.loadAware.numDiskGroups 5 false This configuration is a guidance for the load-aware slot allocation algorithm; it controls how many disk groups will be created. 0.3.0 celeborn.slots.assign.loadAware.numDiskGroups
celeborn.master.slot.assign.maxWorkers 10000 false Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see celeborn.client.slot.assign.maxWorkers. 0.3.1
celeborn.master.slot.assign.policy ROUNDROBIN false Policy for the master to assign slots. Celeborn supports two policies: roundrobin and loadaware. The loadaware policy will be ignored when HDFS is enabled in celeborn.storage.activeTypes. 0.3.0 celeborn.slots.assign.policy
celeborn.master.userResourceConsumption.update.interval 30s false Length of the time window used to compute user resource consumption. 0.3.0
celeborn.master.workerUnavailableInfo.expireTimeout 1800s false Worker unavailable info will be cleared when the retention period expires. 0.3.1
celeborn.quota.enabled true false When set to true on the Master side, the master checks quota via the QuotaManager. When set to true on the Client side, the LifecycleManager asks the Master to check whether the current user has enough quota before registering a shuffle, and falls back to the default shuffle service of Spark when the Master finds there is not enough quota for the current user. 0.2.0
celeborn.redaction.regex (?i)secret|password|token|access[.]key false
celeborn.storage.availableTypes HDD false Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. 0.3.0 celeborn.storage.activeTypes
celeborn.storage.hdfs.dir <undefined> false HDFS base directory for Celeborn to store shuffle data. 0.2.0
celeborn.storage.hdfs.kerberos.keytab <undefined> false Kerberos keytab file path for HDFS storage connection. 0.3.2
celeborn.storage.hdfs.kerberos.principal <undefined> false Kerberos principal for HDFS storage connection. 0.3.2
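
For orientation, a minimal single-master setup using a few of the keys above might look like this in celeborn-defaults.conf (the hostname is a placeholder):

celeborn.master.host clb-master
celeborn.master.port 9097
celeborn.master.http.port 9098
celeborn.storage.availableTypes HDD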

Apart from the Master configurations above, the following properties are also available for enabling master HA:

Master HA

Key Default isDynamic Description Since Deprecated
celeborn.master.ha.enabled false false When true, master nodes run as Raft cluster mode. 0.3.0 celeborn.ha.enabled
celeborn.master.ha.node.<id>.host <required> false Host to bind of master node in HA mode. 0.3.0 celeborn.ha.master.node.<id>.host
celeborn.master.ha.node.<id>.internal.port 8097 false Internal port for the workers and other masters to bind to a master node in HA mode. 0.5.0
celeborn.master.ha.node.<id>.port 9097 false Port to bind of master node in HA mode. 0.3.0 celeborn.ha.master.node.<id>.port
celeborn.master.ha.node.<id>.ratis.port 9872 false Ratis port to bind of master node in HA mode. 0.3.0 celeborn.ha.master.node.<id>.ratis.port
celeborn.master.ha.ratis.raft.rpc.type netty false RPC type for Ratis, available options: netty, grpc. 0.3.0 celeborn.ha.master.ratis.raft.rpc.type
celeborn.master.ha.ratis.raft.server.storage.dir /tmp/ratis false 0.3.0 celeborn.ha.master.ratis.raft.server.storage.dir
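
As a sketch (hostnames, node ids, and the Ratis storage directory are placeholders), a three-node HA master setup built from the keys above could look like this in celeborn-defaults.conf:

celeborn.master.ha.enabled true
celeborn.master.ha.node.1.host clb-master-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-master-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-master-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/celeborn_ratis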

Worker

Key Default isDynamic Description Since Deprecated
celeborn.cluster.name default false Celeborn cluster name. 0.5.0
celeborn.dynamicConfig.refresh.interval 120s false Interval for refreshing the corresponding dynamic config periodically. 0.4.0
celeborn.dynamicConfig.store.backend <undefined> false Store backend for dynamic config service. Available options: FS, DB. If not provided, it means that dynamic configuration is disabled. 0.4.0
celeborn.dynamicConfig.store.db.fetch.pageSize 1000 false The page size for db store to query configurations. 0.5.0
celeborn.dynamicConfig.store.db.hikari.connectionTimeout 30s false The connection timeout that a client will wait for a connection from the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.driverClassName false The jdbc driver class name of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.idleTimeout 600s false The idle timeout that a connection is allowed to sit idle in the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.jdbcUrl false The jdbc url of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.maxLifetime 1800s false The maximum lifetime of a connection in the pool for db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize 2 false The maximum pool size of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.password false The password of db store backend. 0.5.0
celeborn.dynamicConfig.store.db.hikari.username false The username of db store backend. 0.5.0
celeborn.dynamicConfig.store.fs.path <undefined> false The path of dynamic config file for fs store backend. The file format should be yaml. The default path is ${CELEBORN_CONF_DIR}/dynamicConfig.yaml. 0.5.0
celeborn.internal.port.enabled false false Whether to create an internal port on Masters/Workers for inter-Master/Worker communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. 0.5.0
celeborn.logConf.enabled false false When true, log the CelebornConf for debugging purposes. 0.5.0
celeborn.master.endpoints <localhost>:9097 false Endpoints of master nodes for celeborn client to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]*, e.g. clb1:9097,clb2:9098,clb3:9099. If the port is omitted, 9097 will be used. 0.2.0
celeborn.master.estimatedPartitionSize.minSize 8mb false Partition sizes smaller than this value are ignored when estimating partition size. 0.3.0 celeborn.shuffle.minPartitionSizeToEstimate
celeborn.master.internal.endpoints <localhost>:8097 false Endpoints of master nodes just for celeborn workers to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]*, e.g. clb1:8097,clb2:8097,clb3:8097. If the port is omitted, 8097 will be used. 0.5.0
celeborn.redaction.regex (?i)secret|password|token|access[.]key false
celeborn.shuffle.chunk.size 8m false Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M, the data will need 16 chunk fetch requests to fetch. 0.2.0
celeborn.storage.availableTypes HDD false Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. 0.3.0 celeborn.storage.activeTypes
celeborn.storage.hdfs.dir <undefined> false HDFS base directory for Celeborn to store shuffle data. 0.2.0
celeborn.storage.hdfs.kerberos.keytab <undefined> false Kerberos keytab file path for HDFS storage connection. 0.3.2
celeborn.storage.hdfs.kerberos.principal <undefined> false Kerberos principal for HDFS storage connection. 0.3.2
celeborn.worker.activeConnection.max <undefined> false If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. 0.3.1
celeborn.worker.applicationRegistry.cache.size 10000 false Cache size of the application registry on Workers. 0.5.0
celeborn.worker.bufferStream.threadsPerMountpoint 8 false Threads count for read buffer per mount point. 0.3.0
celeborn.worker.clean.threads 64 false Thread number of worker to clean up expired shuffle keys. 0.3.2
celeborn.worker.closeIdleConnections false false Whether worker will close idle connections. 0.2.0
celeborn.worker.commitFiles.threads 32 false Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least 128 when HDFS is enabled in celeborn.storage.activeTypes. 0.3.0 celeborn.worker.commit.threads
celeborn.worker.commitFiles.timeout 120s false Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.activeTypes. 0.3.0 celeborn.worker.shuffle.commit.timeout
celeborn.worker.congestionControl.check.interval 10ms false Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. 0.3.2
celeborn.worker.congestionControl.enabled false false Whether to enable congestion control or not. 0.3.0
celeborn.worker.congestionControl.high.watermark <undefined> false If the total bytes in the disk buffer exceed this configuration, the worker will start to congest users whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower than or equal to the average consume rate, or the total pending bytes fall below celeborn.worker.congestionControl.low.watermark. 0.3.0
celeborn.worker.congestionControl.low.watermark <undefined> false Congestion of users will stop once the total pending bytes of the disk buffer fall below this configuration. 0.3.0
celeborn.worker.congestionControl.sample.time.window 10s false The worker holds a sliding time window to calculate users' produce/consume rates. 0.3.0
celeborn.worker.congestionControl.user.inactive.interval 10min false How long before a user is considered inactive if it doesn't send data. 0.3.0
celeborn.worker.decommission.checkInterval 30s false The interval for checking whether all shuffles have expired during worker decommission. 0.4.0
celeborn.worker.decommission.forceExitTimeout 6h false The maximum time to wait for all shuffles to expire during worker decommission. 0.4.0
celeborn.worker.directMemoryRatioForMemoryShuffleStorage 0.0 false Max ratio of direct memory to store shuffle data 0.2.0
celeborn.worker.directMemoryRatioForReadBuffer 0.1 false Max ratio of direct memory for read buffer 0.2.0
celeborn.worker.directMemoryRatioToPauseReceive 0.85 false If direct memory usage reaches this limit, the worker will stop receiving data from Celeborn shuffle clients. 0.2.0
celeborn.worker.directMemoryRatioToPauseReplicate 0.95 false If direct memory usage reaches this limit, the worker will stop receiving replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. 0.2.0
celeborn.worker.directMemoryRatioToResume 0.7 false If direct memory usage is less than this limit, worker will resume. 0.2.0
celeborn.worker.disk.clean.threads 4 false Thread number of worker to clean up directories of expired shuffle keys on disk. 0.3.2
celeborn.worker.fetch.heartbeat.enabled false false Enable the heartbeat from worker to client when fetching data. 0.3.0
celeborn.worker.fetch.io.threads <undefined> false Netty IO thread number of worker to handle client fetch data. The default thread number is the number of flush threads. 0.2.0
celeborn.worker.fetch.port 0 false Server port for Worker to receive fetch data request from ShuffleClient. 0.2.0
celeborn.worker.flusher.buffer.size 256k false Size of buffer used by a single flusher. 0.2.0
celeborn.worker.flusher.diskTime.slidingWindow.size 20 false The size of sliding windows used to calculate statistics about flushed time and count. 0.3.0 celeborn.worker.flusher.avgFlushTime.slidingWindow.size
celeborn.worker.flusher.hdd.threads 1 false Flusher's thread count per disk used for writing data to HDD disks. 0.2.0
celeborn.worker.flusher.hdfs.buffer.size 4m false Size of buffer used by an HDFS flusher. 0.3.0
celeborn.worker.flusher.hdfs.threads 8 false Flusher's thread count used for writing data to HDFS. 0.2.0
celeborn.worker.flusher.shutdownTimeout 3s false Timeout for a flusher to shutdown. 0.2.0
celeborn.worker.flusher.ssd.threads 16 false Flusher's thread count per disk used for writing data to SSD disks. 0.2.0
celeborn.worker.flusher.threads 16 false Flusher's thread count per disk for unknown-type disks. 0.2.0
celeborn.worker.graceful.shutdown.checkSlotsFinished.interval 1s false The interval for checking whether all released slots have been committed or destroyed during worker graceful shutdown. 0.2.0
celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout 480s false The maximum time to wait for released slots to be committed or destroyed during worker graceful shutdown. 0.2.0
celeborn.worker.graceful.shutdown.enabled false false When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. 0.2.0
celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout 120s false The maximum time to wait for partition files to be sorted during worker graceful shutdown. 0.2.0
celeborn.worker.graceful.shutdown.recoverDbBackend ROCKSDB false Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated). 0.4.0
celeborn.worker.graceful.shutdown.recoverPath <tmp>/recover false The path to store DB. 0.2.0
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval 5s false Interval for a Celeborn worker to flush committed file infos into LevelDB. 0.3.1
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync false false Whether to call the sync method to save committed file infos into LevelDB to handle OS crashes. 0.3.1
celeborn.worker.graceful.shutdown.timeout 600s false The worker's graceful shutdown timeout. 0.2.0
celeborn.worker.http.host <localhost> false Worker's http host. 0.4.0 celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
celeborn.worker.http.idleTimeout 30s false Worker http server idle timeout. 0.5.0
celeborn.worker.http.maxWorkerThreads 200 false Maximum number of threads in the worker http worker thread pool. 0.5.0
celeborn.worker.http.port 9096 false Worker's http port. 0.4.0 celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
celeborn.worker.http.stopTimeout 5s false Worker http server stop timeout. 0.5.0
celeborn.worker.internal.port 0 false Internal server port on the Worker where the master nodes connect. 0.5.0
celeborn.worker.jvmProfiler.enabled false false Turn on code profiling via async_profiler in workers. 0.5.0
celeborn.worker.jvmProfiler.localDir . false Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process. 0.5.0
celeborn.worker.jvmProfiler.options event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s false Options to pass on to the async profiler. 0.5.0
celeborn.worker.jvmQuake.check.interval 1s false Interval of gc behavior checking for worker jvm quake. 0.4.0
celeborn.worker.jvmQuake.dump.enabled true false Whether to heap dump for the maximum GC 'deficit' during worker jvm quake. 0.4.0
celeborn.worker.jvmQuake.dump.path <tmp>/jvm-quake/dump/<pid> false The path of heap dump for the maximum GC 'deficit' during worker jvm quake. 0.4.0
celeborn.worker.jvmQuake.dump.threshold 30s false The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold. 0.4.0
celeborn.worker.jvmQuake.enabled false false When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. 0.4.0
celeborn.worker.jvmQuake.exitCode 502 false The exit code of system kill for the maximum GC 'deficit' during worker jvm quake. 0.4.0
celeborn.worker.jvmQuake.kill.threshold 60s false The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. 0.4.0
celeborn.worker.jvmQuake.runtimeWeight 5.0 false The factor by which to multiply running JVM time, when weighing it against GCing time. 'Deficit' is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. 0.4.0
celeborn.worker.monitor.disk.check.interval 30s false Interval at which the device monitor checks disks. 0.3.0 celeborn.worker.monitor.disk.checkInterval
celeborn.worker.monitor.disk.check.timeout 30s false Timeout for the worker to check device status. 0.3.0 celeborn.worker.disk.check.timeout
celeborn.worker.monitor.disk.checklist readwrite,diskusage false Monitor type for disk, available items are: iohang, readwrite and diskusage. 0.2.0
celeborn.worker.monitor.disk.enabled true false When true, worker will monitor device and report to master. 0.3.0
celeborn.worker.monitor.disk.notifyError.expireTimeout 10m false The expire timeout of non-critical device error. Only notify critical error when the number of non-critical errors for a period of time exceeds threshold. 0.3.0
celeborn.worker.monitor.disk.notifyError.threshold 64 false Device monitor will only notify a critical error once the accumulated valid non-critical error count exceeds this threshold. 0.3.0
celeborn.worker.monitor.disk.sys.block.dir /sys/block false The directory where Linux file block information is stored. 0.2.0
celeborn.worker.monitor.memory.check.interval 10ms false Interval of worker direct memory checking. 0.3.0 celeborn.worker.memory.checkInterval
celeborn.worker.monitor.memory.report.interval 10s false Interval of worker direct memory tracker reporting to log. 0.3.0 celeborn.worker.memory.reportInterval
celeborn.worker.monitor.memory.trimChannelWaitInterval 1s false Wait time after the worker triggers channels to trim cache. 0.3.0
celeborn.worker.monitor.memory.trimFlushWaitInterval 1s false Wait time after the worker triggers StorageManager to flush data. 0.3.0
celeborn.worker.partition.initial.readBuffersMax 1024 false Max number of initial read buffers 0.3.0
celeborn.worker.partition.initial.readBuffersMin 1 false Min number of initial read buffers 0.3.0
celeborn.worker.partitionSorter.directMemoryRatioThreshold 0.1 false Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. 0.2.0
celeborn.worker.push.heartbeat.enabled false false Enable the heartbeat from worker to client when pushing data. 0.3.0
celeborn.worker.push.io.threads <undefined> false Netty IO thread number of worker to handle client push data. The default thread number is the number of flush threads. 0.2.0
celeborn.worker.push.port 0 false Server port for Worker to receive push data request from ShuffleClient. 0.2.0
celeborn.worker.readBuffer.allocationWait 50ms false The time to wait when buffer dispatcher can not allocate a buffer. 0.3.0
celeborn.worker.readBuffer.target.changeThreshold 1mb false The target ratio for pre read memory usage. 0.3.0
celeborn.worker.readBuffer.target.ratio 0.9 false The target ratio for read ahead buffer's memory usage. 0.3.0
celeborn.worker.readBuffer.target.updateInterval 100ms false The interval for memory manager to calculate new read buffer's target memory. 0.3.0
celeborn.worker.readBuffer.toTriggerReadMin 32 false Min buffers count for map data partition to trigger read. 0.3.0
celeborn.worker.register.timeout 180s false Worker register timeout. 0.2.0
celeborn.worker.replicate.fastFail.duration 60s false If a replicate request is not replied to within this duration, the worker will mark the replicate data request as failed. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.activeTypes. 0.2.0
celeborn.worker.replicate.io.threads <undefined> false Netty IO thread number of worker to replicate shuffle data. The default thread number is the number of flush threads. 0.2.0
celeborn.worker.replicate.port 0 false Server port for Worker to receive replicate data request from other Workers. 0.2.0
celeborn.worker.replicate.randomConnection.enabled true false Whether the worker will create a random connection to its peer when replicating data. When false, the worker tends to reuse the same cached TransportClient for a given replicate worker; when true, the worker tends to use different cached TransportClients. Netty uses the same thread to serve the same connection, so with more connections the replicate server can leverage more Netty threads. 0.2.1
celeborn.worker.replicate.threads 64 false Thread number of worker to replicate shuffle data. 0.2.0
celeborn.worker.rpc.port 0 false Server port for Worker to receive RPC request. 0.2.0
celeborn.worker.shuffle.partitionSplit.enabled true false Enable partition split on the worker side. 0.3.0 celeborn.worker.partition.split.enabled
celeborn.worker.shuffle.partitionSplit.max 2g false Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. 0.3.0
celeborn.worker.shuffle.partitionSplit.min 1m false Min size for a partition to split 0.3.0 celeborn.shuffle.partitionSplit.min
celeborn.worker.sortPartition.indexCache.expire 180s false PartitionSorter's cache item expire time. 0.4.0
celeborn.worker.sortPartition.indexCache.maxWeight 100000 false PartitionSorter's cache max weight for index buffer. 0.4.0
celeborn.worker.sortPartition.prefetch.enabled true false When true, partition sorter will prefetch the original partition files to page cache and reserve memory configured by celeborn.worker.sortPartition.reservedMemoryPerPartition to allocate a block of memory for prefetching while sorting a shuffle file off-heap with page cache for non-hdfs files. Otherwise, partition sorter seeks to position of each block and does not prefetch for non-hdfs files. 0.5.0
celeborn.worker.sortPartition.reservedMemoryPerPartition 1mb false Reserved memory when sorting a shuffle file off-heap. 0.3.0 celeborn.worker.partitionSorter.reservedMemoryPerPartition
celeborn.worker.sortPartition.threads <undefined> false PartitionSorter's thread counts. It's recommended to set at least 64 when HDFS is enabled in celeborn.storage.activeTypes. 0.3.0 celeborn.worker.partitionSorter.threads
celeborn.worker.sortPartition.timeout 220s false Timeout for a shuffle file to sort. 0.3.0 celeborn.worker.partitionSorter.sort.timeout
celeborn.worker.storage.checkDirsEmpty.maxRetries 3 false The number of retries for a worker to check if the working directory is cleaned up before registering with the master. 0.3.0 celeborn.worker.disk.checkFileClean.maxRetries
celeborn.worker.storage.checkDirsEmpty.timeout 1000ms false The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. 0.3.0 celeborn.worker.disk.checkFileClean.timeout
celeborn.worker.storage.dirs <undefined> false Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=] 0.2.0
celeborn.worker.storage.disk.reserve.ratio <undefined> false Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the larger of the reserved space and the space calculated via the reserved ratio. 0.3.2
celeborn.worker.storage.disk.reserve.size 5G false Celeborn worker reserved space for each disk. 0.3.0 celeborn.worker.disk.reserve.size
celeborn.worker.storage.expireDirs.timeout 1h false The timeout for expired dirs to be deleted on disk. 0.3.2
celeborn.worker.storage.workingDir celeborn-worker/shuffle_data false Worker's working dir path name. 0.3.0 celeborn.worker.workingDir
celeborn.worker.writer.close.timeout 120s false Timeout for a file writer to close. 0.2.0
celeborn.worker.writer.create.maxAttempts 3 false Retry count for creating a file writer if its creation failed. 0.2.0
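
To illustrate the celeborn.worker.storage.dirs syntax described above (the paths, capacities, and flush thread counts below are purely illustrative), a worker with one SSD and one HDD could be configured as:

celeborn.worker.storage.dirs /mnt/ssd1/celeborn:capacity=1t:disktype=SSD:flushthread=8,/mnt/hdd1/celeborn:capacity=4t:disktype=HDD:flushthread=2
celeborn.worker.storage.disk.reserve.size 5G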

Client

Key Default isDynamic Description Since Deprecated
celeborn.client.application.heartbeatInterval 10s false Interval for client to send heartbeat message to master. 0.3.0 celeborn.application.heartbeatInterval
celeborn.client.application.unregister.enabled true false When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. 0.3.2
celeborn.client.closeIdleConnections true false Whether client will close idle connections. 0.3.0
celeborn.client.commitFiles.ignoreExcludedWorker false false When true, LifecycleManager will skip workers which are in the excluded list. 0.3.0
celeborn.client.eagerlyCreateInputStream.threads 32 false Threads count for streamCreatorPool in CelebornShuffleReader. 0.3.1
celeborn.client.excludePeerWorkerOnFailure.enabled true false When true, Celeborn will exclude partition's peer worker on failure when push data to replica failed. 0.3.0
celeborn.client.excludedWorker.expireTimeout 180s false Timeout for LifecycleManager to clear reserved excluded workers. Defaults to 1.5 * celeborn.master.heartbeat.worker.timeout to cover the worker heartbeat timeout check period. 0.3.0 celeborn.worker.excluded.expireTimeout
celeborn.client.fetch.buffer.size 64k false Size of reducer partition buffer memory for shuffle reader. The fetched data will be buffered in memory before being consumed. For performance considerations keep this buffer size not less than celeborn.client.push.buffer.max.size. 0.4.0
celeborn.client.fetch.dfsReadChunkSize 8m false Max chunk size for DfsPartitionReader. 0.3.1
celeborn.client.fetch.excludeWorkerOnFailure.enabled false false Whether to enable shuffle client-side fetch exclude workers on failure. 0.3.0
celeborn.client.fetch.excludedWorker.expireTimeout <value of celeborn.client.excludedWorker.expireTimeout> false ShuffleClient is a static object that is used for the whole lifecycle of an Executor. We give an expire time to excluded workers to avoid transient worker issues. 0.3.0
celeborn.client.fetch.maxReqsInFlight 3 false Amount of in-flight chunk fetch requests. 0.3.0 celeborn.fetch.maxReqsInFlight
celeborn.client.fetch.maxRetriesForEachReplica 3 false Max retry times of fetch chunk on each replica 0.3.0 celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries
celeborn.client.fetch.timeout 600s false Timeout for a task to open stream and fetch chunk. 0.3.0 celeborn.fetch.timeout
celeborn.client.flink.compression.enabled true false Whether to compress data in Flink plugin. 0.3.0 remote-shuffle.job.enable-data-compression
celeborn.client.flink.inputGate.concurrentReadings 2147483647 false Max concurrent reading channels for an input gate. 0.3.0 remote-shuffle.job.concurrent-readings-per-gate
celeborn.client.flink.inputGate.memory 32m false Memory reserved for an input gate. 0.3.0 remote-shuffle.job.memory-per-gate
celeborn.client.flink.inputGate.supportFloatingBuffer true false Whether to support floating buffer in Flink input gates. 0.3.0 remote-shuffle.job.support-floating-buffer-per-input-gate
celeborn.client.flink.resultPartition.memory 64m false Memory reserved for a result partition. 0.3.0 remote-shuffle.job.memory-per-partition
celeborn.client.flink.resultPartition.supportFloatingBuffer true false Whether to support floating buffer for result partitions. 0.3.0 remote-shuffle.job.support-floating-buffer-per-output-gate
celeborn.client.mr.pushData.max 32m false Max size for a push data sent from mr client. 0.4.0
celeborn.client.push.buffer.initial.size 8k false 0.3.0 celeborn.push.buffer.initial.size
celeborn.client.push.buffer.max.size 64k false Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to 64KiB * 2000 = 125MiB heap memory. 0.3.0 celeborn.push.buffer.max.size
celeborn.client.push.excludeWorkerOnFailure.enabled false false Whether to enable shuffle client-side push exclude workers on failures. 0.3.0
celeborn.client.push.limit.inFlight.sleepInterval 50ms false Sleep interval when checking whether netty in-flight requests are done. 0.3.0 celeborn.push.limit.inFlight.sleepInterval
celeborn.client.push.limit.inFlight.timeout <undefined> false Timeout for netty in-flight requests to be done. Default value should be celeborn.client.push.timeout * 2. 0.3.0 celeborn.push.limit.inFlight.timeout
celeborn.client.push.limit.strategy SIMPLE false The strategy used to control the push speed. Valid strategies are SIMPLE and SLOWSTART. The SLOWSTART strategy usually works with congestion control mechanism on the worker side. 0.3.0
celeborn.client.push.maxReqsInFlight.perWorker 32 false Amount of Netty in-flight requests per worker. Default max memory of in flight requests per worker is celeborn.client.push.maxReqsInFlight.perWorker * celeborn.client.push.buffer.max.size * compression ratio(1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will not exceed celeborn.client.push.maxReqsInFlight.total. 0.3.0
celeborn.client.push.maxReqsInFlight.total 256 false Amount of total Netty in-flight requests. The maximum memory is celeborn.client.push.maxReqsInFlight.total * celeborn.client.push.buffer.max.size * compression ratio(1 in worst case): 64KiB * 256 = 16MiB 0.3.0 celeborn.push.maxReqsInFlight
celeborn.client.push.queue.capacity 512 false Push buffer queue size for a task. The maximum memory is celeborn.client.push.buffer.max.size * celeborn.client.push.queue.capacity, default: 64KiB * 512 = 32MiB 0.3.0 celeborn.push.queue.capacity
celeborn.client.push.replicate.enabled false false When true, Celeborn worker will replicate shuffle data to another Celeborn worker asynchronously to ensure the pushed shuffle data won't be lost after the node failure. It's recommended to set false when HDFS is enabled in celeborn.storage.activeTypes. 0.3.0 celeborn.push.replicate.enabled
celeborn.client.push.retry.threads 8 false Thread number to process shuffle re-send push data requests. 0.3.0 celeborn.push.retry.threads
celeborn.client.push.revive.batchSize 2048 false Max number of partitions in one Revive request. 0.3.0
celeborn.client.push.revive.interval 100ms false Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive request is celeborn.client.push.revive.batchSize. 0.3.0
celeborn.client.push.revive.maxRetries 5 false Max retry times for reviving when Celeborn push data fails. 0.3.0
celeborn.client.push.sendBufferPool.checkExpireInterval 30s false Interval to check expire for send buffer pool. If the pool has been idle for more than celeborn.client.push.sendBufferPool.expireTimeout, the pooled send buffers and push tasks will be cleaned up. 0.3.1
celeborn.client.push.sendBufferPool.expireTimeout 60s false Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, the send buffers and push tasks will be cleaned up. 0.3.1
celeborn.client.push.slowStart.initialSleepTime 500ms false The initial sleep time if the current max in flight requests is 0 0.3.0
celeborn.client.push.slowStart.maxSleepTime 2s false If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time 0.3.0
celeborn.client.push.sort.randomizePartitionId.enabled false false Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker 0.3.0 celeborn.push.sort.randomizePartitionId.enabled
celeborn.client.push.stageEnd.timeout <value of celeborn.<module>.io.connectionTimeout> false Timeout for waiting StageEnd. During this process, there are celeborn.client.requestCommitFiles.maxRetries retry opportunities for committing files and 1 for the release slots request. Users can customize this value according to their settings. By default, the value is the max timeout value celeborn.<module>.io.connectionTimeout. 0.3.0 celeborn.push.stageEnd.timeout
celeborn.client.push.takeTaskMaxWaitAttempts 1 false Max wait attempts if no task is available to push to the worker. 0.3.0
celeborn.client.push.takeTaskWaitInterval 50ms false Wait interval if no task is available to push to the worker. 0.3.0
celeborn.client.push.timeout 120s false Timeout for a task to push data rpc message. This value should be more than twice celeborn.<module>.push.timeoutCheck.interval. 0.3.0 celeborn.push.data.timeout
celeborn.client.readLocalShuffleFile.enabled false false Enable reading local shuffle files for clusters that are co-deployed with the YARN node manager. 0.3.1
celeborn.client.readLocalShuffleFile.threads 4 false Threads count for reading local shuffle files. 0.3.1
celeborn.client.registerShuffle.maxRetries 3 false Max retry times for client to register shuffle. 0.3.0 celeborn.shuffle.register.maxRetries
celeborn.client.registerShuffle.retryWait 3s false Wait time before next retry if register shuffle failed. 0.3.0 celeborn.shuffle.register.retryWait
celeborn.client.requestCommitFiles.maxRetries 4 false Max retry times for requestCommitFiles RPC. 0.3.0
celeborn.client.reserveSlots.maxRetries 3 false Max retry times for client to reserve slots. 0.3.0 celeborn.slots.reserve.maxRetries
celeborn.client.reserveSlots.rackaware.enabled false false Whether to place different replicas on different racks when allocating slots. 0.3.1 celeborn.client.reserveSlots.rackware.enabled
celeborn.client.reserveSlots.retryWait 3s false Wait time before next retry if reserve slots failed. 0.3.0 celeborn.slots.reserve.retryWait
celeborn.client.rpc.cache.concurrencyLevel 32 false The number of write locks to update rpc cache. 0.3.0 celeborn.rpc.cache.concurrencyLevel
celeborn.client.rpc.cache.expireTime 15s false The time before a cache item is removed. 0.3.0 celeborn.rpc.cache.expireTime
celeborn.client.rpc.cache.size 256 false The max cache items count for rpc cache. 0.3.0 celeborn.rpc.cache.size
celeborn.client.rpc.getReducerFileGroup.askTimeout <value of celeborn.<module>.io.connectionTimeout> false Timeout for ask operations during getting reducer file group information. During this process, there are celeborn.client.requestCommitFiles.maxRetries retry opportunities for committing files and 1 for the release slots request. Users can customize this value according to their settings. By default, the value is the max timeout value celeborn.<module>.io.connectionTimeout. 0.2.0
celeborn.client.rpc.maxRetries 3 false Max RPC retry times in LifecycleManager. 0.3.2
celeborn.client.rpc.registerShuffle.askTimeout <value of celeborn.<module>.io.connectionTimeout> false Timeout for ask operations during register shuffle. During this process, there are two retry opportunities for requesting slots, one request for establishing a connection with the Worker, and celeborn.client.reserveSlots.maxRetries retry opportunities for reserving slots. Users can customize this value according to their settings. By default, the value is the max timeout value celeborn.<module>.io.connectionTimeout. 0.3.0 celeborn.rpc.registerShuffle.askTimeout
celeborn.client.rpc.requestPartition.askTimeout <value of celeborn.<module>.io.connectionTimeout> false Timeout for ask operations during requesting change partition location, such as reviving or splitting a partition. During this process, there are celeborn.client.reserveSlots.maxRetries retry opportunities for reserving slots. Users can customize this value according to their settings. By default, the value is the max timeout value celeborn.<module>.io.connectionTimeout. 0.2.0
celeborn.client.rpc.reserveSlots.askTimeout <value of celeborn.rpc.askTimeout> false Timeout for LifecycleManager request reserve slots. 0.3.0
celeborn.client.rpc.shared.threads 16 false Number of shared rpc threads in LifecycleManager. 0.3.2
celeborn.client.shuffle.batchHandleChangePartition.interval 100ms false Interval for LifecycleManager to schedule handling change partition requests in batch. 0.3.0 celeborn.shuffle.batchHandleChangePartition.interval
celeborn.client.shuffle.batchHandleChangePartition.threads 8 false Threads number for LifecycleManager to handle change partition request in batch. 0.3.0 celeborn.shuffle.batchHandleChangePartition.threads
celeborn.client.shuffle.batchHandleCommitPartition.interval 5s false Interval for LifecycleManager to schedule handling commit partition requests in batch. 0.3.0 celeborn.shuffle.batchHandleCommitPartition.interval
celeborn.client.shuffle.batchHandleCommitPartition.threads 8 false Threads number for LifecycleManager to handle commit partition request in batch. 0.3.0 celeborn.shuffle.batchHandleCommitPartition.threads
celeborn.client.shuffle.batchHandleReleasePartition.interval 5s false Interval for LifecycleManager to schedule handling release partition requests in batch. 0.3.0
celeborn.client.shuffle.batchHandleReleasePartition.threads 8 false Threads number for LifecycleManager to handle release partition request in batch. 0.3.0
celeborn.client.shuffle.compression.codec LZ4 false The codec used to compress shuffle data. By default, Celeborn provides three codecs: lz4, zstd, none. none means that shuffle compression is disabled. Since Flink version 1.17, zstd is supported for Flink shuffle client. 0.3.0 celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec
celeborn.client.shuffle.compression.zstd.level 1 false Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. 0.3.0 celeborn.shuffle.compression.zstd.level
celeborn.client.shuffle.decompression.lz4.xxhash.instance <undefined> false Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE. 0.3.2
celeborn.client.shuffle.expired.checkInterval 60s false Interval for client to check expired shuffles. 0.3.0 celeborn.shuffle.expired.checkInterval
celeborn.client.shuffle.manager.port 0 false Port used by the LifecycleManager on the Driver. 0.3.0 celeborn.shuffle.manager.port
celeborn.client.shuffle.mapPartition.split.enabled false false Whether to enable shuffle partition split. Currently, this only applies to MapPartition. 0.3.1
celeborn.client.shuffle.partition.type REDUCE false Type of shuffle's partition. 0.3.0 celeborn.shuffle.partition.type
celeborn.client.shuffle.partitionSplit.mode SOFT false soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold. 0.3.0 celeborn.shuffle.partitionSplit.mode
celeborn.client.shuffle.partitionSplit.threshold 1G false Shuffle file size threshold, if file size exceeds this, trigger split. 0.3.0 celeborn.shuffle.partitionSplit.threshold
celeborn.client.shuffle.rangeReadFilter.enabled false false If a Spark application has skewed partitions, this value can be set to true to improve performance. 0.2.0 celeborn.shuffle.rangeReadFilter.enabled
celeborn.client.shuffle.register.filterExcludedWorker.enabled false false Whether to filter excluded worker when register shuffle. 0.4.0
celeborn.client.slot.assign.maxWorkers 10000 false Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see celeborn.master.slot.assign.maxWorkers. 0.3.1
celeborn.client.spark.fetch.throwsFetchFailure false false The client throws FetchFailedException instead of CelebornIOException. 0.4.0
celeborn.client.spark.push.dynamicWriteMode.enabled false false Whether to dynamically switch push write mode based on conditions. If true, the shuffle write mode will be determined only by the partition count. 0.5.0
celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold 2000 false Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true. 0.5.0
celeborn.client.spark.push.sort.memory.maxMemoryFactor 0.4 false The max portion of executor memory which can be used for the SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled). 0.5.0
celeborn.client.spark.push.sort.memory.smallPushTolerateFactor 0.2 false Only be in effect when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is turned on. The larger this value is, the more aggressive Celeborn will enlarge the Sort-based Shuffle writer's memory threshold. Specifically, this config controls when to enlarge the sort shuffle writer's memory threshold. With N bytes data in memory and V as the value of this config, if the number of pushes, C, when using sort based shuffle writer C >= (1 + V) * C' where C' is the number of pushes if we were using hash based writer, we will enlarge the memory threshold by 2X. 0.5.0
celeborn.client.spark.push.sort.memory.threshold 64m false When SortBasedPusher uses memory over this threshold, it will trigger a push of data. 0.3.0 celeborn.push.sortMemory.threshold
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold false false Adaptively adjust the sort-based shuffle writer's memory threshold. 0.5.0
celeborn.client.spark.push.unsafeRow.fastWrite.enabled true false This is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you have changed UnsafeRow's memory layout set this to false. 0.2.2
celeborn.client.spark.shuffle.checkWorker.enabled true false When true, before registering a shuffle, the LifecycleManager will check whether the current cluster has available workers; if it doesn't, fall back to Spark's default shuffle. 0.5.0
celeborn.client.spark.shuffle.forceFallback.enabled false false Whether to force fallback of shuffle to Spark's default. 0.3.0 celeborn.shuffle.forceFallback.enabled
celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold 2147483647 false Celeborn will only accept shuffle of partition number lower than this configuration value. 0.3.0 celeborn.shuffle.forceFallback.numPartitionsThreshold
celeborn.client.spark.shuffle.writer HASH false Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. 0.3.0 celeborn.shuffle.writer
celeborn.master.endpoints <localhost>:9097 false Endpoints of master nodes for celeborn client to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]*, e.g. clb1:9097,clb2:9098,clb3:9099. If the port is omitted, 9097 will be used. 0.2.0
celeborn.quota.enabled true false When set to true on the Master side, the master checks quota via the QuotaManager. When set to true on the Client side, the LifecycleManager asks the Master to check whether the current user has enough quota before registering a shuffle, and falls back to the default shuffle service of Spark when the Master finds there is not enough quota for the current user. 0.2.0
celeborn.quota.identity.provider org.apache.celeborn.common.identity.DefaultIdentityProvider false IdentityProvider class name. Default class is org.apache.celeborn.common.identity.DefaultIdentityProvider. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. 0.2.0
celeborn.quota.identity.user-specific.tenant default false Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. 0.3.0
celeborn.quota.identity.user-specific.userName default false User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. 0.3.0
celeborn.storage.availableTypes HDD false Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. 0.3.0 celeborn.storage.activeTypes
celeborn.storage.hdfs.dir <undefined> false HDFS base directory for Celeborn to store shuffle data. 0.2.0
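
When Celeborn is used from the Spark plugin, these client keys are typically set in spark-defaults.conf with a spark. prefix (the endpoints and values below are placeholders; the SparkShuffleManager class name applies to recent Celeborn releases):

spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097
spark.celeborn.client.push.replicate.enabled true
spark.celeborn.client.spark.shuffle.writer HASH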

Quota

Key Default isDynamic Description Since Deprecated
celeborn.quota.enabled true false When set to true on the Master side, the master checks quota via the QuotaManager. When set to true on the Client side, the LifecycleManager asks the Master to check whether the current user has enough quota before registering a shuffle, and falls back to the default shuffle service of Spark when the Master finds there is not enough quota for the current user. 0.2.0
celeborn.quota.identity.provider org.apache.celeborn.common.identity.DefaultIdentityProvider false IdentityProvider class name. Default class is org.apache.celeborn.common.identity.DefaultIdentityProvider. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. 0.2.0
celeborn.quota.identity.user-specific.tenant default false Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. 0.3.0
celeborn.quota.identity.user-specific.userName default false User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. 0.3.0
celeborn.quota.tenant.diskBytesWritten 9223372036854775807 true Quota dynamic configuration for written disk bytes. 0.5.0
celeborn.quota.tenant.diskFileCount 9223372036854775807 true Quota dynamic configuration for written disk file count. 0.5.0
celeborn.quota.tenant.hdfsBytesWritten 9223372036854775807 true Quota dynamic configuration for written hdfs bytes. 0.5.0
celeborn.quota.tenant.hdfsFileCount 9223372036854775807 true Quota dynamic configuration for written hdfs file count. 0.5.0
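
As a minimal example of the static quota settings above in celeborn-defaults.conf (the celeborn.quota.tenant.* limits are marked dynamic and can additionally be adjusted through the dynamic config store selected by celeborn.dynamicConfig.store.backend):

celeborn.quota.enabled true
celeborn.quota.identity.provider org.apache.celeborn.common.identity.HadoopBasedIdentityProvider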

Network

Key Default isDynamic Description Since Deprecated
celeborn.<module>.fetch.timeoutCheck.interval 5s false Interval for checking fetch data timeout. It only supports setting <module> to data since it works for shuffle client fetch data. 0.3.0
celeborn.<module>.fetch.timeoutCheck.threads 4 false Threads num for checking fetch data timeout. It only supports setting <module> to data since it works for shuffle client fetch data. 0.3.0
celeborn.<module>.heartbeat.interval 60s false The heartbeat interval between worker and client. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to replicate, it works for the replicate client of a worker replicating data to a peer worker. If you are using "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.rpc.heartbeat.interval", "celeborn.data.heartbeat.interval" and "celeborn.replicate.heartbeat.interval". 0.3.0 celeborn.client.heartbeat.interval
celeborn.<module>.io.backLog 0 false Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server.
celeborn.<module>.io.clientThreads 0 false Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to replicate, it works for replicate client of worker replicating data to peer worker.
celeborn.<module>.io.connectTimeout <value of celeborn.network.connect.timeout> false Socket connect timeout. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to replicate, it works for the replicate client of worker replicating data to peer worker.
celeborn.<module>.io.connectionTimeout <value of celeborn.network.timeout> false Connection active timeout. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server or client of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server.
celeborn.<module>.io.enableVerboseMetrics false false Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked.
celeborn.<module>.io.lazyFD true false Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to fetch, it works for worker fetch server.
celeborn.<module>.io.maxRetries 3 false Max number of times we will retry on IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to push, it works for Flink shuffle client push data.
celeborn.<module>.io.mode NIO false Netty EventLoopGroup backend, available options: NIO, EPOLL.
celeborn.<module>.io.numConnectionsPerPeer 1 false Number of concurrent connections between two nodes. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to replicate, it works for replicate client of worker replicating data to peer worker.
celeborn.<module>.io.preferDirectBufs true false If true, we will prefer allocating off-heap byte buffers within Netty. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server or client of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server.
celeborn.<module>.io.receiveBuffer 0b false Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server or client of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server. 0.2.0
celeborn.<module>.io.retryWait 5s false Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting to data, it works for shuffle client push and fetch data. If setting to push, it works for Flink shuffle client push data. 0.2.0
celeborn.<module>.io.saslTimeout 30s false Timeout for a single round trip of auth message exchange, in milliseconds. 0.5.0
celeborn.<module>.io.sendBuffer 0b false Send buffer size (SO_SNDBUF). If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to data, it works for shuffle client push and fetch data. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server or client of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server. 0.2.0
celeborn.<module>.io.serverThreads 0 false Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting to rpc_app, works for shuffle client. If setting to rpc_service, works for master or worker. If setting to push, it works for worker receiving push data. If setting to replicate, it works for replicate server of worker replicating data to peer worker. If setting to fetch, it works for worker fetch server.
celeborn.<module>.push.timeoutCheck.interval 5s false Interval for checking push data timeout. If setting to data, it works for shuffle client push data. If setting to push, it works for Flink shuffle client push data. If setting to replicate, it works for replicate client of worker replicating data to peer worker. 0.3.0
celeborn.<module>.push.timeoutCheck.threads 4 false Threads num for checking push data timeout. If setting to data, it works for shuffle client push data. If setting to push, it works for Flink shuffle client push data. If setting to replicate, it works for replicate client of worker replicating data to peer worker. 0.3.0
celeborn.<role>.rpc.dispatcher.threads <value of celeborn.rpc.dispatcher.threads> false Number of threads in the message dispatcher event loop for the given role.
celeborn.io.maxDefaultNettyThreads 64 false Max default netty threads 0.3.2
celeborn.network.bind.preferIpAddress true false When true, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. 0.3.0
celeborn.network.connect.timeout 10s false Default socket connect timeout. 0.2.0
celeborn.network.memory.allocator.numArenas <undefined> false Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. 0.3.0
celeborn.network.memory.allocator.verbose.metric false false Whether to enable verbose metric for pooled allocator. 0.3.0
celeborn.network.timeout 240s false Default timeout for network operations. 0.2.0
celeborn.port.maxRetries 1 false When port is occupied, we will retry for max retry times. 0.2.0
celeborn.rpc.askTimeout 60s false Timeout for RPC ask operations. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.activeTypes 0.2.0
celeborn.rpc.connect.threads 64 false 0.2.0
celeborn.rpc.dispatcher.threads 0 false Threads number of message dispatcher event loop. Default to 0, which is availableCore. 0.3.0 celeborn.rpc.dispatcher.numThreads
celeborn.rpc.inbox.capacity 0 false Specifies the size of the bounded in-memory inbox capacity. 0.5.0
celeborn.rpc.io.threads <undefined> false Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. 0.2.0
celeborn.rpc.lookupTimeout 30s false Timeout for RPC lookup operations. 0.2.0
celeborn.shuffle.io.maxChunksBeingTransferred <undefined> false The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see celeborn.<module>.io.maxRetries and celeborn.<module>.io.retryWait), if those limits are reached the task will fail with fetch failure. 0.2.0
celeborn.ssl.<module>.enabled false false Enables SSL for securing wire traffic. 0.5.0
celeborn.ssl.<module>.enabledAlgorithms <undefined> false A comma-separated list of ciphers. The specified ciphers must be supported by JVM. The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 17 can be found at https://docs.oracle.com/en/java/javase/17/docs/specs/security/standard-names.html#jsse-cipher-suite-names . Note: If not set, the default cipher suite for the JRE will be used. 0.5.0
celeborn.ssl.<module>.keyStore <undefined> false Path to the key store file. The path can be absolute or relative to the directory in which the process is started. 0.5.0
celeborn.ssl.<module>.keyStorePassword <undefined> false Password to the key store. 0.5.0
celeborn.ssl.<module>.protocol TLSv1.2 false SSL protocol to use 0.5.0
celeborn.ssl.<module>.trustStore <undefined> false Path to the trust store file. The path can be absolute or relative to the directory in which the process is started. 0.5.0
celeborn.ssl.<module>.trustStorePassword <undefined> false Password for the trust store. 0.5.0
celeborn.ssl.<module>.trustStoreReloadIntervalMs 10s false The interval at which the trust store should be reloaded (in milliseconds). This setting is mostly only useful for server components, not applications. 0.5.0
celeborn.ssl.<module>.trustStoreReloadingEnabled false false Whether the trust store should be reloaded periodically. This setting is mostly only useful for server components, not applications. 0.5.0
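
The SSL keys above compose per module. As a hedged illustration only (the module name, file paths, and passwords below are placeholders, not defaults), enabling TLS for one module in celeborn-defaults.conf might look like the following; substitute the module you actually want to secure for rpc_service:

celeborn.ssl.rpc_service.enabled             true
celeborn.ssl.rpc_service.protocol            TLSv1.2
celeborn.ssl.rpc_service.keyStore            /path/to/keystore.jks
celeborn.ssl.rpc_service.keyStorePassword    change-me
celeborn.ssl.rpc_service.trustStore          /path/to/truststore.jks
celeborn.ssl.rpc_service.trustStorePassword  change-me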

Columnar Shuffle

Key Default isDynamic Description Since Deprecated
celeborn.columnarShuffle.batch.size 10000 false Vector batch size for columnar shuffle. 0.3.0 celeborn.columnar.shuffle.batch.size
celeborn.columnarShuffle.codegen.enabled false false Whether to use codegen for columnar-based shuffle. 0.3.0 celeborn.columnar.shuffle.codegen.enabled
celeborn.columnarShuffle.enabled false false Whether to enable columnar-based shuffle. 0.2.0 celeborn.columnar.shuffle.enabled
celeborn.columnarShuffle.encoding.dictionary.enabled false false Whether to use dictionary encoding for columnar-based shuffle data. 0.3.0 celeborn.columnar.shuffle.encoding.dictionary.enabled
celeborn.columnarShuffle.encoding.dictionary.maxFactor 0.3 false Max factor for dictionary size. The max dictionary size is min(32.0 KiB, celeborn.columnarShuffle.batch.size * celeborn.columnar.shuffle.encoding.dictionary.maxFactor). 0.3.0 celeborn.columnar.shuffle.encoding.dictionary.maxFactor
celeborn.columnarShuffle.offHeap.enabled false false Whether to use off heap columnar vector. 0.3.0 celeborn.columnar.offHeap.enabled
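
As a hedged example (the values are illustrative; on the Spark client these settings are typically supplied with a spark. prefix), enabling columnar shuffle with dictionary encoding could look like:

celeborn.columnarShuffle.enabled                      true
celeborn.columnarShuffle.batch.size                   10000
celeborn.columnarShuffle.encoding.dictionary.enabled  true
celeborn.columnarShuffle.codegen.enabled              true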

Metrics

The metrics configurations below work for both master and worker.

Key Default isDynamic Description Since Deprecated
celeborn.metrics.app.topDiskUsage.count 50 false Size for top items about top disk usage applications list. 0.2.0
celeborn.metrics.app.topDiskUsage.interval 10min false Time length for a window about top disk usage application list. 0.2.0
celeborn.metrics.app.topDiskUsage.windowSize 24 false Window size about top disk usage application list. 0.2.0
celeborn.metrics.capacity 4096 false The maximum number of metrics which a source can use to generate output strings. 0.2.0
celeborn.metrics.collectPerfCritical.enabled false false It controls whether to collect metrics which may affect performance. When enabled, Celeborn collects them. 0.2.0
celeborn.metrics.conf <undefined> false Custom metrics configuration file path. Default use metrics.properties in classpath. 0.3.0
celeborn.metrics.enabled true false When true, enable metrics system. 0.2.0
celeborn.metrics.extraLabels false If default metric labels are not enough, extra metric labels can be customized. Labels' pattern is: <label1_key>=<label1_value>[,<label2_key>=<label2_value>]*; e.g. env=prod,version=1 0.3.0
celeborn.metrics.json.path /metrics/json false URI context path of json metrics HTTP server. 0.4.0
celeborn.metrics.json.pretty.enabled true false When true, view metrics in json pretty format 0.4.0
celeborn.metrics.prometheus.path /metrics/prometheus false URI context path of prometheus metrics HTTP server. 0.4.0
celeborn.metrics.sample.rate 1.0 false It controls whether Celeborn collects timer metrics for some operations. Its value should be in [0.0, 1.0]. 0.2.0
celeborn.metrics.timer.slidingWindow.size 4096 false The sliding window size of timer metric. 0.2.0
celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold 10 false Force appending worker pause spent time even if the worker is still in the paused serving state. This helps users notice that worker pause spent time keeps increasing when the worker stays in the paused state.
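
As a hedged example of a master/worker metrics setup, reusing the label pattern described above:

celeborn.metrics.enabled            true
celeborn.metrics.extraLabels        env=prod,version=1
celeborn.metrics.prometheus.path    /metrics/prometheus
celeborn.metrics.sample.rate        1.0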

metrics.properties

*.sink.csv.class=org.apache.celeborn.common.metrics.sink.CsvSink
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet

Environment Variables

It is recommended to configure these variables in conf/celeborn-env.sh.

Key Default Description
CELEBORN_HOME $(cd "`dirname "$0"`"/..; pwd)
CELEBORN_CONF_DIR ${CELEBORN_CONF_DIR:-"${CELEBORN_HOME}/conf"}
CELEBORN_MASTER_MEMORY 1 GB
CELEBORN_WORKER_MEMORY 1 GB
CELEBORN_WORKER_OFFHEAP_MEMORY 1 GB
CELEBORN_MASTER_JAVA_OPTS
CELEBORN_WORKER_JAVA_OPTS
CELEBORN_PID_DIR ${CELEBORN_HOME}/pids
CELEBORN_LOG_DIR ${CELEBORN_HOME}/logs
CELEBORN_SSH_OPTS -o StrictHostKeyChecking=no
CELEBORN_SLEEP Waiting time for start-all and stop-all operations
CELEBORN_PREFER_JEMALLOC set true to enable jemalloc memory allocator
CELEBORN_JEMALLOC_PATH jemalloc library path
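
A hedged sketch of conf/celeborn-env.sh; the values are illustrative, not recommendations:

CELEBORN_MASTER_MEMORY=2g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=12g
CELEBORN_PID_DIR=/var/run/celeborn
CELEBORN_LOG_DIR=/var/log/celeborn
CELEBORN_PREFER_JEMALLOC=true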

Tuning

Assume we have a cluster described as below: 5 Celeborn Workers, each with 20 GB off-heap memory and 10 disks. Since we need to reserve 20% of off-heap memory for Netty, we can assume 16 GB of off-heap memory per worker (80 GB in total) is available for flush buffers.

If spark.celeborn.client.push.buffer.max.size is 64 KB, the cluster can sustain up to 1,310,720 in-flight push requests (80 GB / 64 KB). If you have 8192 mapper tasks, you could set spark.celeborn.client.push.maxReqsInFlight=160 (1,310,720 / 8192) to gain performance improvements.
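
For instance, a hedged Spark-side configuration for this scenario (the values follow from the arithmetic above, not from defaults):

spark.celeborn.client.push.buffer.max.size    64k
spark.celeborn.client.push.maxReqsInFlight    160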

If celeborn.worker.flusher.buffer.size is 256 KB, the cluster can offer up to 327,680 slots in total (80 GB / 256 KB).
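
Correspondingly, a hedged worker-side setting for this scenario:

# 5 workers * 16 GB / 256 KB = 327,680 total slots
celeborn.worker.flusher.buffer.size    256k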

Rack Awareness

Celeborn can be rack-aware by setting celeborn.client.reserveSlots.rackaware.enabled to true on the client side. Shuffle partition block replica placement will use rack awareness for fault tolerance by placing one shuffle partition replica on a different rack. This provides data availability in the event of a network switch failure or partition within the cluster.
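
For example, on the Spark client (client settings take a spark. prefix in Spark):

spark.celeborn.client.reserveSlots.rackaware.enabled    true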

Celeborn master daemons obtain the rack id of the cluster workers by invoking either an external script or a Java class, as specified by configuration files. Whether using the Java class or an external script for topology, the output must adhere to the Java org.apache.hadoop.net.DNSToSwitchMapping interface. The interface expects a one-to-one correspondence to be maintained, with topology information in the format of /myrack/myhost, where / is the topology delimiter, myrack is the rack identifier, and myhost is the individual host. Assuming a single /24 subnet per rack, one could use the format /192.168.100.0/192.168.100.5 as a unique rack-host topology mapping.

To use the Java class for topology mapping, the class name is specified by the celeborn.hadoop.net.topology.node.switch.mapping.impl parameter in the master configuration file. An example, NetworkTopology.java, is included with the Celeborn distribution and can be customized by the Celeborn administrator. Using a Java class instead of an external script has a performance benefit in that Celeborn doesn't need to fork an external process when a new worker node registers itself.

If implementing an external script, it will be specified with the celeborn.hadoop.net.topology.script.file.name parameter in the master side configuration files. Unlike the Java class, the external topology script is not included with the Celeborn distribution and is provided by the administrator. Celeborn will send multiple IP addresses to ARGV when forking the topology script. The number of IP addresses sent to the topology script is controlled with celeborn.hadoop.net.topology.script.number.args and defaults to 100. If celeborn.hadoop.net.topology.script.number.args was changed to 1, a topology script would get forked for each IP submitted by workers.
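
To illustrate, here is a minimal topology script sketch in Python, assuming one rack per /24 subnet as in the example mapping above; the script path used for celeborn.hadoop.net.topology.script.file.name is hypothetical:

celeborn.hadoop.net.topology.script.file.name    /opt/celeborn/conf/topology.py

#!/usr/bin/env python3
# Minimal topology script sketch: for each IP address passed as an argument,
# print the rack it belongs to, assuming one rack per /24 subnet.
import sys

for ip in sys.argv[1:]:
    octets = ip.split(".")
    if len(octets) == 4:
        # e.g. 192.168.100.5 -> /192.168.100.0
        print("/" + ".".join(octets[:3]) + ".0")
    else:
        # Fall back to the default rack for anything we cannot parse.
        print("/default-rack")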

If celeborn.hadoop.net.topology.script.file.name or celeborn.hadoop.net.topology.node.switch.mapping.impl is not set, the rack id /default-rack is returned for any passed IP address. While this behavior appears desirable, it can cause issues with shuffle partition block replication as default behavior is to write one replicated block off rack and is unable to do so as there is only a single rack named /default-rack.

For an example, refer to Hadoop Rack Awareness, since Celeborn reuses Hadoop's rack-awareness code.

Worker Recover Status After Restart

ShuffleClient records each shuffle partition location's host, service port, and filename so that reading existing shuffle data can be recovered after a worker restart. During worker shutdown, workers should store the metadata about reading shuffle partition files in RocksDB or LevelDB (deprecated) and restore it after restarting. Workers should also keep stable service ports so that ShuffleClient can retry reading data. To support worker status recovery, set celeborn.worker.graceful.shutdown.enabled to true and configure the service ports below with stable values (see the example after the port list).

celeborn.worker.rpc.port
celeborn.worker.fetch.port
celeborn.worker.push.port
celeborn.worker.replicate.port
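
For example, a hedged worker configuration with graceful shutdown and stable ports (the port numbers are placeholders, not defaults):

celeborn.worker.graceful.shutdown.enabled    true
celeborn.worker.rpc.port          9095
celeborn.worker.fetch.port        9092
celeborn.worker.push.port         9091
celeborn.worker.replicate.port    9093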