Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ allprojects {
compile 'org.json:json:20140107'
compile 'org.jolokia:jolokia-jvm:1.6.2'
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
compile 'com.timgroup:java-statsd-client:3.0.1'
compile 'com.datadoghq:java-dogstatsd-client:4.0.0'
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.xinfra.monitor.common.MbeanAttributeValue;
import com.linkedin.xinfra.monitor.common.Utils;
import com.linkedin.xinfra.monitor.services.configs.StatsdMetricsReporterServiceConfig;
import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import java.util.List;
Expand Down Expand Up @@ -41,9 +42,12 @@ public StatsdMetricsReporterService(Map<String, Object> props, String name) {
_metricNames = config.getList(StatsdMetricsReporterServiceConfig.REPORT_METRICS_CONFIG);
_reportIntervalSec = config.getInt(StatsdMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG);
_executor = Executors.newSingleThreadScheduledExecutor();
_statsdClient = new NonBlockingStatsDClient(config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PREFIX),
config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_HOST),
config.getInt(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PORT));
_statsdClient = new NonBlockingStatsDClientBuilder()
.prefix(config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PREFIX))
.hostname(config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_HOST))
.port(config.getInt(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PORT))
.constantTags(config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_TAGS).split(" "))
.build();
}

@Override
Expand Down Expand Up @@ -97,7 +101,7 @@ private void reportMetrics() {

for (MbeanAttributeValue attributeValue: attributeValues) {
final String statsdMetricName = generateStatsdMetricName(attributeValue.mbean(), attributeValue.attribute());
_statsdClient.recordGaugeValue(statsdMetricName, new Double(attributeValue.value()).longValue());
_statsdClient.recordGaugeValue(statsdMetricName, attributeValue.value());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class StatsdMetricsReporterServiceConfig extends AbstractConfig {
public static final String REPORT_STATSD_PREFIX = "report.statsd.prefix";
public static final String REPORT_STATSD_PREFIX_DOC = "The prefix of statsd metric name that will be generated with metric name to report to graphite server.";

public static final String REPORT_STATSD_TAGS = "report.statsd.tags";
public static final String REPORT_STATSD_TAGS_DOC = "Tags to apply to metrics reported to Datadog server.";

static {
CONFIG = new ConfigDef().define(REPORT_METRICS_CONFIG,
ConfigDef.Type.LIST, Collections.singletonList("kmf.services:*:*"),
Expand All @@ -61,7 +64,12 @@ public class StatsdMetricsReporterServiceConfig extends AbstractConfig {
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
REPORT_STATSD_PREFIX_DOC);
REPORT_STATSD_PREFIX_DOC)
.define(REPORT_STATSD_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
REPORT_STATSD_TAGS_DOC);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@ public class CommitAvailabilityMetrics {
public CommitAvailabilityMetrics(final Metrics metrics, final Map<String, String> tags) {
LOG.info("{} called.", this.getClass().getSimpleName());
_offsetsCommitted = metrics.sensor("offsets-committed");
_offsetsCommitted.add(new MetricName("offsets-committed-rate", METRIC_GROUP_NAME,
"The average number of offsets per second that are committed", tags), new Rate());
_offsetsCommitted.add(new MetricName("offsets-committed-total", METRIC_GROUP_NAME,
"The total number of offsets per second that are committed.", tags), new CumulativeSum());
"The total number of offsets that are committed", tags), new CumulativeSum());

_failedCommitOffsets = metrics.sensor("failed-commit-offsets");
_failedCommitOffsets.add(new MetricName("failed-commit-offsets-avg", METRIC_GROUP_NAME,
"The average number of offsets per second that have failed.", tags), new Rate());
"The average number of offsets per second that have failed to be committed", tags), new Rate());
_failedCommitOffsets.add(new MetricName("failed-commit-offsets-rate", METRIC_GROUP_NAME,
"The average number of offsets per second that have failed to be committed", tags), new Rate());
_failedCommitOffsets.add(new MetricName("failed-commit-offsets-total", METRIC_GROUP_NAME,
"The total number of offsets per second that have failed.", tags), new CumulativeSum());
"The total number of offsets that have failed to be committed", tags), new CumulativeSum());

metrics.addMetric(new MetricName("offsets-committed-avg", METRIC_GROUP_NAME, "The average offset commits availability.", tags),
metrics.addMetric(new MetricName("offsets-committed-avg", METRIC_GROUP_NAME, "The average offset commit availability since startup", tags),
(MetricConfig config, long now) -> {
Object offsetCommitTotal = metrics.metrics().get(metrics.metricName("offsets-committed-total", METRIC_GROUP_NAME, tags)).metricValue();
Object offsetCommitFailTotal = metrics.metrics().get(metrics.metricName("failed-commit-offsets-total", METRIC_GROUP_NAME, tags)).metricValue();
Expand All @@ -57,5 +61,18 @@ public CommitAvailabilityMetrics(final Metrics metrics, final Map<String, String
return 0;
}
});

metrics.addMetric(new MetricName("commit-availability-avg", METRIC_GROUP_NAME, "The average commit availability", tags),
(MetricConfig config, long now) -> {
Object offsetCommitRate = metrics.metrics().get(metrics.metricName("offsets-committed-rate", METRIC_GROUP_NAME, tags)).metricValue();
Object offsetCommitFailRate = metrics.metrics().get(metrics.metricName("failed-commit-offsets-rate", METRIC_GROUP_NAME, tags)).metricValue();
if (offsetCommitRate != null && offsetCommitFailRate != null) {
double offsetsCommittedCount = (double) offsetCommitRate;
double offsetsCommittedErrorCount = (double) offsetCommitFailRate;
return offsetsCommittedCount / (offsetsCommittedCount + offsetsCommittedErrorCount);
} else {
return 0;
}
});
}
}