How to associate background threads with request - Scala

I’m using 3.37.0 without this problem. You might want to ensure that the version of the JAR that is deployed in your production environment is at version 3.37.0, not just the thin agent facade JAR that you bundle in with your app.

1 Like

Hey @mauro.capolupo - Did @jimm’s suggestion work for you! Feel free to click the check box icon if that solved the problem. Thanks!

With the new 3.37.0 async support, I was able to successfully track our Scala Future processes, and also our asynchronous javax.ws.rs.client.Client calls.

At Porch we have a tradition in the engineering department called Dev Day. Every few months the devs get two or three days to focus on something we’re passionate about, so I took advantage of that to get this working in a Scala app. I was just able to prove the concept this morning and it will take time to get the code cleaned up into something that I’d want to share, but I’ll include more in this thread later with details.

In turned out to be 20x more complicated than I expected. Here’s what I had to implement:

  • A custom Scala ExecutionContext which can propagate the NR Transaction to subsequent threads. This utilized the new Token feature added in 3.37.0 to “link” the next unit of work, and expire it afterwards.
  • A custom servlet Filter to create an object that stores my working data (the NR Transaction and a unique ID that helps me track everything).
  • I had to wrap my existing javax.ws.rs.client.Client instances in a proxy, so that I could create a NR Segment when a client API call is being configured, while we’re still inside a thread that is linked to an active transaction. (This also required me to proxy javax.ws.rs.client.WebTarget.) I wanted to solve this with a ClientRequestFilter but async filters are invoked on a totally different thread pool, and I had no way to link back to the original NR transaction.
  • A custom ClientRequestFilter, which can hydrate the NR Segment with additional data when the Jersey client is ready to make the request. For example it passes HttpParameters to segment.reportAsExternal().
  • A custom ClientResponseFilter which allows me to call segment.end() when a client API call completes.
  • Finally I also had to augment our async endpoints so that we capture the NR Transaction at the beginning of our processing. I wanted to do this with the servlet Filter but my filter was getting called before the true transaction had been created. Luckily we already route our async endpoints through a utility so it was easy to add this and have it affect the entire app.
4 Likes

https://s3.amazonaws.com/uploads.hipchat.com/75009/1168625/pLiCb1nvXFOOYLa/upload.png

4 Likes

Did you share your code in some project on github? Maybe newrelic developed something more easy to use or it has plan to export this logic with just a simple annotation? @Linds

I haven’t published it anywhere yet since I have been simplifying it a little bit since the original proof of concept.

1 Like

Thanks @jimm. I will follow this post! It is really interesting!

1 Like

HI @jimm, I’d be very interested to see an example of this code if you are happy to share it.

cheers

1 Like

+1 to what @adrian.fitzpatrick said!

@jimm - show us what you got! :blush:

@adrian.fitzpatrick and @Linds - apologies for not sharing the code yet. My first pass at this was the result of a few days of experimentation and it was probably more complicated than it needs to be. I kept holding back, hoping to simplify that before putting it out in the world, but just haven’t had time to revisit it yet. I probably won’t have time for a while so I’ll quickly dump out some info here, maybe it will get you started…

Here’s the basic NewRelicExecutionContext which is one part of the puzzle.

First, my application code which normally imports the global default ExecutionContext now imports an instance of NewRelicExecutionContext:

// A mixin for my application classes that allows me to control the default EC in a single place
trait FutureContext {
  implicit val executionContext = new NewRelicExecutionContext(ExecutionContext.global)
}

Second, my resource endpoints all extend a common base class AsyncResource so that all work gets wrapped in a bit of code where I can ensure that the NewRelicExecutionContext can associate the current thread with the New Relic request:

  // My endpoints end up getting wrapped in code that looks like this, to make monitoring easier.
  private def monitor[T](doSomething: ExecutionContext => T)(implicit ec: ExecutionContext): T = {
    val newEc = NewRelicExecutionContext.startTransaction
    try {
      doSomething(newEc)
    } finally {
      NewRelicExecutionContext.clearThreadLocalTx()
    }
  }

