/*
 * Decompiled with CFR 0.152.
 */
package net.engio.mbassy.bus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.AbstractSyncMessageBus;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.MessagePublication;

public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPostCommand>
extends AbstractSyncMessageBus<T, P>
implements IMessageBus<T, P> {
    private final ExecutorService executor;
    private final List<Thread> dispatchers;
    private final BlockingQueue<MessagePublication> pendingMessages;

    public AbstractSyncAsyncMessageBus(BusConfiguration configuration) {
        super(configuration);
        this.executor = configuration.getExecutor();
        this.pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
        this.dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
        this.initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
    }

    private void initDispatcherThreads(int numberOfThreads) {
        for (int i = 0; i < numberOfThreads; ++i) {
            Thread dispatcher = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        while (true) {
                            ((MessagePublication)AbstractSyncAsyncMessageBus.this.pendingMessages.take()).execute();
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            dispatcher.setDaemon(true);
            this.dispatchers.add(dispatcher);
            dispatcher.start();
        }
    }

    protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) {
        try {
            this.pendingMessages.put(request);
            return request.markScheduled();
        }
        catch (InterruptedException e) {
            return request.setError();
        }
    }

    protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
        try {
            return this.pendingMessages.offer(request, timeout, unit) ? request.markScheduled() : request.setError();
        }
        catch (InterruptedException e) {
            return request.setError();
        }
    }

    protected void finalize() throws Throwable {
        this.shutdown();
        super.finalize();
    }

    @Override
    public void shutdown() {
        for (Thread dispatcher : this.dispatchers) {
            dispatcher.interrupt();
        }
        this.executor.shutdown();
    }

    @Override
    public boolean hasPendingMessages() {
        return this.pendingMessages.size() > 0;
    }

    @Override
    public Executor getExecutor() {
        return this.executor;
    }
}

