/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.server.cluster.impl;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;

/**
 * A {@link Consumer} that, while active, takes each {@link MessageReference}
 * it is offered, asks the {@link PostOffice} to redistribute the message away
 * from {@link #queue}, routes it, and acknowledges the reference on the source
 * queue inside a fresh transaction.
 *
 * <p>The mutable fields {@code active} and {@code count} are read and written
 * by the {@code synchronized} methods of this class and by tasks submitted to
 * the {@link Executor}; see the notes on {@link #execPrompter()} for one path
 * that does not take the monitor itself.
 */
public class Redistributor
implements Consumer {
    /**
     * Timeout passed to {@code ReusableLatch.await} when flushing.
     * NOTE(review): presumably milliseconds - confirm against ReusableLatch.
     */
    private static final long FLUSH_TIMEOUT = 10000L;

    /** When false, {@link #handle(MessageReference)} answers {@code BUSY}. */
    private boolean active;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private final Executor executor;
    /** Number of redistributed messages after which delivery is handed to a {@link Prompter}. */
    private final int batchSize;
    /** The source queue whose messages are being redistributed. */
    private final Queue queue;
    /** Messages redistributed since the last batch boundary. */
    private int count;
    private final long sequentialID;
    /** Counts tasks submitted through {@link #internalExecute(Runnable)} that have not finished yet. */
    private final ReusableLatch pendingRuns = new ReusableLatch();

    /**
     * Creates an inactive redistributor; call {@link #start()} to enable it.
     *
     * @param queue          source queue to redistribute from
     * @param storageManager used to generate this consumer's id, to create
     *                       transactions and to register completion callbacks
     * @param postOffice     performs the redistribution and routing
     * @param executor       runs large-message routing and batch prompts
     * @param batchSize      number of messages per batch before prompting
     */
    public Redistributor(Queue queue, StorageManager storageManager, PostOffice postOffice, Executor executor, int batchSize) {
        this.queue = queue;
        this.sequentialID = storageManager.generateID();
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.executor = executor;
        this.batchSize = batchSize;
    }

    /** @return the id obtained from the storage manager at construction time */
    @Override
    public long sequentialID() {
        return this.sequentialID;
    }

    /** @return always {@code null}; this consumer applies no filter */
    @Override
    public Filter getFilter() {
        return null;
    }

    /** @return {@link #toString()} */
    @Override
    public String debug() {
        return this.toString();
    }

    /** @return a label of the form {@code Redistributor[<queue name>/<queue id>]} */
    @Override
    public String toManagementString() {
        return "Redistributor[" + this.queue.getName() + "/" + this.queue.getID() + "]";
    }

    /** No-op. */
    @Override
    public void disconnect() {
    }

    /** Marks this redistributor active so that {@link #handle(MessageReference)} accepts references. */
    public synchronized void start() {
        this.active = true;
    }

    /**
     * Deactivates this redistributor and then waits for pending tasks.
     * A failed wait is logged, not thrown.
     *
     * @throws Exception declared by the signature; nothing in this body throws a checked exception
     */
    public synchronized void stop() throws Exception {
        this.active = false;
        if (!this.flushExecutor()) {
            ActiveMQServerLogger.LOGGER.errorStoppingRedistributor();
        }
    }

    /**
     * Waits for pending tasks and then deactivates this redistributor.
     *
     * @throws IllegalStateException if the wait fails; in that case
     *         {@code active} is left unchanged
     */
    public synchronized void close() {
        if (!this.flushExecutor()) {
            throw new IllegalStateException("Timed out waiting for executor to complete");
        }
        this.active = false;
    }

    /**
     * Waits on {@link #pendingRuns} for up to {@link #FLUSH_TIMEOUT}.
     *
     * @return the result of the latch wait, or {@code false} if the calling
     *         thread was interrupted while waiting
     */
    private boolean flushExecutor() {
        try {
            return this.pendingRuns.await(FLUSH_TIMEOUT);
        }
        catch (InterruptedException e) {
            ActiveMQServerLogger.LOGGER.failedToFlushExecutor(e);
            // Restore the interrupt status so callers of stop()/close() can
            // still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Attempts to redistribute the given reference.
     *
     * <p>Non-large messages are routed and acknowledged on the calling thread.
     * Large messages deactivate this redistributor and are routed on the
     * executor, which re-activates it and asks the queue to deliver again once
     * the message has been acknowledged.
     *
     * @param reference the reference offered by the queue
     * @return {@code BUSY} when inactive or when the post office returns no
     *         routing information; {@code NO_MATCH} when the message has a
     *         group id; {@code HANDLED} otherwise
     * @throws Exception propagated from redistribution, routing or acknowledgement
     */
    @Override
    public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
        if (!this.active) {
            return HandleStatus.BUSY;
        }
        // Grouped messages are not redistributed.
        if (reference.getMessage().getGroupID() != null) {
            return HandleStatus.NO_MATCH;
        }
        final TransactionImpl tx = new TransactionImpl(this.storageManager);
        final Pair<RoutingContext, Message> routingInfo = this.postOffice.redistribute(reference.getMessage(), this.queue, tx);
        if (routingInfo == null) {
            return HandleStatus.BUSY;
        }
        if (!reference.getMessage().isLargeMessage()) {
            this.postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
            this.ackRedistribution(reference, tx);
        } else {
            // Refuse further references until the large message has been routed.
            this.active = false;
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        Redistributor.this.postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
                        Redistributor.this.ackRedistribution(reference, tx);
                        synchronized (Redistributor.this) {
                            Redistributor.this.active = true;
                            Redistributor.this.count++;
                            Redistributor.this.queue.deliverAsync();
                        }
                    }
                    catch (Exception e) {
                        // NOTE(review): on this path 'active' stays false and 'e' itself
                        // is not logged; only a rollback failure is reported. Confirm
                        // whether that is intended before changing it.
                        try {
                            tx.rollback();
                        }
                        catch (Exception e2) {
                            ActiveMQServerLogger.LOGGER.failedToRollback(e2);
                        }
                    }
                }
            });
        }
        return HandleStatus.HANDLED;
    }

    /** No-op. */
    @Override
    public void proceedDeliver(MessageReference ref) {
    }

    /**
     * Runs {@code runnable} on the executor while tracking it in
     * {@link #pendingRuns}: the latch is counted up before submission and
     * counted down when the task finishes, whether or not it throws.
     *
     * <p>NOTE(review): nothing in this class calls this method, so
     * {@link #pendingRuns} is never counted up here. Tasks submitted in
     * {@link #handle(MessageReference)} and {@link #execPrompter()} bypass it.
     * Routing them through this method needs care, because {@link #stop()} and
     * {@link #close()} wait on the latch while holding the monitor those tasks
     * synchronize on.
     *
     * @param runnable the task to run
     */
    private void internalExecute(final Runnable runnable) {
        this.pendingRuns.countUp();
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    runnable.run();
                }
                finally {
                    Redistributor.this.pendingRuns.countDown();
                }
            }
        });
    }

    /**
     * Marks the reference handled, acknowledges it on the source queue within
     * {@code tx}, commits, and registers a callback with the storage manager
     * that advances the batch counter once operations complete.
     *
     * @param reference the reference that was redistributed
     * @param tx        the transaction used for the redistribution
     * @throws Exception propagated from acknowledge or commit
     */
    private void ackRedistribution(MessageReference reference, Transaction tx) throws Exception {
        reference.handled();
        this.queue.acknowledge(tx, reference);
        tx.commit();
        this.storageManager.afterCompleteOperations(new IOCallback(){

            @Override
            public void onError(int errorCode, String errorMessage) {
                ActiveMQServerLogger.LOGGER.ioErrorRedistributing(errorCode, errorMessage);
            }

            @Override
            public void done() {
                Redistributor.this.execPrompter();
            }
        });
    }

    /**
     * Counts one redistributed message. Once {@link #batchSize} is reached the
     * redistributor is deactivated, a {@link Prompter} is scheduled on the
     * executor to re-activate it, and the counter is reset.
     *
     * <p>NOTE(review): this method is invoked from the storage manager's
     * completion callback and does not synchronize on this instance itself;
     * whether the callback runs under the monitor depends on the caller.
     */
    private void execPrompter() {
        ++this.count;
        if (this.count >= this.batchSize) {
            this.active = false;
            this.executor.execute(new Prompter());
            this.count = 0;
        }
    }

    /** @return always an empty list */
    @Override
    public List<MessageReference> getDeliveringMessages() {
        return Collections.emptyList();
    }

    /**
     * Task that re-activates the enclosing redistributor and asks the queue to
     * deliver again, both under the redistributor's monitor.
     */
    private class Prompter
    implements Runnable {
        private Prompter() {
        }

        @Override
        public void run() {
            synchronized (Redistributor.this) {
                Redistributor.this.active = true;
                Redistributor.this.queue.deliverAsync();
            }
        }
    }
}