Here’s the execution context itself:

import com.newrelic.api.agent.{NewRelic, Token, Trace, Transaction}
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext

object NewRelicExecutionContext {

  private val logger = LoggerFactory.getLogger(getClass)
  private val threadLocal = new ThreadLocal[Transaction]

  /**
   * Called by AsyncResource endpoint wrappers, so that we don't try to grab the NewRelic Transaction until we are
   * absolutely certain that the correct transaction has been set up.  This avoids trying too early and getting a
   * default no-op transaction.
   */
  def startTransaction(implicit ec: ExecutionContext): ExecutionContext = {
    val newTransaction = NewRelic.getAgent.getTransaction
    setThreadLocalTx(newTransaction)
    new NewRelicExecutionContext(ec, Option(newTransaction))
  }

  private[scala] def debug(msg: => String): Unit = {
    //logger.info(s"${Thread.currentThread.getName} $msg")
  }

  def getThreadLocalTx: Option[Transaction] = Option(threadLocal.get)
  private[scala] def setThreadLocalTx(tx: Transaction): Unit = threadLocal.set(tx)
  def clearThreadLocalTx(): Unit = threadLocal.remove()
}


class NewRelicExecutionContext(wrappedContext: ExecutionContext, tx: Option[Transaction] = None, token: Option[Token] = None) extends ExecutionContext {
  import NewRelicExecutionContext._

  override def execute(work: Runnable): Unit =
    wrappedContext.execute(new ExecutionWrapper(work, tx, token))

  override def reportFailure(cause: Throwable): Unit =
    wrappedContext.reportFailure(cause)

  override def prepare(): ExecutionContext = {
    val nextTx = tx.orElse(getThreadLocalTx)
    val nextToken = nextTx.map(_.getToken)
    new NewRelicExecutionContext(wrappedContext.prepare(), nextTx, nextToken)
  }
}


/**
 * Wraps execution of the code inside the Future callbacks so that we can trace it, and also so that
 * we can continue to propagate the original request & response.
 */
class ExecutionWrapper(work: Runnable, currentTx: Option[Transaction], token: Option[Token]) extends Runnable {
  import NewRelicExecutionContext._

  @Trace(dispatcher=true, async=true)
  override def run(): Unit = {
    try {
      setup()
      work.run()
    } finally {
      tearDown()
    }
  }

  private def setup(): Unit = {
    // Link the token, so New Relic can associate this work with the original transaction
    token.filter(_.link) // .foreach { tok => debug(s"Linked $tok on tx=$currentTx")}

    // Remember the transaction in the thread-local variable during this work so that any new
    // Futures also get tracked.  This is critical to propagating along the current transaction.
    currentTx.foreach { tx => setThreadLocalTx(tx) }
  }

  private def tearDown(): Unit = {
    token.foreach(_.expire())
    clearThreadLocalTx()
  }
}

That execution context gets you half of the way there, where all background Future work will be traceable as long as it starts out in one of my top-level entry points (which call startTransaction, etc).

Another big piece is that at application startup, when I am initializing my javax.ws.rs.Client instances to talk to other services, I create Client proxies which can surface the client call in New Relic.

The main entry point is a class called NewRelicClient, and you proxy your existing naked client like val myClient = NewRelicClient(rawClient):

import javax.ws.rs.client.{Client, ClientRequestContext, ClientRequestFilter, ClientResponseContext, ClientResponseFilter, Invocation, WebTarget}

import com.newrelic.api.agent.{HeaderType, HttpParameters, OutboundHeaders, Segment}
import org.slf4j.LoggerFactory

