Skip to content

Monitoring

There are two ways to monitor Celeborn cluster: Prometheus metrics and REST API.

Metrics

Celeborn has a configurable metrics system based on the Dropwizard Metrics Library. This allows users to report Celeborn metrics to a variety of sinks including HTTP, JMX, CSV files and prometheus servlet. The metrics are generated by sources embedded in the Celeborn code base. They provide instrumentation for specific activities and Celeborn components. The metrics system is configured via a configuration file that Celeborn expects to be present at $CELEBORN_HOME/conf/metrics.properties. A custom file location can be specified via the celeborn.metrics.conf configuration property. Instead of using the configuration file, a set of configuration parameters with prefix celeborn.metrics.conf. can be used.

Celeborn's metrics are divided into two instances corresponding to Celeborn components. The following instances are currently supported:

  • master: The Celeborn cluster master process.
  • worker: The Celeborn cluster worker process.

Each instance can report to zero or more sinks. Sinks are contained in the org.apache.celeborn.common.metrics.sink package:

  • CSVSink: Exports metrics data to CSV files at regular intervals.
  • PrometheusServlet: Adds a servlet within the existing Celeborn REST API to serve metrics data in Prometheus format.
  • GraphiteSink: Sends metrics to a Graphite node.

The syntax of the metrics configuration file and the parameters available for each sink are defined in an example configuration file, $CELEBORN_HOME/conf/metrics.properties.template.

When using Celeborn configuration parameters instead of the metrics configuration file, the relevant parameter names are composed by the prefix celeborn.metrics.conf. followed by the configuration details, i.e. the parameters take the following form: celeborn.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]. This example shows a list of Celeborn configuration parameters for a CSV sink:

"celeborn.metrics.conf.*.sink.csv.class"="org.apache.celeborn.common.metrics.sink.CsvSink"
"celeborn.metrics.conf.*.sink.csv.period"="1"
"celeborn.metrics.conf.*.sink.csv.unit"=minutes
"celeborn.metrics.conf.*.sink.csv.directory"=/tmp/

Default values of the Celeborn metrics configuration are as follows:

*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet

Additional sources can be configured using the metrics configuration file or the configuration parameter celeborn.metrics.conf.[component_name].source.jvm.class=[source_name]. At present the no source is the available optional source. For example the following configuration parameter activates the Example source: "celeborn.metrics.conf.*.source.jvm.class"="org.apache.celeborn.common.metrics.source.ExampleSource"

Available metrics providers

Metrics used by Celeborn are of multiple types: gauge, counter, histogram, meter and timer, see Dropwizard library documentation for details. The following list of components and metrics reports the name and some details about the available metrics, grouped per component instance and source namespace. The most common time of metrics used in Celeborn instrumentation are gauges and counters. Counters can be recognized as they have the .count suffix. Timers, meters and histograms are annotated in the list, the rest of the list elements are metrics of type gauge. The large majority of metrics are active as soon as their parent component instance is configured, some metrics require also to be enabled via an additional configuration parameter, the details are reported in the list.

Master

These metrics are exposed by Celeborn master.

  • namespace=master

    • RegisteredShuffleCount
    • DeviceCelebornFreeBytes
    • DeviceCelebornTotalBytes
    • RunningApplicationCount
    • ActiveShuffleSize
      • The active shuffle size of workers.
    • ActiveShuffleFileCount
      • The active shuffle file count of workers.
    • WorkerCount
    • LostWorkerCount
    • ExcludedWorkerCount
    • ShutdownWorkerCount
    • IsActiveMaster
    • PartitionSize
      • The size of estimated shuffle partition.
    • OfferSlotsTime
      • The time for masters to handle RequestSlots request when registering shuffle.
  • namespace=CPU

    • JVMCPUTime
  • namespace=system

    • LastMinuteSystemLoad
      • The average system load for the last minute.
    • AvailableProcessors
  • namespace=JVM

  • namespace=ResourceConsumption

    • notes:
      • This metrics data is generated for each user and they are identified using a metric tag.
      • This metrics also include subResourceConsumptions generated for each application of user and they are identified using applicationId tag.
    • diskFileCount
    • diskBytesWritten
    • hdfsFileCount
    • hdfsBytesWritten
  • namespace=ThreadPool

    • notes:
      • This metrics data is generated for each thread pool and they are identified using a metric tag by thread pool name.
    • active_thread_count
    • pending_task_count
    • pool_size
    • core_pool_size
    • maximum_pool_size
    • largest_pool_size
    • is_terminating
    • is_terminated
    • is_shutdown
    • thread_count
    • thread_is_terminated_count
    • thread_is_shutdown_count

