JMX Scraper: YAML file and integration test hadoop (#1675)

Co-authored-by: Magda Wojtowicz <mwojtowicz@splunk.com>
Robert Niedziela 2025-02-04 00:38:39 +01:00 committed by GitHub
parent a2bb444d83
commit d2a97f4f48
5 changed files with 291 additions and 18 deletions

@@ -46,63 +46,63 @@ class HadoopIntegrationTest extends AbstractIntegrationTest {
metric,
"hadoop.name_node.capacity.usage",
"The current used capacity across all data nodes reporting to the name node.",
"by",
"By",
attrs -> attrs.contains(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.capacity.limit",
"The total capacity allotted to data nodes reporting to the name node.",
"by",
"By",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.block.count",
"The total number of blocks on the name node.",
"{blocks}",
"{block}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.block.missing",
"The number of blocks reported as missing to the name node.",
"{blocks}",
"{block}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.block.corrupt",
"The number of blocks reported as corrupt to the name node.",
"{blocks}",
"{block}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.volume.failed",
"The number of failed volumes reported to the name node.",
"{volumes}",
"{volume}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.file.count",
"The total number of files being tracked by the name node.",
"{files}",
"{file}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.file.load",
"The current number of concurrent file accesses.",
"{operations}",
"{operation}",
attrs -> attrs.containsOnly(entry("node_name", "test-host"))),
metric ->
assertSumWithAttributes(
metric,
"hadoop.name_node.data_node.count",
"The number of data nodes reporting to the name node.",
"{nodes}",
"{node}",
attrs ->
attrs.containsOnly(entry("node_name", "test-host"), entry("state", "live")),
attrs ->

@@ -15,31 +15,31 @@
*/
def beanHadoopNameNodeFS = otel.mbean("Hadoop:service=NameNode,name=FSNamesystem")
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.capacity.usage", "The current used capacity across all data nodes reporting to the name node.", "by",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.capacity.usage", "The current used capacity across all data nodes reporting to the name node.", "By",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"CapacityUsed", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.capacity.limit", "The total capacity allotted to data nodes reporting to the name node.", "by",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.capacity.limit", "The total capacity allotted to data nodes reporting to the name node.", "By",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"CapacityTotal", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.count", "The total number of blocks on the name node.", "{blocks}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.count", "The total number of blocks on the name node.", "{block}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"BlocksTotal", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.missing", "The number of blocks reported as missing to the name node.", "{blocks}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.missing", "The number of blocks reported as missing to the name node.", "{block}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"MissingBlocks", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.corrupt", "The number of blocks reported as corrupt to the name node.", "{blocks}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.block.corrupt", "The number of blocks reported as corrupt to the name node.", "{block}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"CorruptBlocks", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.volume.failed", "The number of failed volumes reported to the name node.", "{volumes}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.volume.failed", "The number of failed volumes reported to the name node.", "{volume}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"VolumeFailuresTotal", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.file.count", "The total number of files being tracked by the name node.", "{files}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.file.count", "The total number of files being tracked by the name node.", "{file}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"FilesTotal", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.file.load", "The current number of concurrent file accesses.", "{operations}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.file.load", "The current number of concurrent file accesses.", "{operation}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
"TotalLoad", otel.&longUpDownCounterCallback)
- otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.data_node.count", "The number of data nodes reporting to the name node.", "{nodes}",
+ otel.instrument(beanHadoopNameNodeFS, "hadoop.name_node.data_node.count", "The number of data nodes reporting to the name node.", "{node}",
["node_name" : { mbean -> mbean.getProperty("tag.Hostname") }],
["NumLiveDataNodes":["state":{"live"}], "NumDeadDataNodes": ["state":{"dead"}]],
otel.&longUpDownCounterCallback)

@@ -0,0 +1,122 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxscraper.target_systems;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher;
import java.nio.file.Path;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;
public class HadoopIntegrationTest extends TargetSystemIntegrationTest {
private static final int HADOOP_PORT = 50070;
@Override
protected GenericContainer<?> createTargetContainer(int jmxPort) {
return new GenericContainer<>("bmedora/hadoop:2.9-base")
.withCopyFileToContainer(
MountableFile.forClasspathResource("hadoop-env.sh", 0400),
"/hadoop/etc/hadoop/hadoop-env.sh")
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)))
.withExposedPorts(HADOOP_PORT, jmxPort)
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("test-host"))
.waitingFor(Wait.forListeningPorts(HADOOP_PORT, jmxPort));
}
@Override
protected JmxScraperContainer customizeScraperContainer(
JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
return scraper.withTargetSystem("hadoop");
}
@Override
protected MetricsVerifier createMetricsVerifier() {
AttributeMatcher nodeNameAttribute = attribute("node_name", "test-host");
return MetricsVerifier.create()
.add(
"hadoop.name_node.capacity.usage",
metric ->
metric
.hasDescription(
"The current used capacity across all data nodes reporting to the name node.")
.hasUnit("By")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.capacity.limit",
metric ->
metric
.hasDescription(
"The total capacity allotted to data nodes reporting to the name node.")
.hasUnit("By")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.block.count",
metric ->
metric
.hasDescription("The total number of blocks on the name node.")
.hasUnit("{block}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.block.missing",
metric ->
metric
.hasDescription("The number of blocks reported as missing to the name node.")
.hasUnit("{block}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.block.corrupt",
metric ->
metric
.hasDescription("The number of blocks reported as corrupt to the name node.")
.hasUnit("{block}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.volume.failed",
metric ->
metric
.hasDescription("The number of failed volumes reported to the name node.")
.hasUnit("{volume}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.file.count",
metric ->
metric
.hasDescription("The total number of files being tracked by the name node.")
.hasUnit("{file}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.file.load",
metric ->
metric
.hasDescription("The current number of concurrent file accesses.")
.hasUnit("{operation}")
.isUpDownCounter()
.hasDataPointsWithOneAttribute(nodeNameAttribute))
.add(
"hadoop.name_node.data_node.count",
metric ->
metric
.hasDescription("The number of data nodes reporting to the name node.")
.hasUnit("{node}")
.isUpDownCounter()
.hasDataPointsWithAttributes(
attributeGroup(nodeNameAttribute, attribute("state", "live")),
attributeGroup(nodeNameAttribute, attribute("state", "dead"))));
}
}

@@ -0,0 +1,99 @@
#!/bin/bash
# Set Hadoop-specific environment variables here.
# The only required environment variable is JAVA_HOME. All others are
# optional. When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.
# The java implementation to use.
export JAVA_HOME=${JAVA_HOME}
# The jsvc implementation to use. Jsvc is required to run secure datanodes
# that bind to privileged ports to provide authentication of data transfer
# protocol. Jsvc is not required if SASL is configured for authentication of
# data transfer protocol using non-privileged ports.
#export JSVC_HOME=${JSVC_HOME}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/etc/hadoop"}
# Extra Java CLASSPATH elements. Automatically insert capacity-scheduler.
for f in "$HADOOP_HOME"/contrib/capacity-scheduler/*.jar; do
if [ "$HADOOP_CLASSPATH" ]; then
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
else
export HADOOP_CLASSPATH=$f
fi
done
# The maximum amount of heap to use, in MB. Default is 1000.
#export HADOOP_HEAPSIZE=
#export HADOOP_NAMENODE_INIT_HEAPSIZE=""
# Enable extra debugging of Hadoop's JAAS binding, used to set up
# Kerberos security.
# export HADOOP_JAAS_DEBUG=true
# Extra Java runtime options. Empty by default.
# For Kerberos debugging, an extended option set logs more information
# export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug"
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
# Command specific options appended to HADOOP_OPTS when specified
export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
export HADOOP_NAMENODE_OPTS="$HADOOP_NAMENODE_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
export HADOOP_NAMENODE_OPTS="$HADOOP_NAMENODE_OPTS -Dcom.sun.management.jmxremote.ssl=false"
export HADOOP_NAMENODE_OPTS="$HADOOP_NAMENODE_OPTS -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.rmi.port=9999"
export HADOOP_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS $HADOOP_DATANODE_OPTS"
export HADOOP_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
export HADOOP_NFS3_OPTS="$HADOOP_NFS3_OPTS"
export HADOOP_PORTMAP_OPTS="-Xmx512m $HADOOP_PORTMAP_OPTS"
# The following applies to multiple commands (fs, dfs, fsck, distcp etc)
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS"
# set heap args when HADOOP_HEAPSIZE is empty
if [ "$HADOOP_HEAPSIZE" = "" ]; then
export HADOOP_CLIENT_OPTS="-Xmx512m $HADOOP_CLIENT_OPTS"
fi
#HADOOP_JAVA_PLATFORM_OPTS="-XX:-UsePerfData $HADOOP_JAVA_PLATFORM_OPTS"
# On secure datanodes, user to run the datanode as after dropping privileges.
# This **MUST** be uncommented to enable secure HDFS if using privileged ports
# to provide authentication of data transfer protocol. This **MUST NOT** be
# defined if SASL is configured for authentication of data transfer protocol
# using non-privileged ports.
export HADOOP_SECURE_DN_USER=${HADOOP_SECURE_DN_USER}
# Where log files are stored. $HADOOP_HOME/logs by default.
#export HADOOP_LOG_DIR=${HADOOP_LOG_DIR}/$USER
# Where log files are stored in the secure data environment.
#export HADOOP_SECURE_DN_LOG_DIR=${HADOOP_LOG_DIR}/${HADOOP_HDFS_USER}
###
# HDFS Mover specific parameters
###
# Specify the JVM options to be used when starting the HDFS Mover.
# These options will be appended to the options specified as HADOOP_OPTS
# and therefore may override any similar flags set in HADOOP_OPTS
#
# export HADOOP_MOVER_OPTS=""
###
# Advanced Users Only!
###
# The directory where pid files are stored. /tmp by default.
# NOTE: this should be set to a directory that can only be written to by
# the user that will run the hadoop daemons. Otherwise there is the
# potential for a symlink attack.
export HADOOP_PID_DIR=${HADOOP_PID_DIR}
export HADOOP_SECURE_DN_PID_DIR=${HADOOP_PID_DIR}
# A string representing this instance of hadoop. $USER by default.
export HADOOP_IDENT_STRING=$USER
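
The jmxremote flags above are what expose the NameNode MBeans that both the Groovy script and the new YAML rules read: an unauthenticated, non-SSL JMX endpoint on port 9999. As a minimal sketch (not part of this commit) of what that endpoint serves, the standard javax.management.remote API can read the same FSNamesystem attribute directly; the localhost host name and the fixed 9999 port are assumptions taken from the flags above rather than from the test harness.

// Minimal sketch, not part of the commit: connect to the JMX endpoint configured by the
// jmxremote flags in hadoop-env.sh and read one FSNamesystem attribute.
// "localhost" and port 9999 are assumptions mirroring -Dcom.sun.management.jmxremote.port=9999.
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class NameNodeJmxProbe {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection connection = connector.getMBeanServerConnection();
      ObjectName fsNamesystem = new ObjectName("Hadoop:service=NameNode,name=FSNamesystem");
      // CapacityUsed backs hadoop.name_node.capacity.usage in both the Groovy and YAML definitions.
      System.out.println("CapacityUsed = " + connection.getAttribute(fsNamesystem, "CapacityUsed"));
    }
  }
}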

@@ -0,0 +1,52 @@
---
rules:
- bean: Hadoop:service=NameNode,name=FSNamesystem
prefix: hadoop.name_node.
type: updowncounter
metricAttribute:
node_name: beanattr(tag\.Hostname)
mapping:
CapacityUsed:
metric: capacity.usage
unit: By
desc: The current used capacity across all data nodes reporting to the name node.
CapacityTotal:
metric: capacity.limit
unit: By
desc: The total capacity allotted to data nodes reporting to the name node.
BlocksTotal:
metric: block.count
unit: "{block}"
desc: The total number of blocks on the name node.
MissingBlocks:
metric: block.missing
unit: "{block}"
desc: The number of blocks reported as missing to the name node.
CorruptBlocks:
metric: block.corrupt
unit: "{block}"
desc: The number of blocks reported as corrupt to the name node.
VolumeFailuresTotal:
metric: volume.failed
unit: "{volume}"
desc: The number of failed volumes reported to the name node.
FilesTotal:
metric: file.count
unit: "{file}"
desc: The total number of files being tracked by the name node.
TotalLoad:
metric: file.load
unit: "{operation}"
desc: The current number of concurrent file accesses.
NumLiveDataNodes:
metric: &metric data_node.count
unit: &unit "{node}"
desc: &desc The number of data nodes reporting to the name node.
metricAttribute:
state: const(live)
NumDeadDataNodes:
metric: *metric
unit: *unit
desc: *desc
metricAttribute:
state: const(dead)
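
The anchors in the last two rules (&metric, &unit and &desc, reused via *metric, *unit and *desc) fold NumLiveDataNodes and NumDeadDataNodes into a single data_node.count metric whose data points differ only in the state attribute, mirroring the map-of-maps form used in the Groovy script. A rough sketch of the equivalent instrument in OpenTelemetry API terms is below; the class name and the liveNodes/deadNodes parameters are illustrative and not part of the commit.

// Illustrative sketch of what the data_node.count mapping amounts to:
// one up-down counter with one data point per "state" value.
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;

public class DataNodeCountSketch {
  public static ObservableLongUpDownCounter register(long liveNodes, long deadNodes) {
    Meter meter = GlobalOpenTelemetry.getMeter("hadoop-sketch");
    return meter
        .upDownCounterBuilder("hadoop.name_node.data_node.count")
        .setDescription("The number of data nodes reporting to the name node.")
        .setUnit("{node}")
        .buildWithCallback(
            measurement -> {
              // Both observations share the metric name, unit and description that the
              // YAML anchors deduplicate; only the "state" attribute differs.
              measurement.record(
                  liveNodes,
                  Attributes.builder().put("node_name", "test-host").put("state", "live").build());
              measurement.record(
                  deadNodes,
                  Attributes.builder().put("node_name", "test-host").put("state", "dead").build());
            });
  }
}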