/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing.requestreply;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.RoutingNotification;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ObjectUtils;
import org.mule.runtime.core.internal.context.DefaultMuleContext;
import org.mule.runtime.core.internal.context.MuleContextWithRegistries;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.internal.registry.MuleRegistry;
import org.mule.runtime.core.internal.routing.requestreply.MultipleRequestReplierEvent;
import org.mule.runtime.core.internal.routing.requestreply.RequestReplyRequesterMessageProcessor;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.processor.AbstractInterceptingMessageProcessorBase;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.mule.runtime.core.privileged.routing.ResponseTimeoutException;
import org.mule.runtime.core.privileged.store.DeserializationPostInitialisable;

public abstract class AbstractAsyncRequestReplyRequester
extends AbstractInterceptingMessageProcessorBase
implements RequestReplyRequesterMessageProcessor,
Initialisable,
Startable,
Stoppable,
Disposable {
    // NOTE(review): the three numeric constants below are not referenced by name in this
    // (decompiled) file; the literals 50000 / 60000L used in initialise() and for `processed`
    // look like inlined copies of them — confirm against the original source.
    private static final int MAX_PROCESSED_GROUPS = 50000;
    private static final long UNCLAIMED_TIME_TO_LIVE = 60000L;
    private static final long UNCLAIMED_INTERVAL = 60000L;
    // Format of the object store / scheduler name built in initialise():
    // <storePrefix>.<configuration id>.<root container name>.asyncReplies
    private static final String NAME_TEMPLATE = "%s.%s.%s.asyncReplies";
    // Assigned in initialise(); used as the object store name and the scheduler name.
    protected String name;
    // Milliseconds to wait for a reply; a value <= 0 makes receiveAsyncReply() wait indefinitely.
    protected volatile long timeout = -1L;
    // When true a timed-out wait raises ResponseTimeoutException, otherwise null is returned.
    protected volatile boolean failOnTimeout = true;
    // Source that delivers replies; while null, process() just forwards the event downstream.
    protected MessageSource replyMessageSource;
    // Listener registered on the reply source in setReplySource().
    private final Processor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();
    // Created in start(), stopped in stop(); runs the reply monitor periodically.
    private Scheduler scheduler;
    // Looked up from the registry in initialise(); used to dispatch RoutingNotifications.
    private NotificationDispatcher notificationFirer;
    // Created in start(); also invoked directly whenever a reply arrives.
    private AsyncReplyMonitoringRunnable replyRunnable;
    // Pending requests keyed by correlation id; entries are added by addLock() and removed in receiveAsyncReply().
    protected final Map<String, RequestReplyLatch> locks = new ConcurrentHashMap<String, RequestReplyLatch>();
    // First component of the store name; see NAME_TEMPLATE.
    private String storePrefix = "";
    // Replies matched to a pending request, keyed by correlation id, waiting to be picked up by the requesting thread.
    protected final ConcurrentMap<String, PrivilegedEvent> responseEvents = new ConcurrentHashMap<String, PrivilegedEvent>();
    // Guards every access to `processed`.
    private final Object processedLock = new Object();
    // Bounded FIFO of ProcessedEvents markers; the oldest entry is evicted when full (see addProcessed()).
    private final BoundedFifoBuffer processed = new BoundedFifoBuffer(50000);
    // Object store holding incoming replies until the monitor hands them over; created in initialise().
    protected ObjectStore store;

    /**
     * Sends {@code event} downstream and, when a reply source is configured, blocks until the
     * correlated reply arrives.
     * <p>
     * Without a reply source the event is simply passed to the next processor. Otherwise a latch
     * is registered for the event's correlation id, the request is sent, and the calling thread
     * waits in {@code receiveAsyncReply}. If sending the request fails, the latch (and any
     * response already buffered for that id) is discarded before the failure is rethrown, so a
     * failed request does not leave a stale entry behind in {@code locks}.
     *
     * @param event the request event
     * @return a copy of {@code event} carrying the reply's message, or {@code null} when
     *         {@code receiveAsyncReply} yields no reply
     * @throws MuleException if sending the request or waiting for the reply fails
     */
    @Override
    public CoreEvent process(CoreEvent event) throws MuleException {
        if (this.replyMessageSource == null) {
            return this.processNext(event);
        }
        this.addLock(event);
        try {
            this.sendAsyncRequest(event);
        }
        catch (MuleException | RuntimeException e) {
            // receiveAsyncReply() is what normally cleans these maps; it will not run now.
            String correlationId = this.getAsyncReplyCorrelationId(event);
            this.locks.remove(correlationId);
            this.responseEvents.remove(correlationId);
            throw e;
        }
        PrivilegedEvent resultEvent = this.receiveAsyncReply(event);
        if (resultEvent == null) {
            return null;
        }
        // Only merge the reply's session into the request's when the reply carried a MULE_SESSION inbound property.
        if (((InternalMessage)resultEvent.getMessage()).getInboundProperty("MULE_SESSION") != null) {
            ((PrivilegedEvent)event).getSession().merge(resultEvent.getSession());
        }
        // The returned event keeps the request's context and only takes the message from the reply.
        resultEvent = PrivilegedEvent.builder(event).message(resultEvent.getMessage()).build();
        PrivilegedEvent.setCurrentEvent(resultEvent);
        return resultEvent;
    }

    /**
     * Registers a new latch for the event's correlation id, replacing any latch already stored
     * under that id. Group size and sequence default to {@code -1} when the event has no group
     * correlation (or no group size).
     */
    private void addLock(CoreEvent event) {
        String correlationId = this.getAsyncReplyCorrelationId(event);
        int groupSize = event.getGroupCorrelation().map(gc -> gc.getGroupSize().orElse(-1)).orElse(-1);
        int sequence = event.getGroupCorrelation().map(gc -> gc.getSequence()).orElse(-1);
        RequestReplyLatch pending = new RequestReplyLatch(groupSize, sequence);
        this.locks.put(correlationId, pending);
    }

    /**
     * Returns the latch registered for {@code correlationId}.
     * Throws {@link NullPointerException} when no latch is registered under that id.
     */
    private Latch getLatch(String correlationId) {
        return this.locks.get(correlationId).latch;
    }

    /**
     * Creates the latch a requesting thread waits on. Called once per {@code RequestReplyLatch};
     * protected so subclasses can supply a different {@link Latch}.
     *
     * @return a new latch
     */
    protected Latch createEventLock() {
        return new Latch();
    }

    /**
     * Sets how long to wait for a reply.
     *
     * @param timeout wait time in milliseconds; a value of zero or less means wait indefinitely
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Chooses the outcome of a timed-out wait.
     *
     * @param failOnTimeout {@code true} to raise a {@code ResponseTimeoutException} on timeout,
     *        {@code false} to return {@code null} instead
     */
    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    /**
     * Installs the source that delivers replies and registers this requester's internal reply
     * processor as its listener. The source is first passed to
     * {@link #verifyReplyMessageSource(MessageSource)}.
     *
     * @param messageSource the reply source
     */
    @Override
    public void setReplySource(MessageSource messageSource) {
        verifyReplyMessageSource(messageSource);
        replyMessageSource = messageSource;
        messageSource.setListener(internalAsyncReplyMessageProcessor);
    }

    /**
     * Computes this requester's name, creates the non-persistent object store that buffers
     * incoming replies, and looks up the notification dispatcher.
     *
     * @throws InitialisationException if the notification dispatcher cannot be looked up
     */
    @Override
    public void initialise() throws InitialisationException {
        this.name = String.format(NAME_TEMPLATE, this.storePrefix, this.muleContext.getConfiguration().getId(), this.getLocation().getRootContainerName());
        MuleRegistry registry = ((MuleContextWithRegistries)this.muleContext).getRegistry();
        ObjectStoreManager objectStoreManager = (ObjectStoreManager)registry.get("_muleObjectStoreManager");
        // The literals match MAX_PROCESSED_GROUPS, UNCLAIMED_TIME_TO_LIVE and UNCLAIMED_INTERVAL.
        ObjectStoreSettings storeSettings = ObjectStoreSettings.builder()
            .persistent(false)
            .maxEntries(50000)
            .entryTtl(60000L)
            .expirationInterval(60000L)
            .build();
        this.store = objectStoreManager.createObjectStore(this.name, storeSettings);
        try {
            this.notificationFirer = registry.lookupObject(NotificationDispatcher.class);
        }
        catch (RegistrationException e) {
            throw new InitialisationException((Throwable)e, (Initialisable)this);
        }
    }

    /**
     * Creates a dedicated single-task scheduler named after this requester and schedules the
     * reply monitor on it with a fixed delay of 100 ms, starting immediately.
     */
    @Override
    public void start() throws MuleException {
        this.scheduler = this.muleContext.getSchedulerService().customScheduler(
            this.muleContext.getSchedulerBaseConfig()
                .withName(this.name)
                .withMaxConcurrentTasks(1)
                .withShutdownTimeout(0L, TimeUnit.MILLISECONDS));
        AsyncReplyMonitoringRunnable monitor = new AsyncReplyMonitoringRunnable();
        this.replyRunnable = monitor;
        this.scheduler.scheduleWithFixedDelay(monitor, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the monitoring scheduler, if one was created by {@link #start()}.
     */
    @Override
    public void stop() throws MuleException {
        if (this.scheduler == null) {
            return;
        }
        this.scheduler.stop();
    }

    /**
     * Disposes the reply object store, if one was created. A failure to dispose is logged at
     * debug level and otherwise ignored.
     */
    @Override
    public void dispose() {
        if (this.store == null) {
            return;
        }
        try {
            MuleRegistry registry = ((MuleContextWithRegistries)this.muleContext).getRegistry();
            ObjectStoreManager objectStoreManager = (ObjectStoreManager)registry.get("_muleObjectStoreManager");
            objectStoreManager.disposeStore(this.name);
        }
        catch (ObjectStoreException e) {
            this.logger.debug("Exception disposing of store", (Throwable)e);
        }
    }

    /**
     * Sets the prefix used as the first component of the object store name. The name is built in
     * {@link #initialise()}, so the prefix only takes effect if set before that call.
     *
     * @param storePrefix the prefix for the store name
     */
    public void setStorePrefix(String storePrefix) {
        this.storePrefix = storePrefix;
    }

    /**
     * Hook invoked by {@link #setReplySource(MessageSource)} before the source is installed.
     * This implementation does nothing; subclasses may override it to validate the source.
     *
     * @param messageSource the reply source about to be installed
     */
    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    /**
     * Returns the key under which the latch, the stored reply and the buffered response for
     * {@code event} are tracked: the correlation id of the event's context, rendered as a string.
     */
    private String getAsyncReplyCorrelationId(CoreEvent event) {
        return String.valueOf(event.getContext().getCorrelationId());
    }

    /**
     * Sends the request by passing {@code event} to the next processor; the value returned by
     * {@code processNext} is discarded. Protected so subclasses can change how the request is sent.
     *
     * @param event the request event
     * @throws MuleException if the next processor fails
     */
    protected void sendAsyncRequest(CoreEvent event) throws MuleException {
        this.processNext(event);
    }

    /**
     * Waits for the reply correlated with {@code event} and returns it.
     * <p>
     * A {@code timeout} of zero or less waits indefinitely. Otherwise the latch is awaited for
     * {@code timeout} milliseconds and, if that expires, for one further second before the
     * request is recorded as timed out. The latch and any buffered response for the correlation
     * id are always removed before this method returns or throws.
     *
     * @param event the request event whose correlation id identifies the reply
     * @return the reply event; {@code null} if the waiting thread was interrupted (the interrupt
     *         flag is restored) or if the wait timed out and {@code failOnTimeout} is {@code false}
     * @throws ResponseTimeoutException if the wait timed out and {@code failOnTimeout} is {@code true}
     * @throws IllegalStateException if the latch was released but no response event was buffered
     */
    private PrivilegedEvent receiveAsyncReply(CoreEvent event) throws MuleException {
        String asyncReplyCorrelationId = this.getAsyncReplyCorrelationId(event);
        Latch asyncReplyLatch = this.getLatch(asyncReplyCorrelationId);
        // Snapshot the volatile field so the wait and the error message use the same value.
        long timeoutMillis = this.timeout;
        boolean interruptedWhileWaiting = false;
        boolean resultAvailable = false;
        PrivilegedEvent result;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for async reply message with id: " + asyncReplyCorrelationId);
            }
            if (timeoutMillis <= 0L) {
                asyncReplyLatch.await();
                resultAvailable = true;
            } else {
                resultAvailable = asyncReplyLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (!resultAvailable) {
                // Grace period: give a late reply one more second before declaring a timeout.
                asyncReplyLatch.await(1000L, TimeUnit.MILLISECONDS);
                resultAvailable = asyncReplyLatch.getCount() == 0L;
            }
        }
        catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        }
        finally {
            this.locks.remove(asyncReplyCorrelationId);
            result = this.responseEvents.remove(asyncReplyCorrelationId);
        }
        if (interruptedWhileWaiting) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        }
        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            PrivilegedEvent.setCurrentEvent(result);
            return result;
        }
        // Remember the timeout so the monitor drops a reply that shows up later.
        this.addProcessed(new ProcessedEvents(asyncReplyCorrelationId, EndReason.FINISHED_BY_TIMEOUT));
        if (this.failOnTimeout) {
            // 1302: presumably the "async reply timeout" routing action — confirm against RoutingNotification.
            this.notificationFirer.dispatch(new RoutingNotification(event.getMessage(), null, 1302));
            throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int)timeoutMillis, asyncReplyCorrelationId), null);
        }
        return null;
    }

    /**
     * Records {@code id} in the bounded buffer of processed markers, evicting the oldest entry
     * first when the buffer is full. Access is serialised on {@code processedLock}.
     */
    private void addProcessed(Object id) {
        synchronized (this.processedLock) {
            if (this.processed.isFull()) {
                this.processed.remove();
            }
            this.processed.add(id);
        }
    }

    /**
     * Tells whether {@code id} is currently held in the buffer of processed markers.
     * Access is serialised on {@code processedLock}.
     */
    private boolean isAlreadyProcessed(Object id) {
        boolean alreadySeen;
        synchronized (this.processedLock) {
            alreadySeen = this.processed.contains(id);
        }
        return alreadySeen;
    }

    /**
     * Returns the string representation produced by {@code ObjectUtils.toString} for this instance.
     */
    @Override
    public String toString() {
        return ObjectUtils.toString(this);
    }

    /**
     * Reads the reply stored under {@code correlationId} and returns its event. When no Mule
     * context is bound to the current thread, the event is post-initialised against this
     * requester's context first.
     *
     * @throws MuleException if the store lookup fails or post-initialisation throws (wrapped in
     *         a {@link DefaultMuleException})
     */
    private PrivilegedEvent retrieveEvent(String correlationId) throws MuleException {
        PrivilegedEvent storedEvent = ((MultipleRequestReplierEvent)this.store.retrieve(correlationId)).getEvent();
        if (DefaultMuleContext.currentMuleContext.get() != null) {
            return storedEvent;
        }
        try {
            DeserializationPostInitialisable.Implementation.init(storedEvent, this.muleContext);
        }
        catch (Exception e) {
            throw new DefaultMuleException(e);
        }
        return storedEvent;
    }

    /**
     * Declares this processor as {@code BLOCKING}; {@code process} parks the calling thread on a
     * latch while waiting for the reply.
     */
    @Override
    public ReactiveProcessor.ProcessingType getProcessingType() {
        return ReactiveProcessor.ProcessingType.BLOCKING;
    }

    /** Why a correlation id was recorded in the buffer of processed markers. */
    private static enum EndReason {
        // The reply was handed to the waiting request by the monitor.
        PROCESSED,
        // The requesting thread gave up waiting before a reply was delivered.
        FINISHED_BY_TIMEOUT;

    }

    /**
     * Marker stored in the buffer of processed entries: a correlation id together with the reason
     * it was finished. Two markers are equal only when both the id and the reason match.
     */
    private class ProcessedEvents {
        private final String id;
        private final EndReason endReason;

        private ProcessedEvents(String id, EndReason endReason) {
            this.id = id;
            this.endReason = endReason;
        }

        /** Creates a marker with reason {@link EndReason#PROCESSED}. */
        private ProcessedEvents(String id) {
            this(id, EndReason.PROCESSED);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            ProcessedEvents other = (ProcessedEvents)o;
            return this.id.equals(other.id) && this.endReason == other.endReason;
        }

        @Override
        public int hashCode() {
            return 31 * this.id.hashCode() + this.endReason.hashCode();
        }
    }

    /**
     * A pending request: the latch its thread waits on plus the group size and sequence number
     * taken from the request's group correlation ({@code -1} when absent).
     */
    private class RequestReplyLatch {
        private final int groupSize;
        private final int correlationSequence;
        private final Latch latch;

        RequestReplyLatch(int groupSize, int correlationSequence) {
            this.latch = createEventLock();
            this.groupSize = groupSize;
            this.correlationSequence = correlationSequence;
        }

        /** True when the request carried a group size, i.e. it is part of a sequence. */
        private boolean isSequenceEvent() {
            return -1 != this.groupSize;
        }

        /** Releases the waiting thread. */
        private void countDown() {
            this.latch.countDown();
        }

        /** True when the request's sequence number equals its group size. */
        private boolean isLastEvent() {
            return this.correlationSequence == this.groupSize;
        }
    }

    /**
     * Scans the reply store and hands each stored reply to the request waiting for it.
     * <p>
     * Scheduled with a fixed delay by {@code start()} and also invoked directly by
     * {@code InternalAsyncReplyMessageProcessor} whenever a reply arrives. Every failure, per entry
     * or for the whole scan, is logged at debug level and swallowed so the task is never aborted
     * by an exception.
     * <p>
     * NOTE(review): since it can be entered from the scheduler and from reply threads at the same
     * time, two scans may race on the same entry; the loser would hit the "duplicate result"
     * IllegalStateException below, which is then only logged at debug — confirm this is intended.
     */
    private class AsyncReplyMonitoringRunnable
    implements Runnable {
        private AsyncReplyMonitoringRunnable() {
        }

        @Override
        public void run() {
            try {
                List<String> ids = AbstractAsyncRequestReplyRequester.this.store.allKeys();
                AbstractAsyncRequestReplyRequester.this.logger.debug("Found " + ids.size() + " objects in store");
                for (Serializable serializable : ids) {
                    try {
                        // Set when the store entry is finished with and must be removed at the end of this iteration.
                        boolean deleteEvent = false;
                        String correlationId = (String)((Object)serializable);
                        MultipleRequestReplierEvent multipleEvent = (MultipleRequestReplierEvent)AbstractAsyncRequestReplyRequester.this.store.retrieve(correlationId);
                        if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(new ProcessedEvents(correlationId, EndReason.FINISHED_BY_TIMEOUT))) {
                            // The requester already timed out on this id: drop the late reply and notify.
                            deleteEvent = true;
                            PrivilegedEvent event = multipleEvent.getEvent();
                            if (AbstractAsyncRequestReplyRequester.this.logger.isDebugEnabled()) {
                                AbstractAsyncRequestReplyRequester.this.logger.debug("An event was received for an event group that has already been processed, this is because the async-reply timed out. GroupCorrelation Id is: " + correlationId + ". Dropping event");
                            }
                            // 1301: presumably the "missed async reply" routing action — confirm against RoutingNotification.
                            AbstractAsyncRequestReplyRequester.this.notificationFirer.dispatch(new RoutingNotification(event.getMessage(), event.getContext().getOriginatingLocation().getComponentIdentifier().getIdentifier().getNamespace(), 1301));
                        } else {
                            RequestReplyLatch requestReplyLatch = AbstractAsyncRequestReplyRequester.this.locks.get(correlationId);
                            // A reply with no registered latch is left in the store untouched.
                            if (requestReplyLatch != null) {
                                PrivilegedEvent event = AbstractAsyncRequestReplyRequester.this.retrieveEvent(correlationId);
                                // Publish the reply for the waiting thread; a second reply for the same id is an error.
                                CoreEvent previousResult = AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(correlationId, event);
                                if (previousResult != null) {
                                    throw new IllegalStateException("Detected duplicate result message with id: " + correlationId);
                                }
                                // For sequence requests the store entry is kept until the last event of the group.
                                if (requestReplyLatch.isSequenceEvent()) {
                                    if (requestReplyLatch.isLastEvent()) {
                                        AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(correlationId));
                                        deleteEvent = true;
                                    }
                                } else {
                                    AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(correlationId));
                                    deleteEvent = true;
                                }
                                // Wake the requesting thread only after the response has been published above.
                                requestReplyLatch.countDown();
                                multipleEvent.removeEvent();
                            }
                        }
                        if (!deleteEvent) continue;
                        AbstractAsyncRequestReplyRequester.this.store.remove(correlationId);
                    }
                    catch (Exception ex) {
                        // A failing entry must not stop the scan of the remaining ids.
                        AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable)ex);
                    }
                }
            }
            catch (Exception ex) {
                AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable)ex);
            }
        }
    }

    /**
     * Listener registered on the reply source. Each incoming reply is written to the object
     * store under its correlation id, after which the monitor is run on the calling thread so a
     * waiting request is released without waiting for the next scheduled scan.
     */
    class InternalAsyncReplyMessageProcessor
    extends AbstractComponent
    implements Processor {
        InternalAsyncReplyMessageProcessor() {
        }

        /**
         * Stores {@code event} as a reply and triggers the monitor.
         *
         * @return always {@code null}
         */
        @Override
        public CoreEvent process(CoreEvent event) throws MuleException {
            AbstractAsyncRequestReplyRequester requester = AbstractAsyncRequestReplyRequester.this;
            String messageId = requester.getAsyncReplyCorrelationId(event);
            RequestReplyLatch pending = requester.locks.get(messageId);
            // Append to the stored entry only for a pending sequence request that already has one.
            boolean appendToStored = pending != null && pending.isSequenceEvent() && requester.store.contains(messageId);
            MultipleRequestReplierEvent replies;
            if (appendToStored) {
                replies = (MultipleRequestReplierEvent)requester.store.retrieve(messageId);
                replies.addEvent((PrivilegedEvent)event);
            } else {
                replies = new MultipleRequestReplierEvent();
                replies.addEvent((PrivilegedEvent)event);
                requester.store.store(messageId, replies);
            }
            requester.replyRunnable.run();
            return null;
        }
    }
}