Worker

These metrics are exposed by Celeborn worker.

  • namespace=worker

    Metric Name Description
    RegisteredShuffleCount The count of registered shuffle.
    RunningApplicationCount The count of running applications.
    ActiveShuffleSize The active shuffle size of a worker including master replica and slave replica.
    ActiveShuffleFileCount The active shuffle file count of a worker including master replica and slave replica.
    OpenStreamTime The time for a worker to process openStream RPC and return StreamHandle.
    FetchChunkTime The time for a worker to fetch a chunk which is 8MB by default from a reduced partition.
    ActiveChunkStreamCount Active stream count for reduce partition reading streams.
    OpenStreamSuccessCount The count of opening stream succeed in current worker.
    OpenStreamFailCount The count of opening stream failed in current worker.
    FetchChunkSuccessCount The count of fetching chunk succeed in current worker.
    FetchChunkFailCount The count of fetching chunk failed in current worker.
    PrimaryPushDataTime The time for a worker to handle a pushData RPC sent from a celeborn client.
    ReplicaPushDataTime The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating.
    WriteDataHardSplitCount The count of writing PushData or PushMergedData to HARD_SPLIT partition in current worker.
    WriteDataSuccessCount The count of writing PushData or PushMergedData succeed in current worker.
    WriteDataFailCount The count of writing PushData or PushMergedData failed in current worker.
    ReplicateDataFailCount The count of replicating PushData or PushMergedData failed in current worker.
    ReplicateDataWriteFailCount The count of replicating PushData or PushMergedData failed caused by write failure in peer worker.
    ReplicateDataCreateConnectionFailCount The count of replicating PushData or PushMergedData failed caused by creating connection failed in peer worker.
    ReplicateDataConnectionExceptionCount The count of replicating PushData or PushMergedData failed caused by connection exception in peer worker.
    ReplicateDataFailNonCriticalCauseCount The count of replicating PushData or PushMergedData failed caused by non-critical exception in peer worker.
    ReplicateDataTimeoutCount The count of replicating PushData or PushMergedData failed caused by push timeout in peer worker.
    PushDataHandshakeFailCount The count of PushDataHandshake failed in current worker.
    RegionStartFailCount The count of RegionStart failed in current worker.
    RegionFinishFailCount The count of RegionFinish failed in current worker.
    PrimaryPushDataHandshakeTime PrimaryPushDataHandshake means handle PushData of primary partition location.
    ReplicaPushDataHandshakeTime ReplicaPushDataHandshake means handle PushData of replica partition location.
    PrimaryRegionStartTime PrimaryRegionStart means handle RegionStart of primary partition location.
    ReplicaRegionStartTime ReplicaRegionStart means handle RegionStart of replica partition location.
    PrimaryRegionFinishTime PrimaryRegionFinish means handle RegionFinish of primary partition location.
    ReplicaRegionFinishTime ReplicaRegionFinish means handle RegionFinish of replica partition location.
    PausePushDataTime The time for a worker to stop receiving pushData from clients because of back pressure.
    PausePushDataAndReplicateTime The time for a worker to stop receiving pushData from clients and other workers because of back pressure.
    PausePushData The count for a worker to stop receiving pushData from clients because of back pressure.
    PausePushDataAndReplicate The count for a worker to stop receiving pushData from clients and other workers because of back pressure.
    TakeBufferTime The time for a worker to take out a buffer from a disk flusher.
    FlushDataTime The time for a worker to write a buffer which is 256KB by default to storage.
    CommitFilesTime The time for a worker to flush buffers and close files related to specified shuffle.
    SlotsAllocated Slots allocated in last hour.
    ActiveSlotsCount The number of slots currently being used in a worker.
    ReserveSlotsTime ReserveSlots means acquire a disk buffer and record partition location.
    ActiveConnectionCount The count of active network connection.
    NettyMemory The total amount of off-heap memory used by celeborn worker.
    SortTime The time for a worker to sort a shuffle file.
    SortMemory The memory used by sorting shuffle files.
    SortingFiles The count of sorting shuffle files.
    SortedFiles The count of sorted shuffle files.
    SortedFileSize The count of sorted shuffle files 's total size.
    DiskBuffer The memory occupied by pushData and pushMergedData which should be written to disk.
    BufferStreamReadBuffer The memory used by credit stream read buffer.
    ReadBufferDispatcherRequestsLength The queue size of read buffer allocation requests.
    ReadBufferAllocatedCount Allocated read buffer count.
    ActiveCreditStreamCount Active stream count for map partition reading streams.
    ActiveMapPartitionCount The count of active map partition reading streams.
    CleanTaskQueueSize The count of task for cleaning up expired shuffle keys.
    CleanExpiredShuffleKeysTime The time for a worker to clean up shuffle data of expired shuffle keys.
    DeviceOSFreeBytes The actual usable space of OS for device monitor.
    DeviceOSTotalBytes The total usable space of OS for device monitor.
    DeviceCelebornFreeBytes The actual usable space of Celeborn for device.
    DeviceCelebornTotalBytes The total space of Celeborn for device.
    PotentialConsumeSpeed The speed of potential consumption for congestion control.
    UserProduceSpeed The speed of user production for congestion control.
    WorkerConsumeSpeed The speed of worker consumption for congestion control.
    IsDecommissioningWorker 1 means worker decommissioning, 0 means not decommissioning.
    MemoryStorageFileCount The count of files in Memory Storage of a worker.
    MemoryFileStorageSize The total amount of memory used by Memory Storage.
    EvictedFileCount The count of files evicted from Memory Storage to Disk
    DirectMemoryUsageRatio Ratio of direct memory used and max direct memory.
    push_server_usedHeapMemory
    push_server_usedDirectMemory
    push_server_numAllocations
    push_server_numTinyAllocations
    push_server_numSmallAllocations
    push_server_numNormalAllocations
    push_server_numHugeAllocations
    push_server_numDeallocations
    push_server_numTinyDeallocations
    push_server_numSmallDeallocations
    push_server_numNormalDeallocations
    push_server_numHugeDeallocations
    push_server_numActiveAllocations
    push_server_numActiveTinyAllocations
    push_server_numActiveSmallAllocations
    push_server_numActiveNormalAllocations
    push_server_numActiveHugeAllocations
    push_server_numActiveBytes
    replicate_server_usedHeapMemory
    replicate_server_usedDirectMemory
    replicate_server_numAllocations
    replicate_server_numTinyAllocations
    replicate_server_numSmallAllocations
    replicate_server_numNormalAllocations
    replicate_server_numHugeAllocations
    replicate_server_numDeallocations
    replicate_server_numTinyDeallocations
    replicate_server_numSmallDeallocations
    replicate_server_numNormalDeallocations
    replicate_server_numHugeDeallocations
    replicate_server_numActiveAllocations
    replicate_server_numActiveTinyAllocations
    replicate_server_numActiveSmallAllocations
    replicate_server_numActiveNormalAllocations
    replicate_server_numActiveHugeAllocations
    replicate_server_numActiveBytes
    fetch_server_usedHeapMemory
    fetch_server_usedDirectMemory
    fetch_server_numAllocations
    fetch_server_numTinyAllocations
    fetch_server_numSmallAllocations
    fetch_server_numNormalAllocations
    fetch_server_numHugeAllocations
    fetch_server_numDeallocations
    fetch_server_numTinyDeallocations
    fetch_server_numSmallDeallocations
    fetch_server_numNormalDeallocations
    fetch_server_numHugeDeallocations
    fetch_server_numActiveAllocations
    fetch_server_numActiveTinyAllocations
    fetch_server_numActiveSmallAllocations
    fetch_server_numActiveNormalAllocations
    fetch_server_numActiveHugeAllocations
    fetch_server_numActiveBytes
  • namespace=CPU

    • JVMCPUTime
  • namespace=system

    • LastMinuteSystemLoad
      • Returns the system load average for the last minute.
    • AvailableProcessors
  • namespace=JVM

  • namespace=ResourceConsumption

    • notes:
      • This metrics data is generated for each user and they are identified using a metric tag.
      • This metrics also include subResourceConsumptions generated for each application of user and they are identified using applicationId tag.
    • diskFileCount
    • diskBytesWritten
    • hdfsFileCount
    • hdfsBytesWritten
  • namespace=ThreadPool

    • notes:
      • This metrics data is generated for each thread pool and they are identified using a metric tag by thread pool name.
    • active_thread_count
    • pending_task_count
    • pool_size
    • core_pool_size
    • maximum_pool_size
    • largest_pool_size
    • is_terminating
    • is_terminated
    • is_shutdown

