/*
 * Decompiled with CFR 0.152.
 */
package net.engio.mbassy.bus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.AbstractSyncMessageBus;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;

public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends AbstractSyncMessageBus<T, P>
implements IMessageBus<T, P> {
    private final ExecutorService executor;
    private final List<Thread> dispatchers;
    private final BlockingQueue<MessagePublication> pendingMessages;

    protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
        super(configuration);
        this.executor = configuration.getExecutorForAsynchronousHandlers();
        this.getRuntime().add("handler.async-service", this.executor);
        this.pendingMessages = configuration.getPendingMessagesQueue();
        this.dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
        this.initDispatcherThreads(configuration);
    }

    private void initDispatcherThreads(IBusConfiguration configuration) {
        for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); ++i) {
            Thread dispatcher = configuration.getThreadFactoryForAsynchronousMessageDispatch().newThread(new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        try {
                            while (true) {
                                ((MessagePublication)AbstractSyncAsyncMessageBus.this.pendingMessages.take()).execute();
                            }
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        catch (Throwable t) {
                            AbstractSyncAsyncMessageBus.this.handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", null, null, null));
                            continue;
                        }
                        break;
                    }
                }
            });
            this.dispatchers.add(dispatcher);
            dispatcher.start();
        }
    }

    protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) {
        try {
            this.pendingMessages.put(request);
            return request.markScheduled();
        }
        catch (InterruptedException e) {
            return request;
        }
    }

    protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
        try {
            return this.pendingMessages.offer(request, timeout, unit) ? request.markScheduled() : request;
        }
        catch (InterruptedException e) {
            return request;
        }
    }

    protected void finalize() throws Throwable {
        this.shutdown();
        super.finalize();
    }

    @Override
    public void shutdown() {
        for (Thread dispatcher : this.dispatchers) {
            dispatcher.interrupt();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    @Override
    public boolean hasPendingMessages() {
        return this.pendingMessages.size() > 0;
    }

    @Override
    public Executor getExecutor() {
        return this.executor;
    }
}

