Java library for easily building Guice scopes, and a gRPC scopes library built on top of it

I’ve been working on a few open-source Java libraries related to Guice. They grew out of classes that I had been copying, pasting and customizing across successive projects. Recently I found some time to polish them a bit and put them on GitHub, so that others may find them useful. The API now seems to be stable, so before calling this a 1.0 release I wanted to get some feedback, mainly about the API: I find it convenient and intuitive myself, but of course I’m biased, hence this post 😉

The first library, guice-context-scopes, is a generic framework (that’s a bit of a big word for just 4 classes 😉) for building Guice scopes that are automatically (to some extent at least) passed around when switching between threads:

This lib formally introduces the notion of a ServerSideContext that can be tracked using ContextTrackers when switching between threads. Trackers are in turn used by ContextScopes to obtain the current context, in which the scopes store and look up scoped objects. To automate the whole process, there is ContextTrackingExecutor (extending ThreadPoolExecutor), which automatically transfers contexts when executing a task.
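To make the idea concrete before the full source, here is a minimal, JDK-only sketch of what "transferring a context" to another thread means. It is not the library's code: SketchContext, SketchTracker and ContextTransferSketch are hypothetical, simplified stand-ins that only mimic the thread-local tracking and the capture-then-re-establish step.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a context: just carries an id.
final class SketchContext {
    final String id;
    SketchContext(String id) { this.id = id; }
}

// Hypothetical stand-in for a tracker: remembers the context of the current thread.
final class SketchTracker {
    private final ThreadLocal<SketchContext> current = new ThreadLocal<>();

    SketchContext getCurrentContext() { return current.get(); }

    <T> T trackWhileExecuting(SketchContext ctx, Callable<T> operation) throws Exception {
        current.set(ctx);
        try {
            return operation.call();
        } finally {
            current.remove();
        }
    }
}

final class ContextTransferSketch {

    /** Returns what a worker thread sees: {without transfer, with transfer}. */
    static String[] demo() {
        final SketchTracker tracker = new SketchTracker();
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return tracker.trackWhileExecuting(new SketchContext("rpc-1"), () -> {
                // Plain task: the worker thread has its own (empty) thread-local.
                final Callable<String> plainTask = () -> describe(tracker);
                final String plain = executor.submit(plainTask).get();

                // Capture on the submitting thread, re-establish on the worker thread.
                final SketchContext captured = tracker.getCurrentContext();
                final Callable<String> wrappedTask =
                        () -> tracker.trackWhileExecuting(captured, () -> describe(tracker));
                final String transferred = executor.submit(wrappedTask).get();

                return new String[] {plain, transferred};
            });
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    static String describe(SketchTracker tracker) {
        final SketchContext ctx = tracker.getCurrentContext();
        return ctx == null ? "no context" : ctx.id;
    }

    public static void main(String[] args) {
        final String[] results = demo();
        System.out.println("plain submit:  " + results[0]);
        System.out.println("with transfer: " + results[1]);
    }
}
```

The wrapping done by hand in wrappedTask is roughly the step that an executor can perform automatically for every submitted task.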

The second library, gRPC Guice scopes, is built on top of the first one. It provides rpcScope and listenerCallScope:

Oversimplifying, in the case of a streaming client, listenerCallScope spans the processing of a single message from the client’s stream, while rpcScope spans the whole RPC. Oversimplifying again, in the case of a unary client, these 2 scopes have roughly the same span.
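The difference between the two spans can be illustrated with a small simulation. This is only a sketch under simplifying assumptions, not the library's API: SketchScopeContext and ScopeSpanSketch are hypothetical names, there is no Guice or gRPC involved, and only message processing is modelled (other listener calls are ignored). One context lives for the whole RPC, a fresh one is created per message, and each context hands out at most one instance per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a context: stores at most one object per key.
final class SketchScopeContext {
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <T> T provideIfAbsent(String key, Supplier<T> unscoped) {
        return (T) attributes.computeIfAbsent(key, (ignored) -> unscoped.get());
    }
}

final class ScopeSpanSketch {

    /** Returns {count of RPC-scoped instances, count of listener-call-scoped instances}. */
    static int[] simulateStreamingRpc(int messageCount) {
        final List<Object> rpcScoped = new ArrayList<>();   // every RPC-scoped instance created
        final List<Object> callScoped = new ArrayList<>();  // every call-scoped instance created

        final SketchScopeContext rpcContext = new SketchScopeContext();  // one per RPC
        for (int message = 0; message < messageCount; message++) {
            // a fresh context for the processing of each message
            final SketchScopeContext listenerCallContext = new SketchScopeContext();

            // the handler asks for each object twice while processing a message
            for (int lookup = 0; lookup < 2; lookup++) {
                rpcContext.provideIfAbsent("service", () -> newInstance(rpcScoped));
                listenerCallContext.provideIfAbsent("service", () -> newInstance(callScoped));
            }
        }
        return new int[] {rpcScoped.size(), callScoped.size()};
    }

    private static Object newInstance(List<Object> created) {
        final Object instance = new Object();
        created.add(instance);
        return instance;
    }

    public static void main(String[] args) {
        final int[] streaming = simulateStreamingRpc(3);
        System.out.println("streaming, 3 messages: " + streaming[0] + " RPC-scoped, "
                + streaming[1] + " listener-call-scoped");
        final int[] unary = simulateStreamingRpc(1);
        System.out.println("unary, 1 message: " + unary[0] + " RPC-scoped, "
                + unary[1] + " listener-call-scoped");
    }
}
```

With 3 messages the simulation creates 1 RPC-scoped and 3 listener-call-scoped instances; with a single message both counts are 1, which is the "roughly the same span" case.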

In case some of my intentions or documentation are unclear, there’s a small sample project included in the gRPC scopes repo.

Let’s start with the first lib. The following is a copy-paste of the code from version 1.0-alpha7:

// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.guice.scopes;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.inject.Key;
import com.google.inject.Provider;



/**
 * Stores attributes associated with some server-side processing/call (such as a servlet request
 * processing, an RPC or a session combining several received calls) and allows to execute
 * operations within itself.
 * <p>
 * Overriding classes must use themselves as {@code Ctx} type argument.</p>
 * <p>
 * Overriding classes usually add properties and methods specific to a given type of call, like
 * given call's arguments etc.</p>
 * <p>
 * If many threads run within the same context, the attributes that they access must be thread-safe
 * or properly synchronized.</p>
 */
public abstract class ServerSideContext<Ctx extends ServerSideContext<Ctx>> {



    final ConcurrentMap<Key<?>, Object> attributes = new ConcurrentHashMap<>();
    final ContextTracker<Ctx> tracker;



    protected ServerSideContext(ContextTracker<Ctx> tracker) {
        this.tracker = tracker;
    }



    /**
     * @see ContextTrackingExecutor#executeWithinAll(java.util.List, Runnable)
     */
    @SuppressWarnings("unchecked")
    public void executeWithinSelf(Runnable operation) {
        tracker.trackWhileExecuting((Ctx) this, operation);
    }



    /**
     * @see ContextTrackingExecutor#executeWithinAll(java.util.List, Callable)
     */
    @SuppressWarnings("unchecked")
    public <T> T executeWithinSelf(Callable<T> operation) throws Exception {
        return tracker.trackWhileExecuting((Ctx) this, operation);
    }



    /**
     * Removes the attribute given by <code>key</code> from this context. This is sometimes useful
     * to force the associated {@link ContextScope} to obtain a new instance from the unscoped
     * provider if the current one is not usable anymore (for example a timed-out connection, etc).
     * <p>
     * <b>Note:</b> If multiple threads run within the same context, care must be taken to prevent
     * some of them from retaining the old stale instance.</p>
     */
    public void removeAttribute(Key<?> key) {
        attributes.remove(key);
    }



    /**
     * For internal use only by {@link ContextScope#scope(Key, com.google.inject.Provider)}.
     */
    @SuppressWarnings("unchecked")
    <T> T provideAttributeIfAbsent(Key<T> key, Provider<T> provider) {
        return (T) attributes.computeIfAbsent(key, (ignored) -> provider.get());
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.guice.scopes;

import java.util.concurrent.Callable;



/**
 * Allows to track which server-side call is handled by which thread.
 */
public class ContextTracker<Ctx extends ServerSideContext<Ctx>> {



    private final ThreadLocal<Ctx> currentContex = new ThreadLocal<>();



    /**
     * @return context which the calling thread is running within.
     * @see ContextTrackingExecutor#getActiveContexts(ContextTracker...)
     */
    public Ctx getCurrentContext() {
        return currentContex.get();
    }



    void trackWhileExecuting(Ctx ctx, Runnable operation) {
        currentContex.set(ctx);
        try {
            operation.run();
        } finally {
            currentContex.remove();
        }
    }



    <T> T trackWhileExecuting(Ctx ctx, Callable<T> operation) throws Exception {
        currentContex.set(ctx);
        try {
            return operation.call();
        } finally {
            currentContex.remove();
        }
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.guice.scopes;

import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Scope;



/**
 * Scopes objects to a call context obtained from the associated {@link ContextTracker}.
 */
public class ContextScope<Ctx extends ServerSideContext<Ctx>> implements Scope {



    final ContextTracker<Ctx> tracker;

    final String name;
    public String getName() { return name; }



    /**
     * @throws RuntimeException if there's no context for the current thread. This most commonly
     * happens when providing a callback to some async method without transferring the context.
     * Use static helper methods
     * {@link ContextTrackingExecutor#getActiveContexts(ContextTracker...)} and
     * {@link ContextTrackingExecutor#executeWithinAll(java.util.List, Runnable)} to fix it:
     * <pre>
     *class MyClass {
     *
     *    &commat;Inject ContextTracker&lt;ContextT1&gt; tracker1;
     *    &commat;Inject ContextTracker&lt;ContextT2&gt; tracker2;
     *
     *    void myMethod(Object param) {
     *        // myMethod code
     *        var activeCtxList = ContextTrackingExecutor.getActiveContexts(tracker1, tracker2);
     *        someAsyncMethod(param, (callbackParam) -&gt;
     *            ContextTrackingExecutor.executeWithinAll(activeCtxList, () -&gt; {
     *                // callback code
     *            }
     *        ));
     *    }
     *}</pre>
     */
    @Override
    public <T> Provider<T> scope(Key<T> key, Provider<T> unscoped) {
        return () -> {
            try {
                return tracker.getCurrentContext().provideAttributeIfAbsent(key, unscoped);
            } catch (NullPointerException e) {
                // NPE here is a result of a bug that will be usually eliminated in development
                // phase and not happen in production, so we catch NPE instead of checking manually
                // each time.
                throw new RuntimeException("no context for thread "
                        + Thread.currentThread().getName() + " in scope " + name
                        + ". See javadoc for ContextScope.scope(...)");
            }
        };
    }



    public ContextScope(String name, ContextTracker<Ctx> tracker) {
        this.name = name;
        this.tracker = tracker;
    }



    @Override
    public String toString() {
        return name;
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.guice.scopes;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * A {@link ThreadPoolExecutor} that upon task execution automatically updates which thread runs
 * within which {@link ServerSideContext} using supplied {@link #trackers}.
 * <p>
 * Instances usually correspond 1-1 with some type of blocking or time consuming operations, such
 * as CPU/GPU intensive calculations or blocking network communication with some resource.<br/>
 * In case of network operations, a given threadPool size should usually correspond to the pool size
 * of the connections to the given resource.<br/>
 * In case of CPU/GPU intensive operations, it should usually correspond to the number of given
 * cores available to the process.</p>
 * <p>
 * Instances are usually created at app startup, stored on static vars and/or configured for
 * injection using<pre>
 * bind(ContextTrackingExecutor.class)
 *    .annotatedWith(Names.named("someOpTypeExecutor"))
 *    .toInstance(...)</pre>
 * and injected with
 * <pre>@Named("someOpTypeExecutor") ContextTrackingExecutor someOpTypeExecutor</pre></p>
 * <p>
 * If multiple threads run within the same context (for example by using
 * {@link #invokeAll(Collection)}), then the attributes they access must be thread-safe or properly
 * synchronized.</p>
 */
public class ContextTrackingExecutor extends ThreadPoolExecutor {



    final ContextTracker<?>[] trackers;

    final String name;
    public String getName() { return name; }



    public ContextTrackingExecutor(String name, int poolSize, ContextTracker<?>... trackers) {
        super(poolSize, poolSize, 0l, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                new NamedThreadFactory(name));
        this.name = name;
        this.trackers = trackers;
    }



    @Override
    public void execute(Runnable task) {
        final var activeCtxs = getActiveContexts(trackers);
        super.execute(() -> executeWithinAll(activeCtxs, task));
    }



    /**
     * Retrieves all active contexts from supplied trackers.
     */
    public static List<ServerSideContext<?>> getActiveContexts(ContextTracker<?>... trackers) {
        return Arrays.stream(trackers)
                .map((tracker) -> tracker.getCurrentContext())
                .filter((ctx) -> ctx != null)
                .collect(Collectors.toList());
    }



    /**
     * Executes {@code operation} within all contexts supplied via {@code ctxs}.
     * @see #getActiveContexts(ContextTracker...)
     */
    public static void executeWithinAll(List<ServerSideContext<?>> ctxs, Runnable operation) {
        switch (ctxs.size()) {
            case 1:
                ctxs.get(0).executeWithinSelf(operation);
                return;
            case 2:
                ctxs.get(1).executeWithinSelf(() -> ctxs.get(0).executeWithinSelf(operation));
                return;
            case 0:
                log.warn(Thread.currentThread().getName() + " is running outside of any context");
                operation.run();
                return;
            default:
                executeWithinAll(
                    ctxs.subList(1, ctxs.size()),
                    () -> ctxs.get(0).executeWithinSelf(operation)
                );
        }
    }



    /**
     * Executes {@code operation} within all contexts supplied via {@code ctxs}.
     * @see #getActiveContexts(ContextTracker...)
     */
    public static <T> T executeWithinAll(List<ServerSideContext<?>> ctxs, Callable<T> operation)
            throws Exception {
        switch (ctxs.size()) {
            case 1:
                return ctxs.get(0).executeWithinSelf(operation);
            case 2:
                return ctxs.get(1).executeWithinSelf(
                        () -> ctxs.get(0).executeWithinSelf(operation));
            case 0:
                log.warn(Thread.currentThread().getName() + " is running outside of any context");
                return operation.call();
            default:
                return executeWithinAll(
                    ctxs.subList(1, ctxs.size()),
                    () -> ctxs.get(0).executeWithinSelf(operation)
                );
        }
    }



    @Override
    public <T> Future<T> submit(Callable<T> task) {
        final var activeCtxs = getActiveContexts(trackers);
        return super.submit(() -> executeWithinAll(activeCtxs, task));
    }



    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        final var activeCtxs = getActiveContexts(trackers);
        return super.submit(
            () -> executeWithinAll(activeCtxs, task),
            result
        );
    }



    @Override
    public Future<?> submit(Runnable task) {
        final var activeCtxs = getActiveContexts(trackers);
        return super.submit(() -> executeWithinAll(activeCtxs, task));
    }



    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return super.invokeAll(wrapTasks(tasks));
    }



    private <T> List<Callable<T>> wrapTasks(Collection<? extends Callable<T>> tasks) {
        final var activeCtxs = getActiveContexts(trackers);
        return tasks.stream()
                .map((task) -> (Callable<T>) () -> executeWithinAll(activeCtxs, task))
                .collect(Collectors.toList());
    }



    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
            TimeUnit unit) throws InterruptedException {
        return super.invokeAll(wrapTasks(tasks), timeout, unit);
    }



    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        return super.invokeAny(wrapTasks(tasks));
    }



    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return super.invokeAny(wrapTasks(tasks), timeout, unit);
    }



    public ContextTrackingExecutor(
            String name,
            int poolSize,
            BlockingQueue<Runnable> workQueue,
            ContextTracker<?>... trackers) {
        super(poolSize, poolSize, 0l, TimeUnit.SECONDS, workQueue);
        this.name = name;
        this.trackers = trackers;
    }



    public ContextTrackingExecutor(
            String name,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler,
            ContextTracker<?>... trackers) {
        super(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler);
        this.name = name;
        this.trackers = trackers;
    }



    /**
     * Calls {@link #shutdown()} and waits <code>timeoutSeconds</code> for termination. If it fails,
     * calls {@link #shutdownNow()}.
     * Logs outcome to {@link Logger} named after this class.
     * @return <code>null</code> if the executor was shutdown cleanly, list of tasks returned by
     *     {@link #shutdownNow()} otherwise.
     */
    public List<Runnable> tryShutdownGracefully(long timeoutSeconds) {
        shutdown();
        try {
            awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {}
        if ( ! isTerminated()) {
            final int activeCount = getActiveCount();
            final List<Runnable> unstartedTasks = shutdownNow();
            log.warn(activeCount + " active and " + unstartedTasks.size()
                    + " unstarted tasks are still remaining in executor " + name);
            return unstartedTasks;
        } else {
            log.info("executor " + name + " shutdown completed");
            return null;
        }
    }



    static class NamedThreadFactory implements ThreadFactory {

        static final ThreadGroup contextTrackingExecutors;

        final ThreadGroup threadGroup;
        final AtomicInteger threadNumber;
        final String namePrefix;



        static {
            final var securityManager = System.getSecurityManager();
            final var parentThreadGroup = securityManager != null
                    ? securityManager.getThreadGroup()
                    : Thread.currentThread().getThreadGroup();
            contextTrackingExecutors =
                    new ThreadGroup(parentThreadGroup, "ContextTrackingExecutors");
            contextTrackingExecutors.setDaemon(false);
        }



        NamedThreadFactory(String name) {
            threadGroup = new ThreadGroup(contextTrackingExecutors, name);
            threadNumber = new AtomicInteger(1);
            namePrefix = name + "-thread-";
        }



        @Override
        public Thread newThread(Runnable task) {
            return new Thread(threadGroup, task, namePrefix + threadNumber.getAndIncrement());
        }
    }



    static final Logger log = LoggerFactory.getLogger(ContextTrackingExecutor.class.getName());
}

Now gRPC scopes version 1.0-alpha6:

// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.grpc.scopes;

import pl.morgwai.base.guice.scopes.ContextTracker;
import pl.morgwai.base.guice.scopes.ServerSideContext;



/**
 * Context of a single call to one of the methods of {@link io.grpc.ServerCall.Listener}.
 * Each method of a <code>Listener</code> is executed with a new <code>ListenerCallContext</code>.
 *
 * @see GrpcModule#listenerCallScope corresponding <code>Scope</code>
 * @see <a href="https://gist.github.com/morgwai/6967bcf51b8ba586847c7f1922c99b88">a simplified
 *      overview of relation between methods of <code>Listener</code> and user code</a>
 * @see <a href="https://github.com/grpc/grpc-java/blob/master/stub/src/main/java/io/grpc/stub/
ServerCalls.java">ServerCalls source for detailed info</a>
 */
public class ListenerCallContext extends ServerSideContext<ListenerCallContext> {



    ListenerCallContext(ContextTracker<ListenerCallContext> tracker) {
        super(tracker);
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.grpc.scopes;

import io.grpc.Metadata;
import io.grpc.ServerCall;

import pl.morgwai.base.guice.scopes.ContextTracker;
import pl.morgwai.base.guice.scopes.ServerSideContext;



/**
 * Context of a given RPC ({@link io.grpc.ServerCall}).
 * A single instance spans over the whole processing of a given RPC: from the beginning of the
 * invocation of a given remote procedure, across all its messages, until the RPC is closed.
 * Specifically {@link io.grpc.ServerCallHandler#startCall(ServerCall, io.grpc.Metadata)} and all
 * methods of the returned {@link io.grpc.ServerCall.Listener} are executed within the same
 * <code>RpcContext</code>.
 *
 * @see GrpcModule#rpcScope corresponding <code>Scope</code>
 * @see <a href="https://gist.github.com/morgwai/6967bcf51b8ba586847c7f1922c99b88">a simplified
 *      overview of relation between methods of <code>Listener</code> and user code</a>
 * @see <a href="https://github.com/grpc/grpc-java/blob/master/stub/src/main/java/io/grpc/stub/
ServerCalls.java">ServerCalls source for detailed info</a>
 */
public class RpcContext extends ServerSideContext<RpcContext> {



    final ServerCall<?, ?> rpc;
    public ServerCall<?, ?> getRpc() { return rpc; }

    final Metadata headers;
    public Metadata getHeaders() { return headers; }



    RpcContext(ServerCall<?, ?> rpc, Metadata headers, ContextTracker<RpcContext> tracker) {
        super(tracker);
        this.rpc = rpc;
        this.headers = headers;
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.grpc.scopes;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;



/**
 * Creates and starts tracking a new {@link RpcContext} for each new RPC ({@link ServerCall})
 * and a new {@link ListenerCallContext} for each {@link Listener} call.
 *
 * @see GrpcModule
 */
public class ContextInterceptor implements ServerInterceptor {



    final GrpcModule grpcModule;



    @Override
    public <Request, Response> Listener<Request> interceptCall(
            ServerCall<Request, Response> call,
            Metadata headers,
            ServerCallHandler<Request, Response> handler) {
        final RpcContext rpcContext = grpcModule.newRpcContext(call, headers);
        final Listener<Request> listener;
        try {
            listener = rpcContext.executeWithinSelf(() -> handler.startCall(call, headers));
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            return null;  // dead code: result of wrapping handler.startCall(...) with a Callable
        }

        return new Listener<Request>() {

            @Override
            public void onMessage(Request message) {
                rpcContext.executeWithinSelf(() -> {
                    grpcModule.newListenerCallContext().executeWithinSelf(
                        () -> listener.onMessage(message)
                    );
                });
            }

            @Override
            public void onHalfClose() {
                rpcContext.executeWithinSelf(() -> {
                    grpcModule.newListenerCallContext().executeWithinSelf(
                        () -> listener.onHalfClose()
                    );
                });
            }

            @Override
            public void onCancel() {
                rpcContext.executeWithinSelf(() -> {
                    grpcModule.newListenerCallContext().executeWithinSelf(
                        () -> listener.onCancel()
                    );
                });
            }

            @Override
            public void onComplete() {
                rpcContext.executeWithinSelf(() -> {
                    grpcModule.newListenerCallContext().executeWithinSelf(
                        () -> listener.onComplete()
                    );
                });
            }

            @Override
            public void onReady() {
                rpcContext.executeWithinSelf(() -> {
                    grpcModule.newListenerCallContext().executeWithinSelf(
                        () -> listener.onReady()
                    );
                });
            }
        };
    }



    public ContextInterceptor(GrpcModule grpcModule) {
        this.grpcModule = grpcModule;
    }
}
// Copyright (c) Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.base.grpc.scopes;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scope;
import com.google.inject.TypeLiteral;
import io.grpc.Metadata;
import io.grpc.ServerCall;

import pl.morgwai.base.guice.scopes.ContextScope;
import pl.morgwai.base.guice.scopes.ContextTracker;
import pl.morgwai.base.guice.scopes.ContextTrackingExecutor;



/**
 * gRPC Guice {@link Scope}s, {@link ContextTracker}s and some helper methods.
 * <p>
 * <b>NO STATIC:</b> by a common convention, objects such as <code>Scope</code>s are usually stored
 * on <code>static</code> vars. Global context however has a lot of drawbacks. Instead, create just
 * 1 {@code GrpcModule} instance in your app initialization code (for example on a local var in your
 * <code>main</code> method) and then use its member scopes ({@link #rpcScope},
 * {@link #listenerCallScope}) in your Guice {@link Module}s and {@link #contextInterceptor} to
 * build your services.</p>
 */
public class GrpcModule implements Module {



    /**
     * Allows tracking of the {@link RpcContext context of a given RPC (<code>ServerCall</code>)}.
     */
    public final ContextTracker<RpcContext> rpcContextTracker = new ContextTracker<>();

    /**
     * Scopes objects to the {@link RpcContext context of a given RPC (<code>ServerCall</code>)}.
     */
    public final Scope rpcScope = new ContextScope<>("RPC_SCOPE", rpcContextTracker);



    /**
     * Allows tracking of the {@link ListenerCallContext context of a single
     * <code>ServerCall.Listener</code> call} and as a consequence also of a corresponding request
     * observer's call.
     */
    public final ContextTracker<ListenerCallContext> listenerCallContextTracker =
            new ContextTracker<>();

    /**
     * Scopes objects to the {@link ListenerCallContext context of a given <code>Listener</code>
     * call} and as a consequence also of a corresponding request observer's call.
     */
    public final Scope listenerCallScope =
            new ContextScope<>("LISTENER_CALL_SCOPE", listenerCallContextTracker);



    /**
     * <code>ServerInterceptor</code> that must be installed for all gRPC services that use
     * {@link #rpcScope} and {@link #listenerCallScope}.
     */
    public final ContextInterceptor contextInterceptor = new ContextInterceptor(this);



    /**
     * Binds {@link #rpcContextTracker} and {@link #listenerCallContextTracker} and corresponding
     * contexts for injection. Binds {@code ContextTracker<?>[]} to instance containing all
     * trackers for use with {@link ContextTrackingExecutor#getActiveContexts(ContextTracker...)}.
     */
    @Override
    public void configure(Binder binder) {
        TypeLiteral<ContextTracker<RpcContext>> rpcContextTrackerType =
                new TypeLiteral<>() {};
        binder.bind(rpcContextTrackerType).toInstance(rpcContextTracker);
        binder.bind(RpcContext.class).toProvider(
                () -> rpcContextTracker.getCurrentContext());

        TypeLiteral<ContextTracker<ListenerCallContext>> messageContextTrackerType =
                new TypeLiteral<>() {};
        binder.bind(messageContextTrackerType).toInstance(listenerCallContextTracker);
        binder.bind(ListenerCallContext.class).toProvider(
                () -> listenerCallContextTracker.getCurrentContext());

        TypeLiteral<ContextTracker<?>[]> trackerArrayType = new TypeLiteral<>() {};
        binder.bind(trackerArrayType).toInstance(trackers);
    }

    public final ContextTracker<?>[] trackers = {listenerCallContextTracker, rpcContextTracker};



    /**
     * Convenience "constructor" for <code>ContextTrackingExecutor</code>. (I really miss method
     * extensions in Java)
     */
    public ContextTrackingExecutor newContextTrackingExecutor(String name, int poolSize) {
        return new ContextTrackingExecutor(
                name, poolSize, rpcContextTracker, listenerCallContextTracker);
    }



    /**
     * Convenience "constructor" for <code>ContextTrackingExecutor</code>.
     */
    public ContextTrackingExecutor newContextTrackingExecutor(
            String name,
            int poolSize,
            BlockingQueue<Runnable> workQueue) {
        return new ContextTrackingExecutor(
                name, poolSize, workQueue, rpcContextTracker, listenerCallContextTracker);
    }



    /**
     * Convenience "constructor" for <code>ContextTrackingExecutor</code>.
     */
    public ContextTrackingExecutor newContextTrackingExecutor(
            String name,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler,
            ContextTracker<?>... trackers) {
        return new ContextTrackingExecutor(
                name,
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler,
                rpcContextTracker, listenerCallContextTracker);
    }



    RpcContext newRpcContext(ServerCall<?, ?> rpc, Metadata headers) {
        return new RpcContext(rpc, headers, rpcContextTracker);
    }

    ListenerCallContext newListenerCallContext() {
        return new ListenerCallContext(listenerCallContextTracker);
    }
}

That’s it. As mentioned before, I’d like to focus mainly on the API design: is it safe, convenient enough, intuitive, etc. Other feedback is welcome too.

About my code-style:

  • I mostly use 3 blank lines around methods, unless some methods are very tightly related, in which case there will be just 1 blank line between them (for example a private helper method used only by 1 other method will be separated from it by just 1 blank line)
  • unless meaningful for a whole given class, instance variables are usually defined near the methods that use them, so that their meaning and scope is easier to see
  • normally I use 1 blank line around instance variables, unless there’s a group of tightly related ones that will not have blank lines between them.
  • boilerplate accessors are defined in 1 line right after the given instance variable
  • boilerplate code that is not supposed to be read (for example unused stub methods to satisfy interface requirements) is usually 1 line per whole method to save space.
  • methods are not grouped by type (for example all constructors together, all statics together etc), but rather ordered so as to clearly demonstrate the basic life-cycle of a given class. So for example: the most common constructor, other necessary initializers, mid-life-cycle methods with their helper methods, tear-down methods. After that come additional constructors, uncommon advanced usage methods and generic helpers, and finally at the very end all kinds of leftover boilerplate.

Some additional info:

  • language level is set to 11
  • javadocs are currently intended to be read in java source files and may lack some sections when rendered to html.