Note:

The Netty DirectArenaMetrics named like push/fetch/replicate_server_numXX are not exposed by default, nor in Grafana dashboard. If there is a need, you can enable celeborn.network.memory.allocator.verbose.metric to expose these metrics.

REST API

In addition to viewing the metrics, Celeborn also support REST API. This gives developers an easy way to create new visualizations and monitoring tools for Celeborn and also easy for users to get the running status of the service. The REST API is available for both master and worker. The endpoints are mounted at host:port. For example, for the master, they would typically be accessible at http://<master-http-host>:<master-http-port><path>, and for the worker, at http://<worker-http-host>:<worker-http-port><path>.

The configuration of <master-http-host>, <master-http-port>, <worker-http-host>, <worker-http--port> as below:

Key Default Description Since
celeborn.master.http.host 0.0.0.0 Master's http host. 0.4.0
celeborn.master.http.port 9098 Master's http port. 0.4.0
celeborn.worker.http.host 0.0.0.0 Worker's http host. 0.4.0
celeborn.worker.http.port 9096 Worker's http port. 0.4.0

Available API providers

API path listed as below:

Master

Path Method Parameters Meaning
/applications GET List all running application's ids of the cluster.
/conf GET List the conf setting of the master.
/excludedWorkers GET List all excluded workers of the master.
/help GET List the available API providers of the master.
/hostnames GET List all running application's LifecycleManager's hostnames of the cluster.
/listDynamicConfigs GET level=${LEVEL} tenant=${TENANT} name=${NAME} List the dynamic configs of the master. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
/listTopDiskUsedApps GET List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
/lostWorkers GET List all lost workers of the master.
/masterGroupInfo GET List master group information of the service. It will list all master's LEADER, FOLLOWER information.
/metrics/prometheus GET List the metrics data in prometheus format of the master. The url path is defined by configure celeborn.metrics.prometheus.path.
/shuffles GET List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
/shutdownWorkers GET List all shutdown workers of the master.
/threadDump GET List the current thread dump of the master.
/workerEventInfo GET List all worker event information of the master.
/workerInfo GET List worker information of the service. It will list all registered workers' information.
/exclude POST add=${ADD_WORKERS} remove=${REMOVE_WORKERS} Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove, which value is separated by commas.
/sendWorkerEvent POST type=${EVENT_TYPE} workers=${WORKERS} For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission', and the parameter workers is separated by commas.

Worker

Path Method Parameters Meaning
/applications GET List all running application's ids of the worker. It only return application ids running in that worker.
/conf GET List the conf setting of the worker.
/help GET List the available API providers of the worker.
/isRegistered GET Show if the worker is registered to the master success.
/isShutdown GET Show if the worker is during the process of shutdown.
/listDynamicConfigs GET level=${LEVEL} tenant=${TENANT} name=${NAME} List the dynamic configs of the worker. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
/listPartitionLocationInfo GET List all the living PartitionLocation information in that worker.
/listTopDiskUsedApps GET List the top disk usage application ids. It only return application ids running in that worker.
/metrics/prometheus GET List the metrics data in prometheus format of the worker. The url path is defined by configure celeborn.metrics.prometheus.path.
/shuffles GET List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
/threadDump GET List the current thread dump of the worker.
/unavailablePeers GET List the unavailable peers of the worker, this always means the worker connect to the peer failed.
/workerInfo GET List the worker information of the worker.
/exit POST type=${EXIT_TYPE} Trigger this worker to exit. Legal types are 'Decommission', 'Graceful' and 'Immediately'.