Мониторинг зависших потоков для standalone-приложения

ThreadMonitor W   WSVR0605W: Thread "Default : 0" () has been active for xxx milliseconds and may be hung.  There is/are 1 thread(s) in total in the server that may be hung.
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:939)
	at com.ibm.j2ca.base.internal.PollEventManagerWorker.run(PollEventManagerWorker.java:96)
	at com.ibm.ejs.j2c.work.WorkProxy.run(WorkProxy.java:419)
	at com.ibm.ejs.j2c.work.AsyncWorkProxy.run(AsyncWorkProxy.java:136)
	at com.ibm.ws.asynchbeans.J2EEContext$RunProxy.run(J2EEContext.java:268)
	at java.security.AccessController.doPrivileged(AccessController.java:192)
	at com.ibm.ws.asynchbeans.J2EEContext.run(J2EEContext.java:768)
	at com.ibm.ws.asynchbeans.ExecutionContextImpl.go(ExecutionContextImpl.java:85)
	at com.ibm.ejs.j2c.work.AsyncWorkProxy.run(AsyncWorkProxy.java:90)
	at com.ibm.ws.util.ThreadPool$Worker.run(ThreadPool.java:1497)

Какое-то время в рабочие будни мне приходилось сталкиваться с Websphere Application Server. У него есть существенные недостатки, среди которых - неторопливость, закрытость и собственная JVM. Но, кроме недостатков, были и достоинства... например... много возможностей по настройке безопасности.
В сервер приложений был встроен мониторинг "зависающих" потоков, т.е. если длина транзакции переваливала допустимый интервал в лог вываливался стек вызовов "зависшего" потока с информацией о том, что поток, быть может, завис (пример приведен под заголовком) .

Собственно, эту функциональность мне и захотелось запихнуть в совсем другое, небольшое приложение. Скорее всего существует немало готовых решений данного вопроса, но, порой хочется написать свой велосипед.

Как можно реализовать

1-ый вариант использовать базирующуюся на очереди с приоритетами блокирующуюся очередь DelayQueue, куда каждый рабочий поток будет добавлять задачи с определенным интервалом. В задачу также будет включена ссылка на поток, который будет отслеживаться. Если рабочий поток успевает удалить задачу из очереди раньше, чем ее обнаружит "мониторинг" - то все хорошо, если же задачу достает сам мониторинг, это означает зависание и мы просто реализуем логику, сопоставленную зависанию. На данный момент времени мне неизвестна реализация неблокирующего алгоритма очереди с приоритетами, поэтому данный вариант подразумевает, что при добавлении-удалении-отслеживании задач мы будем использовать какую-то блокировку (как минимум, это лок внутри самой DelayQueue).

2-ой вариант, т.к. интервал выполнения задачи у меня всего один - можно немного упростить и не использовать очередь с приоритетами, а задействовать обычную ConcurrentLinkedQueue (в реализации задействована двусторонняя очередь ConcurrentLinkedDeque, т.к. в определенном состоянии необходимо добавить головной элемент, но сути идеи это не меняет).

Как должно работать

Я считаю, что перед написанием кода нового объекта/компонента (если ситуация позволяет), сначала следует представить как имеющиеся объекты будут с ним взаимодействовать, а еще лучше написать сразу тесты, которые покажут как его следует использовать.

В данном случае мне бы хотелось иметь:
  • Объект для запуска/остановки мониторинга. Стоит упомянуть, что реализация, базирующася на DelayQueue, позволяет отслеживать зависания без создания отдельного сервисного потока. Можно задействовать имеющийся в приложении планировщик, но в этом случае точность отслежвания будет регулироваться периодичностью опроса планировщиком очереди.
    public interface LifeCycle {
     
        public void start();
     
        public void stop();
     
    }
  • Объект, который позволит рабочему потоку оповещать мониторинг о том, что он начал/закончил выполнять определенную задачу
    public interface TrackingLifeCycle {
     
        public void startTracking();
     
        public void stopTracking();
    }
  • Объекты для выполнения каких-то действий над зависшим потоком в случае зависания/отвисания (в приведенном выше примере WAS, это сообщение в лог файле)
    public interface HangEventHandler<T> {
     
        /**
         * 
         * @param task
         */
        public void onEvent(T task);
     
    }

    public interface UnHangEventHandler<T> {
     
        /**
         * Handles event, when previously seemed hung task completed
         * 
         * @param task
         *            task that previously was seemed hung 
         * @param leftHangCount
         *            count of hung tasks (0 - no hung tasks)
         */
        public void onEvent(T task, int leftHangCount);
     
    }