/**
 * Observes client API calls and records timings in New Relic, working in unison with NewRelicExecutionContext.
 * This works by creating a proxy around all the public methods in the client.
 *
 * If you have an existing client, just wrap it like this:
 * {{{
 *  val client = NewRelicClient(myRealClient)
 * }}}
 *
 * Or, if you would normally use a ClientSupplierBundle, use NewRelicClientSupplierBundle.
 */
object NewRelicClient {

  private val logger = LoggerFactory.getLogger(getClass)

  private val propertyBase = getClass.getName
  private val segmentProperty = s"$propertyBase/Segment"
  private val requestFilter = new RequestFilter()
  private val responseFilter = new ResponseFilter()


  def apply(client: Client): Client = {
    client match {
      case alreadyMonitored: MonitoredClient =>
        alreadyMonitored

      case _ =>
        debug(s"monitor() client=$client")
        client.register(requestFilter, classOf[ClientRequestFilter])
        client.register(responseFilter, classOf[ClientResponseFilter])
        new MonitoredClient(client)
    }
  }

  private def debug(message: => String): Unit = {
    //logger.info(s"${Thread.currentThread.getName} $message")
  }

  private def attachNewSegmentToRequest(builder: Invocation.Builder): Invocation.Builder = {
    NewRelicExecutionContext
      .getThreadLocalTx
      .map { transaction =>
        val segment = transaction.startSegment("ApiClient", "Fetch")
        builder.property(segmentProperty, segment)
      }
      .getOrElse(builder)
  }


  private class MonitoredClient(client: Client) extends ClientWrapper(client) {
    override def reinstantiate(newClient: Client) = new MonitoredClient(newClient)
    override def wrapWebTarget(target: WebTarget): WebTarget = new MonitoredWebTarget(target)
    override def wrapInvocationBuilder(builder: Invocation.Builder) = attachNewSegmentToRequest(builder)
  }


  private class MonitoredWebTarget(target: WebTarget) extends WebTargetWrapper(target) {
    override def reinstantiate(newTarget: WebTarget) = new MonitoredWebTarget(newTarget)
    override def wrapInvocationBuilder(builder: Invocation.Builder) = attachNewSegmentToRequest(builder)
  }


  private class RequestFilter extends ClientRequestFilter {
    override def filter(request: ClientRequestContext): Unit = {
      val segment = request.getProperty(segmentProperty).asInstanceOf[Segment]
      if (segment != null) {
        val uri = request.getUri
        val description = s"API@${uri.getHost}:${uri.getPort}"

        segment.reportAsExternal(HttpParameters
          .library(description)
          .uri(uri)
          .procedure(request.getMethod)
          .noInboundHeaders()
          .build())
        segment.addOutboundRequestHeaders(new Headers(request))
      }
    }
  }


  private class ResponseFilter extends ClientResponseFilter {
    override def filter(request: ClientRequestContext, response: ClientResponseContext): Unit = {
      val segment = request.getProperty(segmentProperty).asInstanceOf[Segment]
      if (segment != null) {
        debug(s"response complete, segment=$segment")
        segment.end()
      }
    }
  }


  private class Headers(request: ClientRequestContext) extends OutboundHeaders {
    override def getHeaderType = HeaderType.HTTP
    override def setHeader(name: String, value: String): Unit = request.getHeaders.putSingle(name, value)
  }

}

The actual runtime proxy for Client is in a Java class called ClientWrapper:

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.Link;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Map;

/**
 * A Client which delegates all operations to another client.  You can override individual methods to modify the
 * default behavior of the wrapped instance.
 */
public abstract class ClientWrapper implements Client {

    private final Client client;


    public ClientWrapper(Client client) {
        this.client = client;
    }

    protected abstract ClientWrapper reinstantiate(Client newClient);

    protected WebTarget wrapWebTarget(WebTarget target) {
        return target;
    }

    protected Invocation.Builder wrapInvocationBuilder(Invocation.Builder builder) {
        return builder;
    }

    private Client reWrap(Client newClient) {
        if (newClient == client)
            return this;
        else
            return reinstantiate(newClient);
    }

