/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.event;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for the dispatcher's stop/close behaviour, exercised through
 * {@link DrainDispatcher} backed by a caller-supplied event queue.
 */
public class TestAsyncDispatcher {
    /** Configuration key for the drain-on-stop timeout used by these tests. */
    private static final String DRAIN_EVENTS_TIMEOUT_KEY = "yarn.dispatcher.drain-events.timeout";

    /** Value stored under {@link #DRAIN_EVENTS_TIMEOUT_KEY} (presumably milliseconds; confirm against YarnConfiguration). */
    private static final int DRAIN_EVENTS_TIMEOUT = 2000;

    /**
     * Checks that closing the dispatcher completes when the thread enqueuing an
     * event was interrupted and the event queue is empty at close time.
     *
     * <p>The spied queue is stubbed so that {@code put(event)} throws
     * {@link InterruptedException}; handling the event is then expected to fail
     * with a {@link YarnRuntimeException} caused by that interruption. The test's
     * 10 second timeout turns a hanging {@code close()} into a failure.
     *
     * @throws Exception if the dispatcher or the stubbing fails unexpectedly
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test(timeout=10000L)
    public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
        BlockingQueue<Event> eventQueue = Mockito.spy(new LinkedBlockingQueue<Event>());
        Event event = Mockito.mock(Event.class);
        Mockito.doThrow(new InterruptedException()).when(eventQueue).put(event);
        DrainDispatcher disp = new DrainDispatcher(eventQueue);
        disp.init(new Configuration());
        disp.setDrainEventsOnStop();
        disp.start();
        try {
            disp.waitForEventThreadToWait();
            try {
                disp.getEventHandler().handle(event);
                Assert.fail("Expected YarnRuntimeException");
            }
            catch (YarnRuntimeException e) {
                Assert.assertTrue(e.getCause() instanceof InterruptedException);
            }
            Assert.assertTrue("Event Queue should have been empty", eventQueue.isEmpty());
        }
        finally {
            // Close even when an assertion above fails so the started dispatcher
            // is not left running for the remainder of the test JVM.
            disp.close();
        }
    }

    /**
     * Checks that closing the dispatcher returns even though the queue never
     * reports itself as empty.
     *
     * <p>{@code isEmpty()} on the spied queue is stubbed to always return
     * {@code false}; the drain timeout is configured via
     * {@link #DRAIN_EVENTS_TIMEOUT_KEY}. The test passes when {@code close()}
     * returns within the test's 10 second timeout.
     *
     * @throws Exception if the dispatcher fails unexpectedly
     */
    @SuppressWarnings("rawtypes")
    @Test(timeout=10000L)
    public void testDispatchStopOnTimeout() throws Exception {
        BlockingQueue<Event> eventQueue = Mockito.spy(new LinkedBlockingQueue<Event>());
        // Simulate a queue that is never drained.
        Mockito.when(eventQueue.isEmpty()).thenReturn(false);
        YarnConfiguration conf = new YarnConfiguration();
        conf.setInt(DRAIN_EVENTS_TIMEOUT_KEY, DRAIN_EVENTS_TIMEOUT);
        DrainDispatcher disp = new DrainDispatcher(eventQueue);
        disp.init(conf);
        disp.setDrainEventsOnStop();
        disp.start();
        disp.waitForEventThreadToWait();
        disp.close();
    }

    /**
     * Sends {@code count} mocked events of type {@link DummyType#DUMMY} to the
     * dispatcher's event handler.
     *
     * @param disp dispatcher whose event handler receives the events
     * @param count number of events to send; nothing is sent when it is not positive
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void dispatchDummyEvents(Dispatcher disp, int count) {
        for (int i = 0; i < count; ++i) {
            Event event = Mockito.mock(Event.class);
            Mockito.when(event.getType()).thenReturn(DummyType.DUMMY);
            disp.getEventHandler().handle(event);
        }
    }

    /**
     * Checks that the queue is empty after closing a dispatcher that was asked
     * to drain events on stop, with two slow-to-handle events dispatched first.
     *
     * @throws Exception if the dispatcher fails unexpectedly
     */
    @SuppressWarnings("rawtypes")
    @Test(timeout=10000L)
    public void testDrainDispatcherDrainEventsOnStop() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        conf.setInt(DRAIN_EVENTS_TIMEOUT_KEY, DRAIN_EVENTS_TIMEOUT);
        BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
        DrainDispatcher disp = new DrainDispatcher(queue);
        disp.init(conf);
        disp.register(DummyType.class, new DummyHandler());
        disp.setDrainEventsOnStop();
        disp.start();
        disp.waitForEventThreadToWait();
        dispatchDummyEvents(disp, 2);
        disp.close();
        Assert.assertEquals(0, queue.size());
    }

    /** Event type used for the mocked events dispatched by these tests. */
    private static enum DummyType {
        DUMMY;

    }

    /** Handler that sleeps for 500 ms per event to simulate slow event processing. */
    @SuppressWarnings("rawtypes")
    private static class DummyHandler
    implements EventHandler<Event> {
        /**
         * Sleeps for 500 ms; the event itself is not inspected.
         *
         * @param event the dispatched event (unused)
         */
        @Override
        public void handle(Event event) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ignored) {
                // Restore the interrupt status instead of silently dropping it,
                // so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}