Реализация

Полная реализация получилась несколько запутанной.

На диаграмме классов видно, что предусмотрено две базовых реализации мониторинга - базовый TaskTracker (который позволяет добавлять задачи с определенным идентификатором и интервалом ожидания завершения и SimpleTaskTracker, который использует в качестве идентификатора - идентификатор текущего потока и имеет фиксированное время ожидания выполнения. Возможно, стоит добавить builder для упрощения создания мониторинга.

Полные исходники доступны на github. Должен (постыдно) отметить, что они еще пока не до конца структурированы и, вероятно, будут модифицироваться.

Реализация "почти неблокирующегося слежения" (ConcurrentDequeBasedSimpleTracker):
Данная версия сделана для развлечения и может дорабатываться (хотя, в целом идея проста и понятна, но реализация может содержать ошибки и не предназначена для промышленной эксплуатации, да и смысла в подобной "оптимизации" для данного функционала нет. Для промышленной эксплутациия рекомендуется использовать версию, базирующуюся на DelayQueue).
/**
 * Реализация "почти неблокирующегося слежения" 
 * @author pereslegin pavel
 *
 */
public class ConcurrentDequeBasedSimpleTracker<E extends Delayed & Identified<Long>>  {
    // ссылка на поток, осуществляющий отслеживание зависаний
    private volatile Thread localThread;
 
    // собственно, очередь задач
    private final ConcurrentLinkedDeque<E> queue = new ConcurrentLinkedDeque<>();
 
    // обработчик события "задача зависла"
    private final HangEventHandler<E> hangHandler;
 
    // обработчик события "ранее зависшая задача - выполнилась"
    private final UnHangEventHandler<E> unhangHandler;
 
    // коллекция для сохранения списка зависших задач, с помощью нее мы будем 
    // отслеживать, что задача числилиась зависшей.
    // Note: ConcurrentHashMap  не является полностью неблокирующейся коллекцией
    private final ConcurrentMap<Long, E> hangMap = new ConcurrentHashMap<Long, E>();
 
    // для того, чтобы отслеживающий зависания поток мог после извлечения зависшей задачи - 
    // добавить ее снова в конец очереди (ну и, соответственно, поток запустивший задачу 
    // смог ее оттуда удалить, когда она завершится)
    private final PrototypeFactory<E> fact;
    // TODO: dirty fix
    private final long maxParkNanos;
 
    ConcurrentDequeBasedSimpleTracker(HangEventHandler<E> hangHandler,
            UnHangEventHandler<E> unhangHandler, PrototypeFactory<E> fact, long maxDelay) {
        this.hangHandler = hangHandler;
        this.unhangHandler = unhangHandler;
        this.fact = fact;
        this.maxParkNanos = TimeUnit.NANOSECONDS.convert(maxDelay, TimeUnit.MILLISECONDS);
    }
 
    public void submit(E t) {
        Thread th = localThread;
        if (th == null)
            throw new IllegalStateException("Task tracker MUST BE started before workers");
 
        queue.add(t);
 
        // TODO: если очередь пустая - то отслеживающий зависания поток может
        // спать вечным сном
        // if (th.getState() == State.WAITING) {
        //     // будим его
        //     LockSupport.unpark(th);
        // }
    }
 
    public void remove(E t) {
        // spin forever
        while (!queue.remove(t));
 
        // если ранее было зафиксировано зависание задачи - вызываем обработчик "отвисания"
        if ((t = hangMap.remove(t.getId())) != null)
            unhangHandler.onEvent(t, hangMap.size());
    }
 
    public void track() throws InterruptedException {
        // сохраняем ссылку на поток занимающийся отслеживанием
        localThread = Thread.currentThread();
        Thread local = Thread.currentThread();
        E t;
        boolean event = false;
 
        // крутимся, пока поток не будет явно прерван
        while (!local.isInterrupted()) {
            // спим пока очередь пустая
            while ((t = queue.peek()) == null) {
                // TODO:  
                LockSupport.parkNanos(maxParkNanos);
            }
            // извекли "головную" задачу - определяем сколько времени можно спать
            long timeout = t.getDelay(TimeUnit.NANOSECONDS);
            if (timeout > 0 && queue.peek() == t) {
                // если время есть - то снова спим
                LockSupport.parkNanos(timeout);
            }
 
            // проверяем что таймаут удаления задачи из очереди наступил
            if (t.getDelay(TimeUnit.NANOSECONDS) <= 0) { // spurious wake up test
                if (queue.peek() == t) {
                    // testing that element was not removed
                    if (queue.poll() != t) {
                        // if someone other was polled - reversing (this is
                        // possible because we have a single thread for tracking)
                        queue.addFirst(t);
                    }
                    else {
                        event = true;
                    }
                }
            }
            if (event) {
                event = false;
                // resubmit task
                submit(fact.newInstance(t));
 
                if (unhangHandler != null) {
                    hangMap.putIfAbsent(t.getId(), t);
                }
                hangHandler.onEvent(t);
 
            }
        }
        // если поток был прерван - сбрасываем флаг и выкидываем InterruptedException
        if (Thread.interrupted()) // handling thread interruption
            throw new InterruptedException("Thread was interrupted");
    }
}
p.s. LockSupport настоятельно не рекомендуется использовать для замены wait/notify (как минимум потому, что park/unpark, не связаны отношением happened-before, для HB park/unpark дополнительно сопровождаются volatile store/load).

Пример реализации обработчика зависания/отвисания потока:

public class HangEventsHandlerImpl implements HangEventHandler<TaskDelayed>,
        UnHangEventHandler<TaskDelayed> {
 
 
    @Override
    public void onEvent(TaskDelayed task) {
        Thread thread = task.getThread();
        if (isNotNull(thread)) {
            print("Thread "
                    + thread.getName()
                    + " has been active for "
                    + (System.currentTimeMillis() - task.getOriginalStartTime())
                    + " milliseconds and may be hung." + System.lineSeparator()
                    + getThreadInfo(thread.getId()) + System.lineSeparator()
                    + getStackTrace(thread));
        }
    }
 
    @Override
    public void onEvent(TaskDelayed task, int leftHangCount) {
        Thread thread = task.getThread();
        if (isNotNull(thread)) {
            print("Thread "
                    + thread.getName()
                    + " (" + thread.getId() + "),"+
                    " previously reported hung, has completed in about "
                    + (System.currentTimeMillis() - task.getOriginalStartTime())
                    + " ms. There still " + leftHangCount
                    + " threads may be hung.");
        }
    }
 
    private static boolean isNotNull(Thread thread) {
        if (thread == null) {
            // side effect
            print("Unable to get working thread reference");
            return false;
        }
        return true;
    }
 
    /**
     * 
     * @param id
     * @return {@link ThreadInfo} by thread id
     */
    private static ThreadInfo getThreadInfo(long id) {
        return ManagementFactory.getThreadMXBean().getThreadInfo(id);
    }
 
    private static String getStackTrace(Thread t) {
        if (t != null) {
            StringBuilder buffer = new StringBuilder();
            StackTraceElement[] stackElements = t.getStackTrace();
            for (int i = 0; i < stackElements.length; i++) {
                if (buffer.length() > 0) {
                    buffer.append(System.lineSeparator());
                } else {
                    buffer.append("Thread (").append(t.getState()).append(':')
                            .append(t.getId()).append(':').append(t.getName())
                            .append(')').append(System.lineSeparator());
                }
                buffer.append("\tat " + stackElements[i]);
            }
            return buffer.toString();
        }
        return null;
    }
 
    // stdout
    private static void print(String logMessage) {
        System.out.println(logMessage);
    }
 
}

Subscribe to xtern.info RSS