/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue.dispatching;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.joda.time.DateTime;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dispatching.BlockingRejectionExecutionHandler;
import org.killbill.queue.dispatching.CallableCallback;
import org.killbill.queue.dispatching.Dispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestDispatcher {
    private final int QUEUE_SIZE = 5;
    private Dispatcher<BusEventModelDao> dispatcher;
    private TestCallableCallback callback;

    @BeforeClass(groups={"fast"})
    public void beforeClass() throws Exception {
        ThreadFactory testThreadFactory = new ThreadFactory(){

            public Thread newThread(Runnable r) {
                return new Thread(new ThreadGroup("TestGrp"), r, "test-grp--th");
            }
        };
        this.callback = new TestCallableCallback();
        this.dispatcher = new Dispatcher(1, 1, 5L, TimeUnit.MINUTES, new LinkedBlockingQueue(5), testThreadFactory, (RejectedExecutionHandler)((Object)new TestBlockingRejectionExecutionHandler(this.callback)));
        this.dispatcher.start();
    }

    @Test(groups={"fast"})
    public void testBlockingRejectionHandler() throws Exception {
        this.callback.block();
        for (int i = 0; i < 7; ++i) {
            this.dispatch(i, this.callback);
        }
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TestDispatcher.this.callback.getProcessed().size() == 7;
            }
        });
    }

    private void dispatch(int i, TestCallableCallback callback) {
        BusEventModelDao e1 = new BusEventModelDao("owner", new DateTime(), String.class.getName(), "e-" + i, UUID.randomUUID(), Long.valueOf(1L), Long.valueOf(1L));
        this.dispatcher.dispatch((EventEntryModelDao)e1, (CallableCallback)callback);
    }

    public static class TestEvent
    implements BusEvent {
        private final String json;
        private final Long searchKey1;
        private final Long searchKey2;
        private final UUID userToken;

        @JsonCreator
        public TestEvent(@JsonProperty(value="json") String json, @JsonProperty(value="searchKey1") Long searchKey1, @JsonProperty(value="searchKey2") Long searchKey2, @JsonProperty(value="userToken") UUID userToken) {
            this.json = json;
            this.searchKey2 = searchKey2;
            this.searchKey1 = searchKey1;
            this.userToken = userToken;
        }

        public String getJson() {
            return this.json;
        }

        public Long getSearchKey1() {
            return this.searchKey1;
        }

        public Long getSearchKey2() {
            return this.searchKey2;
        }

        public UUID getUserToken() {
            return this.userToken;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class TestCallableCallback
    implements CallableCallback<QueueEvent, BusEventModelDao> {
        private final Logger logger = LoggerFactory.getLogger(TestCallableCallback.class);
        private volatile boolean isBlocked = false;
        private final List<QueueEvent> processed = new ArrayList<QueueEvent>();

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void block() {
            TestCallableCallback testCallableCallback = this;
            synchronized (testCallableCallback) {
                this.isBlocked = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void unblock() {
            TestCallableCallback testCallableCallback = this;
            synchronized (testCallableCallback) {
                this.isBlocked = false;
                this.notifyAll();
            }
        }

        public List<QueueEvent> getProcessed() {
            return this.processed;
        }

        public QueueEvent deserialize(BusEventModelDao modelDao) {
            return new TestEvent(modelDao.getEventJson(), modelDao.getSearchKey1(), modelDao.getSearchKey2(), modelDao.getUserToken());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void dispatch(QueueEvent event, BusEventModelDao modelDao) throws Exception {
            TestCallableCallback testCallableCallback = this;
            synchronized (testCallableCallback) {
                while (this.isBlocked) {
                    this.logger.info("Thread " + Thread.currentThread().getId() + " blocking...");
                    this.wait();
                    this.logger.info("Thread " + Thread.currentThread().getId() + " unblocking...");
                }
            }
            this.logger.info("Got entry " + modelDao.getEventJson());
            this.processed.add(event);
        }

        public void updateErrorCountOrMoveToHistory(QueueEvent event, BusEventModelDao modelDao, long errorCount, Throwable lastException) {
        }
    }

    private class TestBlockingRejectionExecutionHandler
    extends BlockingRejectionExecutionHandler {
        private final TestCallableCallback callback;

        public TestBlockingRejectionExecutionHandler(TestCallableCallback callback) {
            this.callback = callback;
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            this.callback.unblock();
            super.rejectedExecution(r, executor);
        }
    }
}

