Configuration Guide
This documentation contains Celeborn configuration details and a tuning guide.
Important Configurations
Environment Variables
CELEBORN_WORKER_MEMORY=4g
CELEBORN_WORKER_OFFHEAP_MEMORY=24g
Celeborn workers use off-heap buffers to improve performance. The off-heap memory requirement can be estimated as follows:
numDirs = `celeborn.worker.storage.dirs` # the number of directories used by Celeborn storage
bufferSize = `celeborn.worker.flusher.buffer.size` # the memory used by a single flush buffer
off-heap-memory = (disk buffer * disks) + network memory # the disk buffer is a logical memory region that stores shuffle data received from the network
# shuffle data is flushed to disks by write tasks
# the disk buffer can be set to 1 GiB or larger per disk, depending on the gap between your disk speed and network speed
For example, if a Celeborn worker gives each disk 1 GiB of buffer memory and the flush buffer size is set to 256 KiB, the worker can support up to 4096 concurrent write tasks per disk (1 GiB / 256 KiB = 4096). If this worker has 10 disks, the off-heap memory should be set to 12 GiB.
Network memory is consumed when Netty reads from a TCP channel, so some extra memory is needed. Empirically, Celeborn worker off-heap memory should be set to (disk buffer * disks) * 1.2.
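To make the arithmetic concrete, here is a worked sizing sketch for the example above (disk paths are hypothetical; keys and defaults come from this guide):

```properties
# celeborn-defaults.conf -- hypothetical worker with 10 disks, 1 GiB disk buffer each:
#   concurrent write tasks per disk = 1 GiB / 256 KiB = 4096
#   off-heap memory = (1 GiB * 10 disks) * 1.2 = 12 GiB
celeborn.worker.flusher.buffer.size 256k
celeborn.worker.storage.dirs /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8,/mnt/disk9,/mnt/disk10
```

The matching entry in celeborn-env.sh would then be CELEBORN_WORKER_OFFHEAP_MEMORY=12g.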
All Configurations
Master
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 | |
celeborn.container.info.provider | org.apache.celeborn.server.common.container.DefaultContainerInfoProvider | false | ContainerInfoProvider class name. Default class is org.apache.celeborn.server.common.container.DefaultContainerInfoProvider. | 0.6.0 | |
celeborn.dynamicConfig.refresh.interval | 120s | false | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. The store backend can be specified in two ways: - Using the short name of the store backend defined in the implementation of ConfigStore#getName whose return value can be mapped to the corresponding backend implementation. Available options: FS, DB. - Using the service class name of the store backend implementation. If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | false | The page size for db store to query configurations. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.connectionTimeout | 30s | false | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.driverClassName | | false | The jdbc driver class name of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.idleTimeout | 600s | false | The idle timeout that a connection is allowed to sit idle in the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.jdbcUrl | | false | The jdbc url of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.maxLifetime | 1800s | false | The maximum lifetime of a connection in the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize | 2 | false | The maximum pool size of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.password | | false | The password of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.username | | false | The username of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path of dynamic config file for fs store backend. The file format should be yaml. The default path is ${CELEBORN_CONF_DIR}/dynamicConfig.yaml. | 0.5.0 | |
celeborn.internal.port.enabled | false | false | Whether to create an internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | |
celeborn.logConf.enabled | false | false | When true, log the CelebornConf for debugging purposes. | 0.5.0 | |
celeborn.master.allowWorkerHostPattern | <undefined> | false | Pattern of worker hosts that are allowed to register with the master. If not set, all workers are allowed to register. | 0.6.0 | |
celeborn.master.denyWorkerHostPattern | <undefined> | false | Pattern of worker hosts that are denied registration with the master. If not set, no workers are denied registration. | 0.6.0 | |
celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for expired dirs to be deleted on S3 or HDFS. | 0.6.0 | |
celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false | Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partitions smaller than this size when estimating partition size. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | Initial delay time before start updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
celeborn.master.estimatedPartitionSize.update.interval | 10min | false | Interval of updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.interval |
celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false | Max ratio of unhealthy disks for excluding a worker; when the ratio of unhealthy disks exceeds this threshold, the master will exclude the worker. If this value is set to 1, the master will exclude a worker only when all of its disks are unhealthy. | 0.6.0 | |
celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for expired dirs to be deleted on HDFS. | 0.3.0 | |
celeborn.master.heartbeat.application.timeout | 300s | false | Application heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
celeborn.master.host | <localhost> | false | Hostname for master to bind. | 0.2.0 | |
celeborn.master.http.auth.administers | | false | A comma-separated list of users who have admin privileges. Note that when celeborn.master.http.auth.supportedSchemes is not set, everyone is treated as an administrator. | 0.6.0 | |
celeborn.master.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.spi.authentication.PasswdAuthenticationProvider | 0.6.0 | |
celeborn.master.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.spi.authentication.TokenAuthenticationProvider | 0.6.0 | |
celeborn.master.http.auth.supportedSchemes | | false | A comma-separated list of master http auth supported schemes. | 0.6.0 | |
celeborn.master.http.host | <localhost> | false | Master's http host. | 0.4.0 | celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host |
celeborn.master.http.idleTimeout | 30s | false | Master http server idle timeout. | 0.5.0 | |
celeborn.master.http.maxWorkerThreads | 200 | false | Maximum number of threads in the master http worker thread pool. | 0.5.0 | |
celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 | celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port |
celeborn.master.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see the load balancer or proxy IP address as the client IP address; to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that because the header value can be set to any IP address, it will not be used for authentication. | 0.6.0 | |
celeborn.master.http.spnego.keytab | <undefined> | false | The keytab file for SPNego authentication. | 0.6.0 | |
celeborn.master.http.spnego.principal | <undefined> | false | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
celeborn.master.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
celeborn.master.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
celeborn.master.http.ssl.include.ciphersuites | | false | A comma-separated list of included SSL cipher suite names. | 0.6.0 | |
celeborn.master.http.ssl.keystore.algorithm | <undefined> | false | SSL certificate keystore algorithm. | 0.6.0 | |
celeborn.master.http.ssl.keystore.password | <undefined> | false | SSL certificate keystore password. | 0.6.0 | |
celeborn.master.http.ssl.keystore.path | <undefined> | false | SSL certificate keystore location. | 0.6.0 | |
celeborn.master.http.ssl.keystore.type | <undefined> | false | SSL certificate keystore type. | 0.6.0 | |
celeborn.master.http.stopTimeout | 5s | false | Master http server stop timeout. | 0.5.0 | |
celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
celeborn.master.persist.workerNetworkLocation | false | false | | 0.6.0 | |
celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | |
celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 | |
celeborn.master.slot.assign.extraSlots | 2 | false | Number of extra slots when the master assigns slots. | 0.3.0 | celeborn.slots.assign.extraSlots |
celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value controls how much more workload will be placed on a faster disk group than on a slower one. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight |
celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for the load-aware slot allocation algorithm. This value controls how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups |
celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see celeborn.client.slot.assign.maxWorkers. | 0.3.1 | |
celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when HDFS is enabled in celeborn.storage.availableTypes. | 0.3.0 | celeborn.slots.assign.policy |
celeborn.master.userResourceConsumption.update.interval | 30s | false | Window length for computing user resource consumption. | 0.3.0 | |
celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
celeborn.quota.enabled | true | false | When true on the Master side, the master will check quota via QuotaManager. When true on the Client side, LifecycleManager will request the Master to check whether the current user has enough quota before registering a shuffle, and will fall back to the default shuffle service when the Master finds there is not enough quota for the current user. | 0.2.0 | |
celeborn.redaction.regex | (?i)secret\|password\|token\|access[.]key | false | | | |
celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. | 0.3.2 | |
celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 | |
celeborn.tags.preferClientTagsExpr | false | true | When true, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 | |
celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, prod,high-io filters workers that have both the prod and high-io tags. | 0.6.0 | |
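As a concrete illustration of the dynamic config service described above, the FS backend can be enabled with a few of the keys from this table (a minimal sketch; the YAML path is hypothetical):

```properties
# celeborn-defaults.conf: enable the FS-backed dynamic config store
celeborn.dynamicConfig.store.backend FS
celeborn.dynamicConfig.store.fs.path /opt/celeborn/conf/dynamicConfig.yaml
celeborn.dynamicConfig.refresh.interval 120s
```

Leaving celeborn.dynamicConfig.store.backend unset keeps dynamic configuration disabled.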
Apart from these, the following properties are also available to enable master HA:
Master HA
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.master.ha.enabled | false | false | When true, master nodes run in Raft cluster mode. | 0.3.0 | celeborn.ha.enabled |
celeborn.master.ha.node.<id>.host | <required> | false | Host for master node <id> to bind to. | 0.3.0 | celeborn.ha.master.node.<id>.host |
celeborn.master.ha.node.<id>.internal.port | 8097 | false | Internal port for the workers and other masters to connect to master node <id>. | 0.5.0 | |
celeborn.master.ha.node.<id>.port | 9097 | false | Port for master node <id> to bind to. | 0.3.0 | celeborn.ha.master.node.<id>.port |
celeborn.master.ha.node.<id>.ratis.port | 9872 | false | Ratis port for master node <id> to bind to. | 0.3.0 | celeborn.ha.master.node.<id>.ratis.port |
celeborn.master.ha.ratis.raft.rpc.type | netty | false | RPC type for Ratis, available options: netty, grpc. | 0.3.0 | celeborn.ha.master.ratis.raft.rpc.type |
celeborn.master.ha.ratis.raft.server.storage.dir | /tmp/ratis | false | Root storage directory to hold RaftServer data. | 0.3.0 | celeborn.ha.master.ratis.raft.server.storage.dir |
celeborn.master.ha.ratis.raft.server.storage.startup.option | RECOVER | false | Startup option of RaftServer storage. Available options: RECOVER, FORMAT. | 0.5.0 | |
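Putting these keys together, a three-node Raft setup might look like the following minimal sketch (hostnames, node ids, and the Ratis storage directory are hypothetical; the ports are the defaults from the table):

```properties
# celeborn-defaults.conf: hypothetical three-node master HA setup
celeborn.master.ha.enabled true
celeborn.master.ha.node.1.host clb-master-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-master-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-master-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/celeborn-ratis
```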
Worker
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 | |
celeborn.container.info.provider | org.apache.celeborn.server.common.container.DefaultContainerInfoProvider | false | ContainerInfoProvider class name. Default class is org.apache.celeborn.server.common.container.DefaultContainerInfoProvider. | 0.6.0 | |
celeborn.dynamicConfig.refresh.interval | 120s | false | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. The store backend can be specified in two ways: - Using the short name of the store backend defined in the implementation of ConfigStore#getName whose return value can be mapped to the corresponding backend implementation. Available options: FS, DB. - Using the service class name of the store backend implementation. If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | false | The page size for db store to query configurations. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.connectionTimeout | 30s | false | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.driverClassName | | false | The jdbc driver class name of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.idleTimeout | 600s | false | The idle timeout that a connection is allowed to sit idle in the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.jdbcUrl | | false | The jdbc url of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.maxLifetime | 1800s | false | The maximum lifetime of a connection in the pool for db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize | 2 | false | The maximum pool size of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.password | | false | The password of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.db.hikari.username | | false | The username of db store backend. | 0.5.0 | |
celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path of dynamic config file for fs store backend. The file format should be yaml. The default path is ${CELEBORN_CONF_DIR}/dynamicConfig.yaml. | 0.5.0 | |
celeborn.internal.port.enabled | false | false | Whether to create an internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | |
celeborn.logConf.enabled | false | false | When true, log the CelebornConf for debugging purposes. | 0.5.0 | |
celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of master nodes for celeborn clients to connect. The client uses the resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses org.apache.celeborn.common.client.StaticMasterEndpointResolver, which takes static master endpoints as input. Allowed pattern: <host1>:<port1>[,<host2>:<port2>]*, e.g. clb1:9097,clb2:9098,clb3:9099. If the port is omitted, 9097 will be used. If the master endpoints are not static, users can pass a custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | |
celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that the provided resolver class is present in the classpath. | 0.6.0 | |
celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partitions smaller than this size when estimating partition size. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
celeborn.master.internal.endpoints | <localhost>:8097 | false | Endpoints of master nodes just for celeborn workers to connect, allowed pattern is: <host1>:<port1>[,<host2>:<port2>]*, e.g. clb1:8097,clb2:8097,clb3:8097. If the port is omitted, 8097 will be used. | 0.5.0 | |
celeborn.redaction.regex | (?i)secret\|password\|token\|access[.]key | false | | | |
celeborn.shuffle.chunk.size | 8m | false | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128 MiB, fetching it will take 16 chunk fetch requests. | 0.2.0 | |
celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | false | Combine sorted shuffle blocks such that size of compacted shuffle block does not exceed compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 | |
celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. | 0.3.2 | |
celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 | |
celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads count for read buffer per mount point. | 0.3.0 | |
celeborn.worker.clean.threads | 64 | false | Thread number of worker to clean up expired shuffle keys. | 0.3.2 | |
celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | |
celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least 128 when HDFS is enabled in celeborn.storage.availableTypes. | 0.3.0 | celeborn.worker.commit.threads |
celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.availableTypes. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | |
celeborn.worker.congestionControl.check.interval | 10ms | false | Interval at which the worker checks for congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |
celeborn.worker.congestionControl.diskBuffer.high.watermark | 9223372036854775807b | false | If the total bytes in the disk buffer exceed this configuration, the worker will start to congest users whose produce rate is higher than the potential average consume rate. The congestion stops when the produce rate is lower than or equal to the average consume rate, or when the total pending bytes fall below celeborn.worker.congestionControl.diskBuffer.low.watermark. | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
celeborn.worker.congestionControl.diskBuffer.low.watermark | 9223372036854775807b | false | The worker will stop congesting users if the total pending bytes of the disk buffer fall below this configuration. | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | |
celeborn.worker.congestionControl.sample.time.window | 10s | false | The worker keeps a sliding time window to calculate users' produce/consume rates. | 0.3.0 | |
celeborn.worker.congestionControl.user.inactive.interval | 10min | false | How long before a user is considered inactive if it doesn't send data. | 0.3.0 | |
celeborn.worker.congestionControl.userProduceSpeed.high.watermark | 9223372036854775807b | false | Start congesting users whose produce speed is greater than this configuration. | 0.6.0 | |
celeborn.worker.congestionControl.userProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congesting users whose produce speed is less than this configuration. | 0.6.0 | |
celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | 9223372036854775807b | false | Start congestion if the worker's total produce speed is greater than this configuration. | 0.6.0 | |
celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion if the worker's total produce speed is less than this configuration. | 0.6.0 | |
celeborn.worker.decommission.checkInterval | 30s | false | The interval for checking whether all shuffles have expired during worker decommission. | 0.4.0 | |
celeborn.worker.decommission.forceExitTimeout | 6h | false | The maximum time to wait for all shuffles to expire during worker decommission. | 0.4.0 | |
celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | false | If direct memory usage reaches this limit, the worker will stop receiving data from Celeborn shuffle clients. | 0.2.0 | |
celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop receiving replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, the worker will resume. | 0.2.0 | |
celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | |
celeborn.worker.fetch.heartbeat.enabled | false | false | Enable the heartbeat from worker to client when fetching data. | 0.3.0 | |
celeborn.worker.fetch.io.threads | <undefined> | false | Netty IO thread number of worker to handle client fetch data. The default number of threads is the number of flush threads. | 0.2.0 | |
celeborn.worker.fetch.port | 0 | false | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | |
celeborn.worker.flusher.buffer.size | 256k | false | Size of buffer used by a single flusher. | 0.2.0 | |
celeborn.worker.flusher.diskTime.slidingWindow.size | 20 | false | The size of sliding windows used to calculate statistics about flushed time and count. | 0.3.0 | celeborn.worker.flusher.avgFlushTime.slidingWindow.size |
celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 | |
celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by an HDFS flusher. | 0.3.0 | |
celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 | |
celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by an S3 flusher. | 0.6.0 | |
celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for write data to S3. | 0.6.0 | |
celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 | |
celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | |
celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per disk for unknown-type disks. | 0.2.0 | |
celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | false | The interval for checking whether all released slots have been committed or destroyed during worker graceful shutdown. | 0.2.0 | |
celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | false | The maximum time to wait for released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 | |
celeborn.worker.graceful.shutdown.enabled | false | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 | |
celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s | false | The maximum time to wait for partition files to be sorted during worker graceful shutdown. | 0.2.0 | |
celeborn.worker.graceful.shutdown.recoverDbBackend | ROCKSDB | false | Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated). | 0.4.0 | |
celeborn.worker.graceful.shutdown.recoverPath | <tmp>/recover | false | The path to store DB. | 0.2.0 | |
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | false | Interval for a Celeborn worker to flush committed file infos into Level DB. | 0.3.1 | |
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | |
celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. | 0.2.0 | |
celeborn.worker.http.auth.administers | | false | A comma-separated list of users who have admin privileges. Note that when celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as an administrator. | 0.6.0 | |
celeborn.worker.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 | |
celeborn.worker.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | |
celeborn.worker.http.auth.supportedSchemes | | false | A comma-separated list of worker http auth supported schemes. | 0.6.0 | |
celeborn.worker.http.host | <localhost> | false | Worker's http host. | 0.4.0 | celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host |
celeborn.worker.http.idleTimeout | 30s | false | Worker http server idle timeout. | 0.5.0 | |
celeborn.worker.http.maxWorkerThreads | 200 | false | Maximum number of threads in the worker http worker thread pool. | 0.5.0 | |
celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port |
celeborn.worker.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see the load balancer or proxy IP address as the client IP address; to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that because the header value can be set to any IP address, it will not be used for authentication. | 0.6.0 | |
celeborn.worker.http.spnego.keytab | <undefined> | false | The keytab file for SPNego authentication. | 0.6.0 | |
celeborn.worker.http.spnego.principal | <undefined> | false | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
celeborn.worker.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
celeborn.worker.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
celeborn.worker.http.ssl.include.ciphersuites | | false | A comma-separated list of included SSL cipher suite names. | 0.6.0 | |
celeborn.worker.http.ssl.keystore.algorithm | <undefined> | false | SSL certificate keystore algorithm. | 0.6.0 | |
celeborn.worker.http.ssl.keystore.password | <undefined> | false | SSL certificate keystore password. | 0.6.0 | |
celeborn.worker.http.ssl.keystore.path | <undefined> | false | SSL certificate keystore location. | 0.6.0 | |
celeborn.worker.http.ssl.keystore.type | <undefined> | false | SSL certificate keystore type. | 0.6.0 | |
celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop timeout. | 0.5.0 | |
celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | |
celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | |
celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process. | 0.5.0 | |
celeborn.worker.jvmProfiler.options | event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s | false | Options to pass on to the async profiler. | 0.5.0 | |
celeborn.worker.jvmQuake.check.interval | 1s | false | Interval of gc behavior checking for worker jvm quake. | 0.4.0 | |
celeborn.worker.jvmQuake.dump.enabled | true | false | Whether to generate a heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
celeborn.worker.jvmQuake.dump.path | <tmp>/jvm-quake/dump/<pid> | false | The path of heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
celeborn.worker.jvmQuake.dump.threshold | 30s | false | The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold. | 0.4.0 | |
celeborn.worker.jvmQuake.enabled | false | false | When true, the Celeborn worker will start jvm quake to monitor GC behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.4.0 | |
celeborn.worker.jvmQuake.exitCode | 502 | false | The exit code of system kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
celeborn.worker.jvmQuake.kill.threshold | 60s | false | The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. | 0.4.0 | |
celeborn.worker.jvmQuake.runtimeWeight | 5.0 | false | The factor by which to multiply running JVM time, when weighing it against GCing time. 'Deficit' is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. | 0.4.0 | |
celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled | false | false | If set to true, memory shuffle files will be evicted when the worker is in PAUSED state. If the worker's off-heap memory is not ample, setting this to true and decreasing celeborn.worker.directMemoryRatioForMemoryFileStorage will be helpful. | 0.5.1 | |
celeborn.worker.memoryFileStorage.evict.ratio | 0.5 | false | If the memory shuffle storage usage rate is above this configuration, memory storage shuffle files will be evicted to free memory. | 0.5.1 | |
celeborn.worker.memoryFileStorage.maxFileSize | 8MB | false | Max size for a memory storage file. It must be less than 2GB. | 0.5.0 | |
celeborn.worker.monitor.disk.check.interval | 30s | false | Interval for the device monitor to check disks. | 0.3.0 | celeborn.worker.monitor.disk.checkInterval |
celeborn.worker.monitor.disk.check.timeout | 30s | false | Timeout for the worker to check device status. | 0.3.0 | celeborn.worker.disk.check.timeout |
celeborn.worker.monitor.disk.checklist | readwrite,diskusage | false | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | |
celeborn.worker.monitor.disk.enabled | true | false | When true, worker will monitor device and report to master. | 0.3.0 | |
celeborn.worker.monitor.disk.notifyError.expireTimeout | 10m | false | The expire timeout of non-critical device error. Only notify critical error when the number of non-critical errors for a period of time exceeds threshold. | 0.3.0 | |
celeborn.worker.monitor.disk.notifyError.threshold | 64 | false | The device monitor will only notify a critical error once the accumulated number of valid non-critical errors exceeds this threshold. | 0.3.0 | |
celeborn.worker.monitor.disk.sys.block.dir | /sys/block | false | The directory where linux file block information is stored. | 0.2.0 | |
celeborn.worker.monitor.memory.check.interval | 10ms | false | Interval of worker direct memory checking. | 0.3.0 | celeborn.worker.memory.checkInterval |
celeborn.worker.monitor.memory.report.interval | 10s | false | Interval of worker direct memory tracker reporting to log. | 0.3.0 | celeborn.worker.memory.reportInterval |
celeborn.worker.monitor.memory.trimChannelWaitInterval | 1s | false | Wait time after the worker triggers channels to trim caches. | 0.3.0 | |
celeborn.worker.monitor.memory.trimFlushWaitInterval | 1s | false | Wait time after the worker triggers the StorageManager to flush data. | 0.3.0 | |
celeborn.worker.partition.initial.readBuffersMax | 1024 | false | Max number of initial read buffers | 0.3.0 | |
celeborn.worker.partition.initial.readBuffersMin | 1 | false | Min number of initial read buffers | 0.3.0 | |
celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | false | Max ratio of the partition sorter's memory for sorting; when reserved memory is higher than the max partition sorter memory, the partition sorter will stop sorting. If this value is set to 0, the partition files sorter will skip the memory check and the ServingState check. | 0.2.0 | |
celeborn.worker.push.heartbeat.enabled | false | false | Enable the heartbeat from worker to client when pushing data. | 0.3.0 | |
celeborn.worker.push.io.threads | <undefined> | false | Netty IO thread number of worker to handle client push data. The default number of threads is the number of flush threads. | 0.2.0 | |
celeborn.worker.push.port | 0 | false | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | |
celeborn.worker.readBuffer.allocationWait | 50ms | false | The time to wait when buffer dispatcher can not allocate a buffer. | 0.3.0 | |
celeborn.worker.readBuffer.target.changeThreshold | 1mb | false | The target ratio for pre read memory usage. | 0.3.0 | |
celeborn.worker.readBuffer.target.ratio | 0.9 | false | The target ratio for read ahead buffer's memory usage. | 0.3.0 | |
celeborn.worker.readBuffer.target.updateInterval | 100ms | false | The interval for memory manager to calculate new read buffer's target memory. | 0.3.0 | |
celeborn.worker.readBuffer.toTriggerReadMin | 32 | false | Min buffers count for map data partition to trigger read. | 0.3.0 | |
celeborn.worker.register.timeout | 180s | false | Worker register timeout. | 0.2.0 | |
celeborn.worker.replicate.fastFail.duration | 60s | false | If a replicate request is not replied to within this duration, the worker will mark the replicate data request as failed. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.availableTypes. | 0.2.0 | |
celeborn.worker.replicate.io.threads | <undefined> | false | Netty IO thread number of worker to replicate shuffle data. The default number of threads is the number of flush threads. | 0.2.0 | |
celeborn.worker.replicate.port | 0 | false | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | |
celeborn.worker.replicate.randomConnection.enabled | true | false | Whether the worker will create random connections to its peer when replicating data. When false, the worker tends to reuse the same cached TransportClient for a given replication peer; when true, it tends to use different cached TransportClients. Netty serves a given connection on the same thread, so with more connections the replication server can leverage more Netty threads. | 0.2.1 | |
celeborn.worker.replicate.threads | 64 | false | Thread number of worker to replicate shuffle data. | 0.2.0 | |
celeborn.worker.rpc.port | 0 | false | Server port for Worker to receive RPC request. | 0.2.0 | |
celeborn.worker.shuffle.partitionSplit.enabled | true | false | Enable partition split on the worker side. | 0.3.0 | celeborn.worker.partition.split.enabled |
celeborn.worker.shuffle.partitionSplit.max | 2g | false | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 | |
celeborn.worker.shuffle.partitionSplit.min | 1m | false | Min size for a partition to split | 0.3.0 | celeborn.shuffle.partitionSplit.min |
celeborn.worker.sortPartition.indexCache.expire | 180s | false | PartitionSorter's cache item expire time. | 0.4.0 | |
celeborn.worker.sortPartition.indexCache.maxWeight | 100000 | false | PartitionSorter's cache max weight for index buffer. | 0.4.0 | |
celeborn.worker.sortPartition.prefetch.enabled | true | false | When true, the partition sorter will prefetch the original partition files to page cache and reserve memory configured by celeborn.worker.sortPartition.reservedMemoryPerPartition to allocate a block of memory for prefetching while sorting a shuffle file off-heap with page cache for non-hdfs files. Otherwise, the partition sorter seeks to the position of each block and does not prefetch for non-hdfs files. | 0.5.0 | |
celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | false | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 | celeborn.worker.partitionSorter.reservedMemoryPerPartition |
celeborn.worker.sortPartition.threads | <undefined> | false | PartitionSorter's thread count. It's recommended to set at least 64 when HDFS is enabled in celeborn.storage.availableTypes. | 0.3.0 | celeborn.worker.partitionSorter.threads |
celeborn.worker.sortPartition.timeout | 220s | false | Timeout for a shuffle file to sort. | 0.3.0 | celeborn.worker.partitionSorter.sort.timeout |
celeborn.worker.storage.checkDirsEmpty.maxRetries | 3 | false | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 | celeborn.worker.disk.checkFileClean.maxRetries |
celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | false | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 | celeborn.worker.disk.checkFileClean.timeout |
celeborn.worker.storage.dirs | <undefined> | false | Directory list to store shuffle data. It's recommended to configure one directory per disk. A storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and 8 or more flush threads on the same disk partition if you are using SSD. For example: dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=] | 0.2.0 | |
celeborn.worker.storage.disk.reserve.ratio | <undefined> | false | Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the larger of the reserved space and the space calculated via the reserved ratio. | 0.3.2 | |
celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size |
celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for expired dirs to be deleted on disk. | 0.3.2 | |
celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defines the order for creating files across available storages. Available storage options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | |
celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This defines the order in which files are evicted across the available storages. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes\|StorageTypes\|StorageTypes. Example: MEMORY,SSD | 0.5.1 | |
celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | |
celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for creating a file writer if creation failed. | 0.2.0 | |
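To illustrate the celeborn.worker.storage.dirs attribute syntax documented above, a worker mixing one SSD and one HDD might be configured as in this sketch (paths and capacities are hypothetical; the flush thread counts follow the guidance of at most 2 per HDD and 8 or more per SSD):

```properties
# celeborn-defaults.conf: hypothetical worker storage layout
celeborn.worker.storage.dirs /mnt/ssd1:capacity=1t:disktype=SSD:flushthread=8,/mnt/hdd1:capacity=4t:disktype=HDD:flushthread=2
celeborn.worker.storage.disk.reserve.size 5G
```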
Client
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
celeborn.client.application.unregister.enabled | true | false | When true, the Celeborn client will inform the celeborn master that the application has already shut down during client exit, which allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add a UUID suffix to the application id for uniqueness. When true, a UUID suffix is added to make the application id unique. Currently, this only applies to Spark and MR. | 0.6.0 | |
celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | |
celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |
celeborn.client.eagerlyCreateInputStream.threads | 32 | false | Threads count for streamCreatorPool in CelebornShuffleReader. | 0.3.1 | |
celeborn.client.excludePeerWorkerOnFailure.enabled | true | false | When true, Celeborn will exclude partition's peer worker on failure when push data to replica failed. | 0.3.0 | |
celeborn.client.excludedWorker.expireTimeout | 180s | false | Timeout for LifecycleManager to clear reserved excluded workers. Defaults to 1.5 * celeborn.master.heartbeat.worker.timeout to cover the worker heartbeat timeout check period. | 0.3.0 | celeborn.worker.excluded.expireTimeout |
celeborn.client.fetch.buffer.size | 64k | false | Size of the reducer partition buffer memory for the shuffle reader. Fetched data will be buffered in memory before being consumed. For performance, keep this buffer size no smaller than celeborn.client.push.buffer.max.size. | 0.4.0 | |
celeborn.client.fetch.dfsReadChunkSize | 8m | false | Max chunk size for DfsPartitionReader. | 0.3.1 | |
celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | false | Whether to enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | |
celeborn.client.fetch.excludedWorker.expireTimeout | <value of celeborn.client.excludedWorker.expireTimeout> | false | ShuffleClient is a static object used for the whole lifecycle of an Executor, so we give excluded workers an expiration time to avoid penalizing transient worker issues. | 0.3.0 | |
celeborn.client.fetch.maxReqsInFlight | 3 | false | Max number of in-flight chunk fetch requests. | 0.3.0 | celeborn.fetch.maxReqsInFlight |
celeborn.client.fetch.maxRetriesForEachReplica | 3 | false | Max retry times for fetching a chunk from each replica. | 0.3.0 | celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries |
celeborn.client.fetch.timeout | 600s | false | Timeout for a task to open stream and fetch chunk. | 0.3.0 | celeborn.fetch.timeout |
celeborn.client.flink.compression.enabled | true | false | Whether to compress data in Flink plugin. | 0.3.0 | remote-shuffle.job.enable-data-compression |
celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for an input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate |
celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for an input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate |
celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |
celeborn.client.flink.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kinds of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fall back to flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it is concluded that fallback is required based on the factors above. | 0.6.0 | |
celeborn.client.inputStream.creation.window | 16 | false | Window size in which CelebornShuffleReader pre-creates CelebornInputStreams, for the coalesced scenario where multiple partitions are read. | 0.6.0 | |
celeborn.client.mr.pushData.max | 32m | false | Max size of push data sent from the MR client. | 0.4.0 | |
celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 | celeborn.push.buffer.initial.size |
celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before being sent to the Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If the reducer amount is 2000 and the buffer size is 64K, then each task will consume up to 64KiB * 2000 = 125MiB heap memory. | 0.3.0 | celeborn.push.buffer.max.size |
celeborn.client.push.excludeWorkerOnFailure.enabled | false | false | Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0 | |
celeborn.client.push.limit.inFlight.sleepInterval | 50ms | false | Sleep interval when check netty in-flight requests to be done. | 0.3.0 | celeborn.push.limit.inFlight.sleepInterval |
celeborn.client.push.limit.inFlight.timeout | <undefined> | false | Timeout for netty in-flight requests to be done. Default value should be celeborn.client.push.timeout * 2. | 0.3.0 | celeborn.push.limit.inFlight.timeout |
celeborn.client.push.limit.strategy | SIMPLE | false | The strategy used to control the push speed. Valid strategies are SIMPLE and SLOWSTART. The SLOWSTART strategy usually works with congestion control mechanism on the worker side. | 0.3.0 | |
celeborn.client.push.maxReqsInFlight.perWorker | 32 | false | Amount of Netty in-flight requests per worker. The default max memory of in-flight requests per worker is celeborn.client.push.maxReqsInFlight.perWorker * celeborn.client.push.buffer.max.size * compression ratio (1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will not exceed celeborn.client.push.maxReqsInFlight.total. | 0.3.0 | |
celeborn.client.push.maxReqsInFlight.total | 256 | false | Amount of total Netty in-flight requests. The maximum memory is celeborn.client.push.maxReqsInFlight.total * celeborn.client.push.buffer.max.size * compression ratio (1 in worst case): 64KiB * 256 = 16MiB. | 0.3.0 | celeborn.push.maxReqsInFlight |
celeborn.client.push.queue.capacity | 512 | false | Push buffer queue size for a task. The maximum memory is celeborn.client.push.buffer.max.size * celeborn.client.push.queue.capacity, default: 64KiB * 512 = 32MiB. | 0.3.0 | celeborn.push.queue.capacity |
celeborn.client.push.replicate.enabled | false | false | When true, Celeborn workers will replicate shuffle data to another Celeborn worker asynchronously to ensure that pushed shuffle data won't be lost after node failure. It's recommended to set false when HDFS is enabled in celeborn.storage.availableTypes. | 0.3.0 | celeborn.push.replicate.enabled |
celeborn.client.push.retry.threads | 8 | false | Thread number to process shuffle re-send push data requests. | 0.3.0 | celeborn.push.retry.threads |
celeborn.client.push.revive.batchSize | 2048 | false | Max number of partitions in one Revive request. | 0.3.0 | |
celeborn.client.push.revive.interval | 100ms | false | Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive request is celeborn.client.push.revive.batchSize. | 0.3.0 | |
celeborn.client.push.revive.maxRetries | 5 | false | Max retry times for reviving when celeborn push data failed. | 0.3.0 | |
celeborn.client.push.sendBufferPool.checkExpireInterval | 30s | false | Interval to check expiration for the send buffer pool. If the pool has been idle for more than celeborn.client.push.sendBufferPool.expireTimeout, the pooled send buffers and push tasks will be cleaned up. | 0.3.1 | |
celeborn.client.push.sendBufferPool.expireTimeout | 60s | false | Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, the send buffers and push tasks will be cleaned up. | 0.3.1 | |
celeborn.client.push.slowStart.initialSleepTime | 500ms | false | The initial sleep time if the current max in flight requests is 0 | 0.3.0 | |
celeborn.client.push.slowStart.maxSleepTime | 2s | false | If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time | 0.3.0 | |
celeborn.client.push.sort.randomizePartitionId.enabled | false | false | Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker | 0.3.0 | celeborn.push.sort.randomizePartitionId.enabled |
celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | false | Timeout for waiting for StageEnd. During this process, there are celeborn.client.requestCommitFiles.maxRetries retry opportunities for committing files and 1 retry opportunity for the release slots request. Users can customize this value according to their setting. By default, the value is the max timeout value celeborn.<module>.io.connectionTimeout. | 0.3.0 | celeborn.push.stageEnd.timeout |
celeborn.client.push.takeTaskMaxWaitAttempts | 1 | false | Max wait attempts if no task is available to push to the worker. | 0.3.0 | |
celeborn.client.push.takeTaskWaitInterval | 50ms | false | Wait interval if no task is available to push to the worker. | 0.3.0 | |
celeborn.client.push.timeout | 120s | false | Timeout for a task to push data rpc message. This value should be more than twice celeborn.<module>.push.timeoutCheck.interval. | 0.3.0 | celeborn.push.data.timeout |
celeborn.client.readLocalShuffleFile.enabled | false | false | Enable reading local shuffle files for clusters that are co-deployed with the YARN node manager. | 0.3.1 | |
celeborn.client.readLocalShuffleFile.threads | 4 | false | Thread count for reading local shuffle files. | 0.3.1 | |
celeborn.client.registerShuffle.maxRetries | 3 | false | Max retry times for client to register shuffle. | 0.3.0 | celeborn.shuffle.register.maxRetries |
celeborn.client.registerShuffle.retryWait | 3s | false | Wait time before next retry if register shuffle failed. | 0.3.0 | celeborn.shuffle.register.retryWait |
celeborn.client.requestCommitFiles.maxRetries | 4 | false | Max retry times for requestCommitFiles RPC. | 0.3.0 | |
celeborn.client.reserveSlots.maxRetries | 3 | false | Max retry times for client to reserve slots. | 0.3.0 | celeborn.slots.reserve.maxRetries |
celeborn.client.reserveSlots.rackaware.enabled | false | false | Whether different replicas need to be placed on different racks when allocating slots. | 0.3.1 | celeborn.client.reserveSlots.rackware.enabled |
celeborn.client.reserveSlots.retryWait | 3s | false | Wait time before next retry if reserve slots failed. | 0.3.0 | celeborn.slots.reserve.retryWait |
celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel |
celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
celeborn.client.rpc.commitFiles.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. | 0.4.1 | |
celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during getting reducer file group information. During this process, there are celeborn.client.requestCommitFiles.maxRetries retry opportunities for committing files and one request for releasing slots. Users can customize this value according to their settings. | 0.2.0 | |
celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | |
celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during register shuffle. During this process, there are two retry opportunities for requesting slots, one request for establishing a connection with a Worker, and celeborn.client.reserveSlots.maxRetries retry opportunities for reserving slots. Users can customize this value according to their settings. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations when requesting a partition location change, such as reviving or splitting a partition. During this process, there are celeborn.client.reserveSlots.maxRetries retry opportunities for reserving slots. Users can customize this value according to their settings. | 0.2.0 | |
celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for LifecycleManager request reserve slots. | 0.3.0 | |
celeborn.client.rpc.shared.threads | 16 | false | Number of shared rpc threads in LifecycleManager. | 0.3.2 | |
celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | false | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval |
celeborn.client.shuffle.batchHandleChangePartition.partitionBuckets | 256 | false | Max number of change partition requests which can be concurrently processed. | 0.5.0 | |
celeborn.client.shuffle.batchHandleChangePartition.threads | 8 | false | Threads number for LifecycleManager to handle change partition request in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.threads |
celeborn.client.shuffle.batchHandleCommitPartition.interval | 5s | false | Interval for LifecycleManager to schedule handling commit partition requests in batch. | 0.3.0 | celeborn.shuffle.batchHandleCommitPartition.interval |
celeborn.client.shuffle.batchHandleCommitPartition.threads | 8 | false | Threads number for LifecycleManager to handle commit partition request in batch. | 0.3.0 | celeborn.shuffle.batchHandleCommitPartition.threads |
celeborn.client.shuffle.batchHandleReleasePartition.interval | 5s | false | Interval for LifecycleManager to schedule handling release partition requests in batch. | 0.3.0 | |
celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | false | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 | |
celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled | false | false | Whether to batch remove expired shuffles. This is an optimization switch on removing expired shuffles. | 0.6.0 | |
celeborn.client.shuffle.checkWorker.enabled | true | false | When true, before registering shuffle, LifecycleManager checks whether the current cluster has available workers; if not, it falls back to the default shuffle. | 0.5.0 | celeborn.client.spark.shuffle.checkWorker.enabled |
celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: lz4 , zstd , none . none means that shuffle compression is disabled. Since Flink version 1.16, zstd is supported for the Flink shuffle client. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec |
celeborn.client.shuffle.compression.zstd.level | 1 | false | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | 0.3.0 | celeborn.shuffle.compression.zstd.level |
celeborn.client.shuffle.decompression.lz4.xxhash.instance | <undefined> | false | Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 | |
celeborn.client.shuffle.dynamicResourceEnabled | false | false | When enabled, the ChangePartitionManager will obtain candidate workers from the availableWorkers pool during heartbeats when worker resources change. | 0.6.0 | |
celeborn.client.shuffle.dynamicResourceFactor | 0.5 | false | The ChangePartitionManager will check whether (unavailable workers / shuffle allocated workers) exceeds this factor before obtaining candidate workers from the requestSlots RPC response when celeborn.client.shuffle.dynamicResourceEnabled is set to true. | 0.6.0 | |
celeborn.client.shuffle.expired.checkInterval | 60s | false | Interval for client to check expired shuffles. | 0.3.0 | celeborn.shuffle.expired.checkInterval |
celeborn.client.shuffle.manager.port | 0 | false | Port used by the LifecycleManager on the Driver. | 0.3.0 | celeborn.shuffle.manager.port |
celeborn.client.shuffle.mapPartition.split.enabled | false | false | Whether to enable shuffle partition split. Currently, this only applies to MapPartition. | 0.3.1 | |
celeborn.client.shuffle.partition.type | REDUCE | false | Type of shuffle's partition. | 0.3.0 | celeborn.shuffle.partition.type |
celeborn.client.shuffle.partitionSplit.mode | SOFT | false | soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold. | 0.3.0 | celeborn.shuffle.partitionSplit.mode |
celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.3.0 | celeborn.shuffle.partitionSplit.threshold |
celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a Spark application has skewed partitions, this value can be set to true to improve performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see celeborn.master.slot.assign.maxWorkers . | 0.3.1 | |
celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions. If true, the shuffle write mode is determined solely by the partition count. | 0.5.0 | |
celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | false | Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | The max portion of executor memory which can be used for the SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled). | 0.5.0 | |
celeborn.client.spark.push.sort.memory.smallPushTolerateFactor | 0.2 | false | Only in effect when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is turned on. The larger this value is, the more aggressively Celeborn will enlarge the sort-based shuffle writer's memory threshold. Specifically, this config controls when to enlarge the sort shuffle writer's memory threshold: with V as the value of this config, if the number of pushes C when using the sort-based shuffle writer satisfies C >= (1 + V) * C', where C' is the number of pushes if we were using the hash-based writer, we will enlarge the memory threshold by 2x. | 0.5.0 | |
celeborn.client.spark.push.sort.memory.threshold | 64m | false | When SortBasedPusher uses memory over this threshold, it will trigger a data push. | 0.3.0 | celeborn.push.sortMemory.threshold |
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false | Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 | |
celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you have changed UnsafeRow's memory layout, set this to false. | 0.2.2 | |
celeborn.client.spark.shuffle.fallback.numPartitionsThreshold | 2147483647 | false | Celeborn will only accept shuffles with a partition number lower than this configuration value. This configuration only takes effect when celeborn.client.spark.shuffle.fallback.policy is AUTO . | 0.5.0 | celeborn.shuffle.forceFallback.numPartitionsThreshold,celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following fallback policies. 1. ALWAYS: always use the Spark built-in shuffle implementation; 2. AUTO: prefer the Celeborn shuffle implementation, and fall back to the Spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use the Celeborn shuffle implementation, and fail fast when it is concluded that fallback is required based on the factors above. | 0.5.0 | |
celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use the Spark built-in shuffle implementation. This configuration is deprecated; consider configuring celeborn.client.spark.shuffle.fallback.policy instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kinds of shuffle writers. 1. hash: the hash-based shuffle writer works fine when the shuffle partition count is normal; 2. sort: the sort-based shuffle writer works fine when memory pressure is high or the shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
celeborn.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is org.apache.celeborn.common.identity.DefaultIdentityProvider . Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider , where the user name is obtained via UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider , where the user name and tenant id are default values or user-specific values. | 0.6.0 | celeborn.quota.identity.provider |
celeborn.identity.user-specific.tenant | default | false | Tenant id if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 | celeborn.quota.identity.user-specific.tenant |
celeborn.identity.user-specific.userName | default | false | User name if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 | celeborn.quota.identity.user-specific.userName |
celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of master nodes for celeborn clients to connect. Clients use the resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses org.apache.celeborn.common.client.StaticMasterEndpointResolver , which takes static master endpoints as input. Allowed pattern: <host1>:<port1>[,<host2>:<port2>]* , e.g. clb1:9097,clb2:9098,clb3:9099 . If the port is omitted, 9097 will be used. If the master endpoints are not static, users can pass a custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | |
celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that the provided resolver class is present in the classpath. | 0.6.0 | |
celeborn.quota.enabled | true | false | When set to true on the Master side, the master checks quota via QuotaManager. When set to true on the Client side, LifecycleManager requests the Master to check whether the current user has enough quota before registering shuffle, and falls back to the default shuffle service when the Master finds there is not enough quota for the current user. | 0.2.0 | |
celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, prod,high-io filters workers that have both the prod and high-io tags. | 0.6.0 | |
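As an illustration of how these client options combine, below is a hedged sketch of a Spark-side configuration; the endpoint names and chosen values are placeholders, not recommendations. When set through Spark, Celeborn keys take the spark. prefix, as in the Tuning section later in this guide.

```properties
# Illustrative Spark client settings for Celeborn.
spark.celeborn.master.endpoints=clb1:9097,clb2:9098,clb3:9099
# Prefer Celeborn, but fall back to Spark's built-in shuffle when needed.
spark.celeborn.client.spark.shuffle.fallback.policy=AUTO
# Use the sort-based writer; only effective while dynamic write mode is disabled.
spark.celeborn.client.spark.shuffle.writer=SORT
# Retry shuffle registration a few times before giving up.
spark.celeborn.client.registerShuffle.maxRetries=3
```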
Quota
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.quota.enabled | true | false | When set to true on the Master side, the master checks quota via QuotaManager. When set to true on the Client side, LifecycleManager requests the Master to check whether the current user has enough quota before registering shuffle, and falls back to the default shuffle service when the Master finds there is not enough quota for the current user. | 0.2.0 | |
celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 |
Network
The various transport modules which can be configured are:
Module | Parent Module | Description |
---|---|---|
rpc_app | rpc | Configure control plane RPC environment used by Celeborn within the application. For backward compatibility, supports fallback to the rpc parent module for missing configuration. Note, this is for the RPC environment - see below for other transport modules. |
rpc_service | rpc | Configure control plane RPC environment when communicating with Celeborn service hosts. This includes all RPC communication from application to Celeborn Master/Workers, as well as between Celeborn masters/workers themselves. For backward compatibility, supports fallback to the rpc parent module for missing configuration. As with rpc_app , this is only for the RPC environment; see below for other transport modules. |
rpc | - | Fallback parent transport module for rpc_app and rpc_service . It is advisable to use the specific transport modules while configuring - rpc exists primarily for backward compatibility. |
push | - | Configure transport module for handling data push at Celeborn workers |
fetch | - | Configure transport module for handling data fetch at Celeborn workers |
data | - | Configure transport module for handling data push and fetch at Celeborn apps |
replicate | - | Configure transport module for handling data replication between Celeborn workers |
Some network configurations might apply only in specific scenarios, for example the push module for io.maxRetries and io.retryWait in the Flink client. Please see the full list below for details.
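For instance, here is a hedged sketch of module-scoped settings; the values are illustrative, and the <module> placeholder in the keys below is replaced with a concrete module name such as push or rpc_service.

```properties
# Tune push-module retries, e.g. for Flink clients pushing data (illustrative values).
celeborn.push.io.maxRetries=5
celeborn.push.io.retryWait=10s
# Configure the RPC environment used when talking to Celeborn services.
celeborn.rpc_service.io.connectTimeout=20s
```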
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.<module>.fetch.timeoutCheck.interval | 5s | false | Interval for checking fetch data timeout. It only supports setting data since it works for shuffle client fetch data. | 0.3.0 | |
celeborn.<module>.fetch.timeoutCheck.threads | 4 | false | Threads num for checking fetch data timeout. It only supports setting data since it works for shuffle client fetch data. | 0.3.0 | |
celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat interval between worker and client. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for replicate client of worker replicating data to peer worker. If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.rpc.heartbeat.interval", "celeborn.data.heartbeat.interval" and "celeborn.replicate.heartbeat.interval". | 0.3.0 | celeborn.client.heartbeat.interval |
celeborn.<module>.io.backLog | 0 | false | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | | |
celeborn.<module>.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for replicate client of worker replicating data to peer worker. | | |
celeborn.<module>.io.connectTimeout | <value of celeborn.network.connect.timeout> | false | Socket connect timeout. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for the replicate client of worker replicating data to peer worker. | | |
celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | false | Connection active timeout. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server or client of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | | |
celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be obtained, otherwise only general memory usage will be tracked. | | |
celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting fetch , it works for worker fetch server. | | |
celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will retry IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for replicate client of worker replicating data to peer worker. If setting push , it works for Flink shuffle client push data. | | |
celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | ||
celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for replicate client of worker replicating data to peer worker. | | |
celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server or client of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | | |
celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for the receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms and network_bandwidth = 10Gbps, the buffer size should be ~1.25MB. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server or client of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | 0.2.0 | |
celeborn.<module>.io.retryWait | 5s | false | Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting data , it works for shuffle client push and fetch data. If setting replicate , it works for replicate client of worker replicating data to peer worker. If setting push , it works for Flink shuffle client push data. | 0.2.0 | |
celeborn.<module>.io.saslTimeout | 30s | false | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 | |
celeborn.<module>.io.sendBuffer | 0b | false | Send buffer size (SO_SNDBUF). If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting data , it works for shuffle client push and fetch data. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server or client of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | 0.2.0 | |
celeborn.<module>.io.serverThreads | 0 | false | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting rpc_app , works for shuffle client. If setting rpc_service , works for master or worker. If setting push , it works for worker receiving push data. If setting replicate , it works for replicate server of worker replicating data to peer worker. If setting fetch , it works for worker fetch server. | | |
celeborn.<module>.push.timeoutCheck.interval | 5s | false | Interval for checking push data timeout. If setting data , it works for shuffle client push data. If setting push , it works for Flink shuffle client push data. If setting replicate , it works for replicate client of worker replicating data to peer worker. | 0.3.0 | |
celeborn.<module>.push.timeoutCheck.threads | 4 | false | Threads num for checking push data timeout. If setting data , it works for shuffle client push data. If setting push , it works for Flink shuffle client push data. If setting replicate , it works for replicate client of worker replicating data to peer worker. | 0.3.0 | |
celeborn.<role>.rpc.dispatcher.threads | <value of celeborn.rpc.dispatcher.threads> | false | Threads number of message dispatcher event loop for roles | ||
celeborn.io.maxDefaultNettyThreads | 64 | false | Max default netty threads | 0.3.2 | |
celeborn.network.advertise.preferIpAddress | <value of celeborn.network.bind.preferIpAddress> | false | When true , prefer to use IP address, otherwise FQDN for advertise address. | 0.6.0 | |
celeborn.network.bind.preferIpAddress | true | false | When true , prefer to use IP address, otherwise FQDN. This configuration only takes effect when the bind hostname is not set explicitly, in which case Celeborn will find the first non-loopback address to bind. | 0.3.0 | |
celeborn.network.bind.wildcardAddress | false | false | When true , the bind address will be set to a wildcard address, while the advertise address will remain as whatever is set by celeborn.network.advertise.preferIpAddress . The wildcard address is a special local IP address, and usually refers to 'any' and can only be used for bind operations. In the case of IPv4, this is 0.0.0.0 and in the case of IPv6 this is ::0. This is helpful in dual-stack environments, where the service must listen to both IPv4 and IPv6 clients. | 0.6.0 | |
celeborn.network.connect.timeout | 10s | false | Default socket connect timeout. | 0.2.0 | |
celeborn.network.memory.allocator.numArenas | <undefined> | false | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 | |
celeborn.network.memory.allocator.verbose.metric | false | false | Whether to enable verbose metric for pooled allocator. | 0.3.0 | |
celeborn.network.timeout | 240s | false | Default timeout for network operations. | 0.2.0 | |
celeborn.port.maxRetries | 1 | false | When a port is occupied, retry binding up to this many times. | 0.2.0 | |
celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's recommended to set at least 240s when HDFS is enabled in celeborn.storage.availableTypes . | 0.2.0 | |
celeborn.rpc.connect.threads | 64 | false | | 0.2.0 | |
celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 | celeborn.rpc.dispatcher.numThreads |
celeborn.rpc.dump.interval | 60s | false | Min interval (ms) for the RPC framework to dump a performance summary. | 0.6.0 | |
celeborn.rpc.inbox.capacity | 0 | false | Specifies the capacity of the bounded in-memory inbox. | 0.5.0 | |
celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | |
celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | |
celeborn.rpc.slow.interval | <undefined> | false | Min interval (ms) for the RPC framework to log slow RPCs. | 0.6.0 | |
celeborn.rpc.slow.threshold | 1s | false | Threshold for the RPC framework to log slow RPCs. | 0.6.0 | |
celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see celeborn.<module>.io.maxRetries and celeborn.<module>.io.retryWait ); if those limits are reached the task will fail with fetch failure. | 0.2.0 | |
celeborn.ssl.<module>.enabled | false | false | Enables SSL for securing wire traffic. | 0.5.0 | |
celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false | A comma-separated list of ciphers. The specified ciphers must be supported by JVM. The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide, e.g. the Java 11 edition. Note: If not set, the default cipher suite for the JRE will be used. | 0.5.0 | |
celeborn.ssl.<module>.keyStore | <undefined> | false | Path to the key store file. The path can be absolute or relative to the directory in which the process is started. | 0.5.0 | |
celeborn.ssl.<module>.keyStorePassword | <undefined> | false | Password to the key store. | 0.5.0 | |
celeborn.ssl.<module>.protocol | TLSv1.2 | false | TLS protocol to use. The protocol must be supported by JVM. The reference list of protocols can be found in the "Additional JSSE Standard Names" section of the Java security guide, e.g. the Java 11 edition. | 0.5.0 | |
celeborn.ssl.<module>.trustStore | <undefined> | false | Path to the trust store file. The path can be absolute or relative to the directory in which the process is started. | 0.5.0 | |
celeborn.ssl.<module>.trustStorePassword | <undefined> | false | Password for the trust store. | 0.5.0 | |
celeborn.ssl.<module>.trustStoreReloadIntervalMs | 10s | false | The interval at which the trust store should be reloaded (in milliseconds), when enabled. This setting is mostly only useful for server components, not applications. | 0.5.0 | |
celeborn.ssl.<module>.trustStoreReloadingEnabled | false | false | Whether the trust store should be reloaded periodically. This setting is mostly only useful for Celeborn services (masters, workers), and not applications. | 0.5.0 | |
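Putting the SSL options together, here is a minimal sketch for enabling TLS on the rpc_service module; the paths and passwords are placeholders, and whether this module choice fits depends on which traffic you need secured.

```properties
celeborn.ssl.rpc_service.enabled=true
celeborn.ssl.rpc_service.protocol=TLSv1.2
# Key material for this process (placeholder path and password).
celeborn.ssl.rpc_service.keyStore=/path/to/keystore.jks
celeborn.ssl.rpc_service.keyStorePassword=changeit
# Trust anchors for verifying the peer (placeholder path and password).
celeborn.ssl.rpc_service.trustStore=/path/to/truststore.jks
celeborn.ssl.rpc_service.trustStorePassword=changeit
```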
Columnar Shuffle
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.columnarShuffle.batch.size | 10000 | false | Vector batch size for columnar shuffle. | 0.3.0 | celeborn.columnar.shuffle.batch.size |
celeborn.columnarShuffle.codegen.enabled | false | false | Whether to use codegen for columnar-based shuffle. | 0.3.0 | celeborn.columnar.shuffle.codegen.enabled |
celeborn.columnarShuffle.enabled | false | false | Whether to enable columnar-based shuffle. | 0.2.0 | celeborn.columnar.shuffle.enabled |
celeborn.columnarShuffle.encoding.dictionary.enabled | false | false | Whether to use dictionary encoding for columnar-based shuffle data. | 0.3.0 | celeborn.columnar.shuffle.encoding.dictionary.enabled |
celeborn.columnarShuffle.encoding.dictionary.maxFactor | 0.3 | false | Max factor for dictionary size. The max dictionary size is min(32.0 KiB, celeborn.columnarShuffle.batch.size * celeborn.columnarShuffle.encoding.dictionary.maxFactor) . | 0.3.0 | celeborn.columnar.shuffle.encoding.dictionary.maxFactor |
celeborn.columnarShuffle.offHeap.enabled | false | false | Whether to use off heap columnar vector. | 0.3.0 | celeborn.columnar.offHeap.enabled |
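A hedged sketch of turning on columnar shuffle with dictionary encoding follows; whether this helps depends on your data's column cardinality, and the combination below is illustrative rather than a recommendation.

```properties
celeborn.columnarShuffle.enabled=true
# Generate code for the columnar write path.
celeborn.columnarShuffle.codegen.enabled=true
# Dictionary-encode low-cardinality columns to shrink shuffle data.
celeborn.columnarShuffle.encoding.dictionary.enabled=true
```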
Metrics
The metrics configurations below work for both master and worker.
Key | Default | isDynamic | Description | Since | Deprecated |
---|---|---|---|---|---|
celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | |
celeborn.metrics.collectPerfCritical.enabled | false | false | Controls whether to collect metrics which may affect performance. When enabled, Celeborn collects them. | 0.2.0 | |
celeborn.metrics.conf | <undefined> | false | Custom metrics configuration file path. Defaults to metrics.properties in the classpath. | 0.3.0 | |
celeborn.metrics.enabled | true | false | When true, enable metrics system. | 0.2.0 | |
celeborn.metrics.extraLabels | | false | If the default metric labels are not enough, extra metric labels can be customized. The labels' pattern is: <label1_key>=<label1_value>[,<label2_key>=<label2_value>]* ; e.g. env=prod,version=1 . | 0.3.0 | |
celeborn.metrics.json.path | /metrics/json | false | URI context path of json metrics HTTP server. | 0.4.0 | |
celeborn.metrics.json.pretty.enabled | true | false | When true, view metrics in json pretty format | 0.4.0 | |
celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | |
celeborn.metrics.sample.rate | 1.0 | false | Controls whether Celeborn collects timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | |
celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding window size of timer metric. | 0.2.0 | |
celeborn.metrics.worker.app.topResourceConsumption.count | 50 | false | Number of top resource-consuming applications reported per worker. Resource consumption is determined by the sum of diskBytesWritten and hdfsBytesWritten. This count prevents the total number of metrics from exceeding the metrics capacity. | 0.6.0 | |
celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | false | Force append worker pause spent time even if the worker is still in the paused serving state. This helps users notice increases in pause spent time when a worker stays in the paused state. | | |
metrics.properties
*.sink.csv.class=org.apache.celeborn.common.metrics.sink.CsvSink
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
Environment Variables
It is recommended to configure these in conf/celeborn-env.sh .
Key | Default | Description |
---|---|---|
CELEBORN_HOME | $(cd "`dirname "$0"`"/..; pwd) | |
CELEBORN_CONF_DIR | ${CELEBORN_CONF_DIR:-"${CELEBORN_HOME}/conf"} | |
CELEBORN_MASTER_MEMORY | 1 GB | |
CELEBORN_WORKER_MEMORY | 1 GB | |
CELEBORN_WORKER_OFFHEAP_MEMORY | 1 GB | |
CELEBORN_MASTER_JAVA_OPTS | | |
CELEBORN_WORKER_JAVA_OPTS | | |
CELEBORN_PID_DIR | ${CELEBORN_HOME}/pids | |
CELEBORN_LOG_DIR | ${CELEBORN_HOME}/logs | |
CELEBORN_SSH_OPTS | -o StrictHostKeyChecking=no | SSH options for start-all and stop-all operations |
CELEBORN_SLEEP | | Waiting time for start-all and stop-all operations |
CELEBORN_PREFER_JEMALLOC | | Set to true to enable the jemalloc memory allocator |
CELEBORN_JEMALLOC_PATH | | jemalloc library path |
CELEBORN_NO_DAEMONIZE | | Set to true to run the proposed command in the foreground |
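A short, illustrative conf/celeborn-env.sh follows; all values, including the jemalloc path, are examples rather than recommendations.

```sh
# Illustrative values only; size memory to your workload.
CELEBORN_MASTER_MEMORY=2g
CELEBORN_PID_DIR=/var/run/celeborn
CELEBORN_LOG_DIR=/var/log/celeborn
# Optionally use jemalloc if the library exists at this (example) path.
CELEBORN_PREFER_JEMALLOC=true
CELEBORN_JEMALLOC_PATH=/usr/lib/x86_64-linux-gnu/libjemalloc.so
```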
Tuning
Assume we have a cluster described as below: 5 Celeborn Workers with 20 GB off-heap memory and 10 disks. Since we need to reserve 20% of the off-heap memory for netty, we can assume 16 GB of off-heap memory is available for flush buffers.
If spark.celeborn.client.push.buffer.max.size is 64 KB, we can have up to 1310720 in-flight requests across the cluster. If you have 8192 mapper tasks, you could set spark.celeborn.client.push.maxReqsInFlight=160 to gain performance improvements.
If celeborn.worker.flusher.buffer.size is 256 KB, we can have up to 327680 slots in total.
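The arithmetic behind these numbers, spelled out for the assumed cluster:

```
flush buffer memory per worker = 20 GB * (1 - 0.2)          = 16 GB
max in-flight requests         = 5 workers * 16 GB / 64 KB  = 1310720
maxReqsInFlight per mapper     = 1310720 / 8192 mappers     = 160
total slots                    = 5 workers * 16 GB / 256 KB = 327680
```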
Rack Awareness
Celeborn can be made rack-aware by setting celeborn.client.reserveSlots.rackaware.enabled to true on the client side.
Shuffle partition block replica placement will use rack awareness for fault tolerance by placing one shuffle partition replica
on a different rack. This provides data availability in the event of a network switch failure or partition within the cluster.
Celeborn master daemons obtain the rack id of the cluster workers by invoking either an external script or Java class as specified by configuration files.
Whether using the Java class or an external script for topology, the output must adhere to the Java org.apache.hadoop.net.DNSToSwitchMapping interface. The interface expects a one-to-one correspondence to be maintained and the topology information to be in the format of /myrack/myhost , where / is the topology delimiter, myrack is the rack identifier, and myhost is the individual host.
Assuming a single /24 subnet per rack, one could use the format /192.168.100.0/192.168.100.5 as a unique rack-host topology mapping.
To use the Java class for topology mapping, the class name is specified by the celeborn.hadoop.net.topology.node.switch.mapping.impl parameter in the master configuration file.
An example, NetworkTopology.java , is included with the Celeborn distribution and can be customized by the Celeborn administrator.
Using a Java class instead of an external script has a performance benefit in that Celeborn doesn't need to fork an external process when a new worker node registers itself.
If implementing an external script, it is specified with the celeborn.hadoop.net.topology.script.file.name parameter in the master-side configuration files. Unlike the Java class, the external topology script is not included with the Celeborn distribution and must be provided by the administrator.
Celeborn will send multiple IP addresses to ARGV when forking the topology script. The number of IP addresses sent to the topology script is controlled by celeborn.hadoop.net.topology.script.number.args and defaults to 100. If celeborn.hadoop.net.topology.script.number.args were changed to 1, a topology script would be forked for each IP submitted by workers.
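A minimal sketch of such a topology script follows; the rack table and addresses are hypothetical and should be adapted to your network.

```sh
#!/usr/bin/env bash
# Prints one rack id per IP address received in ARGV, as the
# DNSToSwitchMapping contract expects; unknown hosts map to /default-rack.
# The lookup table below is a hypothetical example.
declare -A RACKS=(
  ["192.168.100.5"]="/rack-a"
  ["192.168.100.6"]="/rack-a"
  ["192.168.101.5"]="/rack-b"
)
for ip in "$@"; do
  echo "${RACKS[$ip]:-/default-rack}"
done
```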
If celeborn.hadoop.net.topology.script.file.name or celeborn.hadoop.net.topology.node.switch.mapping.impl is not set, the rack id /default-rack is returned for any passed IP address. While this behavior appears desirable, it can cause issues with shuffle partition block replication: the default behavior is to write one replicated block off rack, which is impossible when there is only a single rack named /default-rack .
For a fuller example, refer to Hadoop Rack Awareness, since Celeborn reuses Hadoop's rack-awareness code.
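Wiring it together, a hedged sketch of the relevant settings; the script path is a placeholder.

```properties
# Master side: resolve worker racks through an external script (path is illustrative).
celeborn.hadoop.net.topology.script.file.name=/opt/celeborn/conf/topology.sh
celeborn.hadoop.net.topology.script.number.args=100
# Client side: enable rack-aware replica placement.
celeborn.client.reserveSlots.rackaware.enabled=true
```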
Worker Recover Status After Restart
ShuffleClient records each shuffle partition location's host, service port, and filename so that workers can recover serving existing shuffle data after a restart. During worker shutdown, workers should store the metadata about the shuffle partition files being read in RocksDB or LevelDB (deprecated) and restore that metadata after restarting. Workers should also keep stable service ports so that ShuffleClient can retry reading data. Users should set celeborn.worker.graceful.shutdown.enabled to true and configure the service ports below with stable values to support worker status recovery:
celeborn.worker.rpc.port
celeborn.worker.fetch.port
celeborn.worker.push.port
celeborn.worker.replicate.port
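A minimal sketch of the corresponding worker configuration; the port numbers are illustrative placeholders, not defaults.

```properties
celeborn.worker.graceful.shutdown.enabled=true
# Pin the service ports so a restarted worker is reachable at the same addresses.
celeborn.worker.rpc.port=9095
celeborn.worker.fetch.port=9092
celeborn.worker.push.port=9091
celeborn.worker.replicate.port=9093
```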