Relic Solution: Kafka up and running with Custom Attributes and Distributed Tracing

Producer and Consumer using Custom Attributes and Distributed Tracing
Version Requirements:
Java Agent 4.12.0 or higher
Distributed Tracing requires a supported Kafka client version

This is a quick up-and-running example to see trace data from your Kafka producer and consumer. We’ll go over how to add a custom attribute to the Trace and Span that you’ll see on a Transaction Trace and on the Span in the Distributed Tracing section of New Relic. In this example I’ll use excerpts of the relevant methods where I’ll add calls to the New Relic Java Agent API. The example uses Kafka’s own example code from the GitHub repository; I’ve just added in the New Relic code.
Kafka Project Examples
New Relic Kafka Documentation

Why this topic?
Most modules that we provide for a framework work for a set of classes and methods that are most relevant to the type of Transaction data that we expect a customer to need. Because Kafka is a high-performance messaging system that generates a lot of data, you can customize the agent for your app’s specific throughput and use cases. We collect metric data out of the box; however, we leave it to you to decide whether to record a Transaction.
Our documentation covers much of this, but I wanted to provide a working example using Kafka’s own simple example app. I also found that a common request is to see the topic and offset as attributes, so I’m including a simple example of that as well.

What will this solve/make easier?
This should give you an idea of the happy path for a basic implementation along with a simple extension by adding custom attributes to span and trace events.

For initial setup, you’ll of course need to install the Java Agent and enable Distributed Tracing in the newrelic.yml file.
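For reference, the Distributed Tracing setting lives under the common stanza of newrelic.yml and looks like this (a minimal fragment; check the docs for your agent version for the full file layout):

```yaml
common: &default_settings
  # Enable Distributed Tracing for the Java agent
  distributed_tracing:
    enabled: true
```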

New Relic Kafka Documentation

Java Agent Installation

To start with a complete working Kafka example, download the project from the link below, then add the New Relic Custom Instrumentation to the points noted below.
Kafka Project Examples
New Relic Kafka Documentation

In the code below we are starting Transactions with the @Trace dispatcher argument and naming them MyConsumerTransaction and MyProducerTransaction. Likewise, we want to get the offset and the topic from each record. One thing to note is that offset is a keyword in New Relic Insights and will cause issues with queries for the attribute. Make sure to rename the offset to something other than offset. Below, I’m using kafka_offset.

From the Consumer:

    @Trace(dispatcher = true, metricName = "MyConsumerTransaction")
    public void doWork() {
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));

        for (ConsumerRecord<Integer, String> record : records) {
            NewRelic.addCustomParameter("kafka_offset", record.offset());
            NewRelic.addCustomParameter("kafka_topic", record.topic());
            // Accept the distributed trace payload sent in the "newrelic" header
            Iterable<Header> headers = record.headers().headers("newrelic");
            for (Header header : headers) {
                NewRelic.getAgent().getTransaction()
                        .acceptDistributedTracePayload(new String(header.value(), StandardCharsets.UTF_8));
            }

            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }

An important thing worth noting in the snippet below is that instead of annotating the run() method, I call the producer work from another method. Tracing run() would essentially prevent the created Transaction from ever finishing, as it would run for the entire life of the while loop. In actuality you’d get one long transaction reported when the application is finally shut down, but you wouldn’t see any useful Transaction data because it would all be one Transaction that never shows in the New Relic UI.

From the Producer:

    public void run() {
        int messageNo = 1;
        while (true) {
            send(messageNo);
            messageNo++;
        }
    }

    @Trace(dispatcher = true, metricName = "MyProducerTransaction")
    public void send(int messageNo) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, messageNo, messageStr);
        // Attach the distributed trace payload in the "newrelic" header so the consumer can link traces
        record.headers().add("newrelic",
                NewRelic.getAgent().getTransaction().createDistributedTracePayload().text()
                        .getBytes(StandardCharsets.UTF_8));
        if (isAsync) { // Send asynchronously
            producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously
            try {
                producer.send(record).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

Here are a few screenshots where you can see the results of the @Trace annotations and custom attribute calls.


Span Attributes

Attributes on Trace Detail

Producer Span

Make sure to check out our Kafka Documentation and the GitHub Kafka code samples linked above. Hopefully, this gives you an idea of the “happy path”, with a little extra from adding the custom attributes for Trace and Span events.


That’s a great example on trace data. We are trying to implement similar tracing (Java Agent + Kafka consumer + producer). We observe greater latency when the headers are added to the messages (at times a 100 to 200% increase in message processing time).
The documentation [Kafka distributed traces] explains that a 150 to 200 byte payload is added. The message size in our Kafka is 455 bytes without compression. We have lz4 compression enabled.
A couple of questions on this:

  1. Is the Kafka message size too small, such that it’s adding this huge latency when the newrelic header is appended?
  2. Is there a way around this, like having the header attached to one message per poll instead of every single message? Is there anything from a metrics perspective we would be missing out on?
> Is the Kafka message size too small, such that it’s adding this huge latency when the newrelic header is appended?

Adding the distributed trace payload is certainly a risk for really small messages. I need to check what the payload size is; I’ll update on that ASAP.

> Is there a way around this, like having the header attached to one message per poll instead of every single message? Is there anything from a metrics perspective we would be missing out on?

It might be possible to build that into the application logic, but I don’t think you’d get the results you want in the Distributed Tracing view.
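To illustrate what building it into the application logic might look like, here is a hypothetical sketch (HeaderSampler, sampleEvery, and shouldAttach are names I made up; the producer would only attach the "newrelic" header when shouldAttach() returns true). Spans for the unsampled messages would not be linked in the Distributed Tracing view, which is why the results are likely not what you want:

```java
// Hypothetical sketch: attach the distributed trace header to only 1 in N messages.
public class HeaderSampler {
    private final int sampleEvery;
    private long count = 0;

    public HeaderSampler(int sampleEvery) {
        this.sampleEvery = sampleEvery;
    }

    // True for the 1st, (N+1)th, (2N+1)th... call; the producer would add
    // the "newrelic" header only when this returns true.
    public boolean shouldAttach() {
        return (count++ % sampleEvery) == 0;
    }

    public static void main(String[] args) {
        HeaderSampler sampler = new HeaderSampler(3);
        for (int i = 0; i < 6; i++) {
            System.out.println(sampler.shouldAttach());
        }
    }
}
```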

However, I’m going to investigate each of these a little more closely and update here on this post.

Thanks for your reply. I have follow-up questions.
I’m wondering what Kafka message size is considered too small, such that it adds huge latency, given my numbers in the previous post.
Also, I applied code changes in the application to allow one header per poll (10,000 messages). It still did not improve the processing time.

Additionally, I was wondering how to display transaction metrics when the application communicates with Redis, similar to the ones generated for a database. Do we have to instrument the Redis calls as External Services? An example would be beneficial. I would not want to install an agent on the Redis machine (as in infrastructure mode).

You should see external calls for Redis from the APM (Java) agent. We show datastore information for Redis with the Jedis Redis driver 1.4.0 to latest. Similarly, we have instance-level Redis DB information for the Jedis Redis driver 1.4 to 2.10.x.

The header is about 45% of the size of the entire message. It seems reasonable to think that you’d see higher latency than without the additional header. I think customers are making a case-by-case decision on the header size. When you say you’re still seeing high latency, is that only for the sends that include the header? Or are you saying that you are seeing increased latency on the sends that do not include the distributed trace payload header?
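For a rough sanity check on that figure, using the upper end of the documented 150 to 200 byte payload against the 455-byte uncompressed message size from the question:

```java
public class OverheadEstimate {
    public static void main(String[] args) {
        double headerBytes = 200;   // upper end of the documented 150-200 byte payload
        double messageBytes = 455;  // uncompressed message size from the question
        double overhead = headerBytes / messageBytes;
        // Prints the per-message size overhead as a percentage (roughly 44%)
        System.out.printf("Header adds roughly %.0f%% on top of each message%n", overhead * 100);
    }
}
```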