    @Override
    public void close() {
        client.close();
    }

    @Override
    public WebTarget target(String uri) {
        return wrapWebTarget(client.target(uri));
    }

    @Override
    public WebTarget target(URI uri) {
        return wrapWebTarget(client.target(uri));
    }

    @Override
    public WebTarget target(UriBuilder uriBuilder) {
        return wrapWebTarget(client.target(uriBuilder));
    }

    @Override
    public WebTarget target(Link link) {
        return wrapWebTarget(client.target(link));
    }

    @Override
    public Invocation.Builder invocation(Link link) {
        return wrapInvocationBuilder(client.invocation(link));
    }

    @Override
    public SSLContext getSslContext() {
        return client.getSslContext();
    }

    @Override
    public HostnameVerifier getHostnameVerifier() {
        return client.getHostnameVerifier();
    }

    @Override
    public Configuration getConfiguration() {
        return client.getConfiguration();
    }

    @Override
    public Client property(String name, Object value) {
        return reWrap(client.property(name, value));
    }

    @Override
    public Client register(Class<?> componentClass) {
        return reWrap(client.register(componentClass));
    }

    @Override
    public Client register(Class<?> componentClass, int priority) {
        return reWrap(client.register(componentClass, priority));
    }

    @Override
    public Client register(Class<?> componentClass, Class<?>... contracts) {
        return reWrap(client.register(componentClass, contracts));
    }

    @Override
    public Client register(Class<?> componentClass, Map<Class<?>, Integer> contracts) {
        return reWrap(client.register(componentClass, contracts));
    }

    @Override
    public Client register(Object component) {
        return reWrap(client.register(component));
    }

    @Override
    public Client register(Object component, int priority) {
        return reWrap(client.register(component, priority));
    }

    @Override
    public Client register(Object component, Class<?>... contracts) {
        return reWrap(client.register(component, contracts));
    }

    @Override
    public Client register(Object component, Map<Class<?>, Integer> contracts) {
        return reWrap(client.register(component, contracts));
    }

}

And finally this Client proxy further proxies the WebTarget using another Java class WebTargetWrapper:

import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Map;

/**
 * A WebTarget which delegates all operations to another target.  You can override individual methods to modify the
 * default behavior of the wrapped instance.
 */
public abstract class WebTargetWrapper implements WebTarget {

    private final WebTarget target;


    public WebTargetWrapper(WebTarget target) {
        this.target = target;
    }

    protected abstract WebTargetWrapper reinstantiate(WebTarget newTarget);

    protected Invocation.Builder wrapInvocationBuilder(Invocation.Builder builder) {
        return builder;
    }

    private WebTarget reWrap(WebTarget newTarget) {
        if (newTarget == target)
            return this;
        else
            return reinstantiate(newTarget);
    }

    @Override
    public URI getUri() {
        return target.getUri();
    }

    @Override
    public UriBuilder getUriBuilder() {
        return target.getUriBuilder();
    }

    @Override
    public WebTarget path(String path) {
        return reWrap(target.path(path));
    }

    @Override
    public WebTarget resolveTemplate(String name, Object value) {
        return reWrap(target.resolveTemplate(name, value));
    }

    @Override
    public WebTarget resolveTemplate(String name, Object value, boolean encodeSlashInPath) {
        return reWrap(target.resolveTemplate(name, value, encodeSlashInPath));
    }

    @Override
    public WebTarget resolveTemplateFromEncoded(String name, Object value) {
        return reWrap(target.resolveTemplateFromEncoded(name, value));
    }

    @Override
    public WebTarget resolveTemplates(Map<String, Object> templateValues) {
        return reWrap(target.resolveTemplates(templateValues));
    }

    @Override
    public WebTarget resolveTemplates(Map<String, Object> templateValues, boolean encodeSlashInPath) {
        return reWrap(target.resolveTemplates(templateValues, encodeSlashInPath));
    }

