
Relic Solution: Kafka up and running with Custom Attributes and Distributed Tracing

kafka
java
transactions
topic
javaagent
distributedtracing
offset
span

#1

Producer and Consumer using Custom Attributes and Distributed Tracing
Version Requirements:
Java Agent 4.12 or higher
Distributed Tracing requires Kafka Client 0.11.0.0 or higher

Intro
This is a quick up-and-running example showing trace data from your Kafka producer and consumer. We’ll go over how to add a custom attribute to the Trace and Span, which you’ll see on a Transaction Trace and on the Span in the Distributed Tracing section of New Relic. In this example I’ll use excerpts of the relevant methods where I add calls to the New Relic Java Agent API. The example uses Kafka’s own example code from the GitHub repository, with the New Relic code added in.
Kafka Project Examples
New Relic Kafka Documentation

Why this topic?
Most of the framework modules we provide instrument the set of classes and methods that are most relevant to the type of Transaction data a customer typically needs. Because Kafka is a high-performance messaging system that generates a lot of data, you can customize the agent for your app’s specific throughput and use cases. We collect metric data out of the box; however, we leave it to you to decide whether to record a Transaction.
Our documentation covers much of this, but I wanted to provide a working example using Kafka’s own simple example app. I also found that a common request is to see the topic and offset as attributes, so I’m including a simple example of that as well.

What will this solve/make easier?
This should give you an idea of the happy path for a basic implementation along with a simple extension by adding custom attributes to span and trace events.

For initial setup, you’ll of course need to install the Java Agent and enable Distributed Tracing in the newrelic.yml file.

New Relic Kafka Documentation

Java Agent Installation
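If it helps, distributed tracing is switched on in newrelic.yml with a fragment along these lines. This is a minimal sketch: the license key and app name are placeholders, and you should confirm the exact stanza against the Java agent configuration docs linked above for your agent version.

    common: &default_settings
      license_key: 'YOUR_LICENSE_KEY_HERE'   # placeholder
      app_name: 'My Kafka Application'       # placeholder

      # Required for the producer/consumer spans in this example to be linked
      distributed_tracing:
        enabled: true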

To start with a complete working Kafka example, download the project from the link below, then add the New Relic Custom Instrumentation to the points noted below.
Kafka Project Examples
New Relic Kafka Documentation

In the code below we start Transactions with the @Trace dispatcher argument and name them MyConsumerTransaction and MyProducerTransaction. Likewise, we want to capture the offset and the topic from each request. One thing to note is that offset is a keyword in New Relic Insights and will cause issues with queries for the attribute, so make sure to name the attribute something other than offset. Below, I’m using kafka_offset.

From the Consumer:

    @Trace(dispatcher = true, metricName = "MyConsumerTransaction")
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
        // Custom attribute on the Transaction/Span: the topic this consumer reads
        NewRelic.addCustomParameter("mytopic", topic);

        for (ConsumerRecord<Integer, String> record : records) {
            // "offset" is a reserved word in Insights, so record it as kafka_offset
            NewRelic.addCustomParameter("kafka_offset", record.offset());
            // Accept the distributed trace payload the producer placed in the "newrelic" header
            Iterable<Header> headers = record.headers().headers("newrelic");
            for (Header header : headers) {
                NewRelic.getAgent().getTransaction()
                        .acceptDistributedTracePayload(new String(header.value(), StandardCharsets.UTF_8));
            }
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }
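For reference, the consumer excerpt above assumes roughly the following imports (the New Relic agent API plus the Kafka client classes); exact packages may vary slightly with your client version:

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.header.Header;

    import com.newrelic.api.agent.NewRelic;
    import com.newrelic.api.agent.Trace;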

An important thing worth noting in the snippet below is that instead of putting the annotation on the run() method, I call the producer work from another method. Tracing run() would essentially prevent the created Transaction from ever finishing, since it would last for the entire life of the while loop. In actuality you’d get one long Transaction reported when the application is finally shut down, but you wouldn’t see any useful Transaction data, because everything would be a single Transaction that never shows up in the New Relic UI.

From the Producer:

    public void run() {
        int messageNo = 1;
        while (true) {
            send(messageNo);
            ++messageNo;
        }
    }

    @Trace(dispatcher = true, metricName = "MyProducerTransaction")
    public void send(int messageNo) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously
            producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously
            try {
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
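One thing the producer excerpt doesn’t show is where the newrelic header that the consumer reads actually comes from. Here is a minimal sketch of what that could look like inside send(), using the agent 4.x payload API (createDistributedTracePayload() and DistributedTracePayload from com.newrelic.api.agent); the async branch would attach the header the same way before calling producer.send():

    // Sketch: create the distributed trace payload inside the @Trace'd send()
    // and attach it as a Kafka record header, so the consumer's
    // acceptDistributedTracePayload() call above can link the two spans.
    ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, messageNo, messageStr);
    DistributedTracePayload payload =
            NewRelic.getAgent().getTransaction().createDistributedTracePayload();
    record.headers().add("newrelic", payload.text().getBytes(StandardCharsets.UTF_8));
    producer.send(record).get(); // synchronous branch from the example above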

Here are a few screenshots where you can see the results of the @Trace annotations and custom attribute calls.

Transactions

Span Attributes

Attributes on Trace Detail

Producer Span

Make sure to check out our Kafka documentation and the GitHub Kafka code samples linked above. Hopefully this gives you an idea of the “happy path,” with a little extra from adding the custom Trace and Span attributes.


#2

Hi,
That’s a great example on trace data. We are trying to implement similar tracing (Java Agent + Kafka consumer + producer). We observe noticeably greater latency when the headers are added to the messages (at times a 100-200% increase in message processing time).
The documentation at https://docs.newrelic.com/docs/agents/java-agent/instrumentation/instrument-kafka-message-queues#collect-kafka-distributed-traces [Collect Kafka distributed traces] explains that the payload adds 150 to 200 bytes. The message size in our Kafka is 455 B without compression, and we have lz4 compression enabled.
A couple of questions on this:

  1. Is the Kafka message size too small, such that appending the New Relic header adds this much latency?
  2. Is there a way around this, such as attaching the header to one message per poll instead of every single message? And is there anything from a metrics perspective that we would be missing out on?

#3
  1. Is the Kafka message size too small, such that appending the New Relic header adds this much latency?

Adding the distributed trace payload is certainly a risk for really small messages. I need to check what the payload size is; I’ll update on that ASAP.

  2. Is there a way around this, such as attaching the header to one message per poll instead of every single message? And is there anything from a metrics perspective that we would be missing out on?

It might be possible to build that into the application logic, but I don’t think you’d get the results you want in the Distributed Tracing view.

However, I’m going to investigate each of these a little more closely and update here on this post.
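In the meantime, to illustrate what building it into the application logic could look like (purely hypothetical, and with the caveat above that messages sent without the header simply won’t appear as linked spans in the Distributed Tracing view), the producer could attach the header to only every Nth message, for example:

    // Hypothetical application-side sampling, not an agent feature: only every
    // Nth message carries the distributed trace header, so most records avoid
    // the extra 150-200 byte payload. Records without the header won't be
    // linked in the Distributed Tracing view.
    private static final int TRACE_EVERY_N = 100; // illustrative sampling rate

    @Trace(dispatcher = true, metricName = "MyProducerTransaction")
    public void send(int messageNo) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, messageNo, messageStr);
        if (messageNo % TRACE_EVERY_N == 0) {
            DistributedTracePayload payload =
                    NewRelic.getAgent().getTransaction().createDistributedTracePayload();
            record.headers().add("newrelic", payload.text().getBytes(StandardCharsets.UTF_8));
        }
        producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
    }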