Producer and Consumer using Custom Attributes and Distributed Tracing
Version Requirements:
Java Agent 4.12 and greater
Distributed Tracing requires Kafka Client 0.11.0.0 or higher
Intro
This is a quick up and running example to see trace data from your Kafka producer and consumer. We’ll go over how to add a custom attribute to the Trace and Span that you’ll see on a Transaction Trace and on the Span in the Distribute Trace section of New Relic. In this example I’ll use excerpts for the relevant methods where I’ll add calls to the New Relic Java Agent API. The example uses the Kafka’s own example code from the github repository and I’ve just added in the New Relic code.
Kafka Project Examples
New Relic Kafka Documentation
Why this topic?
Most modules that we provide for a framework work for a set of Class and Methods that are most relevant to the type of Transaction data that we expect a customer to need. Because Kafka is a high-performance messaging system that generates a lot of data, you can customize the agent for your app’s specific throughput and use cases. We collect metric data out of the box, however, we leave it to you to decide whether to record a Transaction.
Our documentation covers much of this, however, I wanted to provide a working example using Kafka’s own simple example app. Similarly, I found a common request is to see the topic and offset as attributes, so I’m adding in a simple example.
What will this solve/make easier?
This should give you an idea of the happy path for a basic implementation along with a simple extension by adding custom attributes to span and trace events.
For initial setup, you’ll of course need to install the Java Agent and enable Distributed Tracing in the newrelic.yml file.
To start with a complete working Kafka example, download the project from the link below, then add the New Relic Custom Instrumentation to the points noted below.
Kafka Project Examples
New Relic Kafka Documentation
In the code below we are starting Transactions with the @Trace dispatcher argument and naming them MyConsumerTransaction and MyProducerTransaction. Likewise, we want to get the offset and and the topic from each request. One thing to note is that offset
is a keyword in New Relic Insights and will cause issues with queries for the attribute. Make sure to name the offset
to something other than offset
. Below, I’m using kafka_offset
.
From the Consumer:
@Trace(dispatcher = true, metricName = "MyConsumerTransaction")
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
NewRelic.addCustomParameter("mytopic",topic);
for (ConsumerRecord<Integer, String> record : records) {
NewRelic.addCustomParameter("kafka__offset", record.offset());
Iterable<Header> headers = record.headers().headers("newrelic");
for(Header header : headers){
NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(new
String(header.value(), StandardCharsets.UTF_8));
}
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
An important thing worth noting in the snippet below is that instead of creating an annotation on the run() method, I am calling the producer work from another method. Tracing the run would essentially prevent the created Transaction from ever finishing as it would run the entire life of the while loop. In actuality you’d get a long transaction report when the application is finally shutdown, but you wouldn’t see any useful Transaction data because it would be all one Transaction that never shows in the New Relic UI.
From the Producer:
public void run() {
int messageNo = 1;
while (true) {
send(messageNo);
++messageNo;
}
}
@Trace(dispatcher = true, metricName = "MyProducerTransaction")
public void send(int messageNo){
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Here’s a few screenshots where you can see the results of the Trace annotations and custom attribute calls.
Transactions
Span Attributes
Attributes on Trace Detail
Producer Span
Make sure to check out our Kafka Documentation and the github for Kafka code samples linked above. Hopefully, this gives you an idea of the “happy path” with a little extra by adding the custom attributes for Trace and Span attributes.