    @Override
    public WebTarget resolveTemplatesFromEncoded(Map<String, Object> templateValues) {
        return reWrap(target.resolveTemplatesFromEncoded(templateValues));
    }

    @Override
    public WebTarget matrixParam(String name, Object... values) {
        return reWrap(target.matrixParam(name, values));
    }

    @Override
    public WebTarget queryParam(String name, Object... values) {
        return reWrap(target.queryParam(name, values));
    }

    @Override
    public Invocation.Builder request() {
        return wrapInvocationBuilder(target.request());
    }

    @Override
    public Invocation.Builder request(String... acceptedResponseTypes) {
        return wrapInvocationBuilder(target.request(acceptedResponseTypes));
    }

    @Override
    public Invocation.Builder request(MediaType... acceptedResponseTypes) {
        return wrapInvocationBuilder(target.request(acceptedResponseTypes));
    }

    @Override
    public Configuration getConfiguration() {
        return target.getConfiguration();
    }

    @Override
    public WebTarget property(String name, Object value) {
        return reWrap(target.property(name, value));
    }

    @Override
    public WebTarget register(Class<?> componentClass) {
        return reWrap(target.register(componentClass));
    }

    @Override
    public WebTarget register(Class<?> componentClass, int priority) {
        return reWrap(target.register(componentClass, priority));
    }

    @Override
    public WebTarget register(Class<?> componentClass, Class<?>... contracts) {
        return reWrap(target.register(componentClass, contracts));
    }

    @Override
    public WebTarget register(Class<?> componentClass, Map<Class<?>, Integer> contracts) {
        return reWrap(target.register(componentClass, contracts));
    }

    @Override
    public WebTarget register(Object component) {
        return reWrap(target.register(component));
    }

    @Override
    public WebTarget register(Object component, int priority) {
        return reWrap(target.register(component, priority));
    }

    @Override
    public WebTarget register(Object component, Class<?>... contracts) {
        return reWrap(target.register(component, contracts));
    }

    @Override
    public WebTarget register(Object component, Map<Class<?>, Integer> contracts) {
        return reWrap(target.register(component, contracts));
    }

}
3 Likes

I want to use it this way (Scala Pseudo Code) …

@Trace(dispatcher = true)
def doSomething(): Future[Result] = {
  val token = NewRelic.getAgent.getTransaction.getToken
  val handler = new FutureHandler[Result](token)
  handler.future
}

class FutureHandler[Result](token: Token) {
  val promise = Promise[Result]()
  def future: Future[Result] = promise.future

  @Trace(async = true)
  def onSuccess(res: Res) = {
    val tokenIsLinked = token.link

   promise.success(result)
   // more instrumented code ...

   val tokenIsExpired = token.expire
}

Not sure I am using this right.

In any case the token.link always returns false. Why?

@rolandtritsch in your example, token.link is being called in onSuccess, which is too late.

If I understand correctly, we should link the token on the new thread at the start of processing, and then expire it when finished. If you wait until onSuccess, the work is already completed, so the link would be ineffective.

It also seems important to make sure you call getToken on a thread that New Relic is already monitoring. The agent is very happy to return no-op stubs when you try to do things on unrecognized threads, so the key is to make sure the original thread can propagate the transaction to all of its children threads in an unbroken chain.

3 Likes

We linked threads by newrelic in such way:

trait NewrelicExecutionContext extends ExecutionContext {
  self =>

  override def prepare(): ExecutionContext = new ExecutionContext {
    
    val token = NewRelic.getAgent.getTransaction.getToken

    override def execute(r: Runnable): Unit = {
      self.execute(new Runnable {

        @Trace(async = true, dispatcher = true)
        override def run(): Unit = {    
          token.link()

          try {    
            r.run()
          } finally {            
            token.expire()
          }
        }
      })
    }

    override def reportFailure(cause: Throwable): Unit = self.reportFailure(cause)
  }
}