/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.rosettanet.extension.internal.delivery;

import com.mulesoft.b2b.sync.provider.ObjectStoreWithLockManager;
import com.mulesoft.connectors.rosettanet.extension.api.MessageAttributes;
import com.mulesoft.connectors.rosettanet.extension.api.data.Completion;
import com.mulesoft.connectors.rosettanet.extension.api.data.CompletionCode;
import com.mulesoft.connectors.rosettanet.extension.internal.config.RosettaNetConfig;
import com.mulesoft.connectors.rosettanet.extension.internal.delivery.data.MessageDetail;
import com.mulesoft.connectors.rosettanet.extension.internal.delivery.data.MessageState;
import com.mulesoft.connectors.rosettanet.extension.internal.send.SendAction;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StoreHandler {
    private static final Logger logger = LoggerFactory.getLogger(StoreHandler.class);
    private static final String DELETE_STORE_PROPERTY = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.deleteStore";
    private static final String MULE_LOCK_WAIT_SECONDS = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.muleLockSeconds";
    private static final String CLOUDHUB_LOCK_STALE_SECONDS = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.chLockStaleSeconds";
    private static final String MESSAGE_LOCK_NAME = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.messageLock";
    private static final String MESSAGE_STORE_NAME = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.messageStore";
    private static final String MESSAGE_DETAIL_PREFIX = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.messageDetail#";
    private static final String RETRY_TIMESTAMP_NAME = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.retryTimestamp";
    private static final String RELEASE_RETRY_COUNT_KEY = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.releaseRetryCount";
    private static final String LOCK_RETRY_COUNT_KEY = "com.mulesoft.connectors.rosettanet.extension.internal.delivery.DeliveryManager.lockRetryCount";
    private static final boolean onCloudHub = System.getProperty("anypoint.url") != null;
    private static final long INITIAL_RETRY_WAIT = 30000L;
    private static final long RETRY_STORE_MILLIS = 10000L;
    private static final int LOCK_RETRY_COUNT = 10;
    private static final int RELEASE_RETRY_COUNT = 12;
    private static final long BASE_RETRY_DELAY_MS = 500L;
    private static final long MAX_RETRY_DELAY_MS = 30000L;
    private static final long MULE_LOCK_BASE_SECONDS = 15L;
    private static final long CLOUDHUB_LOCK_STALE_DEFAULT_SECONDS = 180L;
    private static final String FAILED_TRYING_TO = "Failed trying to ";
    private final Executor sendExecutor;
    private final RetryHandler retryHandler;
    private final ConcurrentHashMap<String, RosettaNetConfig> configurations;
    private final ConcurrentHashMap<String, SourceCallback<Completion, MessageAttributes>> callbacks;
    private final long muleLockWait;
    private final long staleLockAge;
    private final ObjectStoreWithLockManager osManager;

    protected StoreHandler(ObjectStoreWithLockManager osManager, Executor executor) {
        this.osManager = osManager;
        this.sendExecutor = executor;
        this.retryHandler = new RetryHandler();
        this.configurations = new ConcurrentHashMap();
        this.callbacks = new ConcurrentHashMap();
        logger.info("On cloud configuration ::" + System.getProperty("anypoint.url"));
        this.muleLockWait = StoreHandler.timeFromProperty(MULE_LOCK_WAIT_SECONDS, onCloudHub ? 60L : 15L) * 1000L;
        this.staleLockAge = StoreHandler.timeFromProperty(CLOUDHUB_LOCK_STALE_SECONDS, 180L) * 1000L;
        this.osManager.registerForSync(MESSAGE_STORE_NAME);
        this.osManager.executeWithLock(MESSAGE_STORE_NAME, deliveryObjectStore -> {
            if ("true".equalsIgnoreCase(System.getProperty(DELETE_STORE_PROPERTY))) {
                try {
                    if (deliveryObjectStore.contains(MESSAGE_STORE_NAME)) {
                        deliveryObjectStore.remove(MESSAGE_STORE_NAME);
                    }
                }
                catch (ObjectStoreException e) {
                    logger.error("Unable to delete old send message store", (Throwable)e);
                    throw new RuntimeException("Could not delete old send message store", e);
                }
                logger.debug("Deleted messages in progress from store");
            } else {
                ArrayList<MessageState> states = this.loadState(deliveryObjectStore);
                logger.debug("Found " + states.size() + " existing messages in store");
            }
            return true;
        });
    }

    private static long timeFromProperty(String pname, long dflt) {
        long time = dflt;
        String override = System.getProperty(pname);
        if (override != null) {
            logger.info(" Custom property - key :" + pname + "  value :" + override);
            time = Integer.valueOf(override).intValue();
        }
        return time;
    }

    public int registerConfiguration(RosettaNetConfig config) {
        this.configurations.put(config.getConfigName(), config);
        return this.configurations.size();
    }

    public int unregisterConfiguration(RosettaNetConfig config) {
        this.configurations.remove(config.getConfigName());
        return this.configurations.size();
    }

    protected void start() {
        Thread thread = new Thread(this.retryHandler);
        thread.start();
    }

    protected void stop() {
        this.retryHandler.stop();
    }

    public void registerCompletion(RosettaNetConfig config, SourceCallback<Completion, MessageAttributes> callback) {
        String name = config.getConfigName();
        if (!this.configurations.containsKey(name)) {
            String text = "Configuration [" + name + "] has completion but is not registered";
            logger.error(text);
            throw new RuntimeException(text);
        }
        this.callbacks.put(name, callback);
    }

    public void unregisterCompletion(RosettaNetConfig config) {
        this.callbacks.remove(config.getConfigName());
    }

    private ArrayList<MessageState> loadState(ObjectStore deliveryObjectStore) {
        Instant startExecute = Instant.now();
        try {
            ArrayList states = new ArrayList();
            if (deliveryObjectStore.contains(MESSAGE_STORE_NAME)) {
                Serializable value = deliveryObjectStore.retrieve(MESSAGE_STORE_NAME);
                if (value instanceof ArrayList) {
                    for (Object o : (ArrayList)value) {
                        if (o instanceof MessageState) continue;
                        String text = "Expected ArrayList<MessageState> as object store value, got ArrayList<" + value.getClass().getName() + ">";
                        logger.error(text);
                        throw new RuntimeException(text);
                    }
                    states = (ArrayList)value;
                } else if (value != null) {
                    String text = "Expected ArrayList<MessageState> as object store value, got [" + value.getClass().getName() + "]";
                    logger.error(text);
                    throw new RuntimeException(text);
                }
            }
            ArrayList arrayList = states;
            return arrayList;
        }
        catch (ObjectStoreException e) {
            logger.error("Could not load send message store", (Throwable)e);
            throw new RuntimeException("Could not load send message store", e);
        }
        finally {
            Duration periodToExecute = Duration.between(startExecute, Instant.now());
            logger.debug("Time to load states: " + periodToExecute.toMillis() + " milliseconds");
        }
    }

    private void saveState(ArrayList<MessageState> states, ObjectStore deliveryObjectStore) {
        try {
            try {
                deliveryObjectStore.remove(MESSAGE_STORE_NAME);
            }
            catch (Exception exception) {
                // empty catch block
            }
            deliveryObjectStore.store(MESSAGE_STORE_NAME, states);
        }
        catch (ObjectStoreException e) {
            logger.error("Could not save send message store", (Throwable)e);
            throw new RuntimeException("Could not save send message store", e);
        }
    }

    private void sendAction(MessageState msg, MessageDetail message, RosettaNetConfig config, ArrayList<MessageState> states, ObjectStore deliveryObjectStore) {
        SendAction action = new SendAction(message.getMimeData(), config);
        Instant startExecute = Instant.now();
        this.sendExecutor.execute(action);
        Duration periodToExecute = Duration.between(startExecute, Instant.now());
        logger.debug("Time to call ExecutorService execute: " + periodToExecute.toMillis() + " milliseconds");
        msg.setSendCount(msg.getSendCount() + 1);
        msg.setNextTimeout(System.currentTimeMillis() + (long)config.getSendAction().getTimeToAcknowledge() * 1000L);
        this.saveState(states, deliveryObjectStore);
    }

    protected void initialSendAction(MessageDetail message, RosettaNetConfig config) {
        MessageState msg = new MessageState();
        msg.setConfigName(config.getConfigName());
        msg.setMessageId(message.getMessageId());
        this.osManager.executeWithLock(MESSAGE_STORE_NAME, deliveryObjectStore -> {
            try {
                deliveryObjectStore.store(MESSAGE_DETAIL_PREFIX + message.getMessageId(), (Serializable)message);
            }
            catch (ObjectStoreException e) {
                String text = "Unable to persist message [" + message.getMessageId() + "] to object store";
                logger.error(text, (Throwable)e);
                throw new RuntimeException(text, e);
            }
            ArrayList<MessageState> states = this.loadState(deliveryObjectStore);
            states.add(msg);
            this.saveState(states, deliveryObjectStore);
            this.sendAction(msg, message, config, states, deliveryObjectStore);
            return true;
        });
    }

    private String stateIds(ArrayList<MessageState> states) {
        StringBuilder builder = new StringBuilder();
        for (MessageState state : states) {
            if (builder.length() == 0) {
                builder.append("[");
            } else {
                builder.append(", ");
            }
            builder.append(state.getMessageId());
        }
        builder.append("]");
        return builder.toString();
    }

    private void removeMessage(String id, ArrayList<MessageState> states, ObjectStore deliveryObjectStore) {
        logger.debug("Removing message id [" + id + "] from states " + this.stateIds(states));
        states.removeIf(m -> id.equals(m.getMessageId()));
        this.saveState(states, deliveryObjectStore);
        logger.debug("Saved updated states " + this.stateIds(states));
        try {
            deliveryObjectStore.remove(MESSAGE_DETAIL_PREFIX + id);
        }
        catch (Exception e) {
            logger.debug("Failed removing message " + id + " from store", (Throwable)e);
        }
    }

    private MessageDetail loadMessage(String id, ObjectStore deliveryObjectStore) {
        try {
            Serializable value = deliveryObjectStore.retrieve(MESSAGE_DETAIL_PREFIX + id);
            if (value == null || value instanceof MessageDetail) {
                return (MessageDetail)value;
            }
            String text = "Expected MessageDetail as object store value, got " + value.getClass().getName();
            logger.error(text);
            throw new RuntimeException(text);
        }
        catch (ObjectStoreException e) {
            logger.error("Could not load send message data", (Throwable)e);
            throw new RuntimeException("Could not load send message data", e);
        }
    }

    private void notifyComplete(String name, MessageAttributes attrs, Completion complete) {
        SourceCallback<Completion, MessageAttributes> callback = this.callbacks.get(name);
        String msgId = attrs.getMessageId();
        if (callback == null) {
            logger.warn("No CompletionSource found for configuration [" + name + "] (message [" + msgId + "])");
        } else {
            callback.handle(Result.builder().output((Object)complete).attributes((Object)attrs).build());
        }
    }

    protected void completeMessage(String name, MessageAttributes attrs, Completion complete) {
        logger.info("Handling completion of message " + attrs.getMessageId());
        this.notifyComplete(name, attrs, complete);
        this.osManager.executeWithLock(MESSAGE_STORE_NAME, deliveryObjectStore -> {
            ArrayList<MessageState> states = this.loadState(deliveryObjectStore);
            this.removeMessage(attrs.getMessageId(), states, deliveryObjectStore);
            return true;
        });
    }

    private class RetryHandler
    implements Runnable {
        private volatile boolean shutDown;

        private RetryHandler() {
        }

        private void failMessage(MessageDetail msg, RosettaNetConfig config) {
            MessageAttributes attrs = new MessageAttributes();
            attrs.setPartnerBusinessId(config.getPartnerBusinessIdentifier());
            attrs.setPartnerLocationId(config.getPartnerLocationId());
            attrs.setMessageId(msg.getMessageId());
            attrs.setMimeData(new ByteArrayInputStream(msg.getMimeData().getBody()));
            attrs.setAcknowledgeData(new ByteArrayInputStream(new byte[0]));
            Completion complete = new Completion();
            complete.setCompletionCode(CompletionCode.FAILURE);
            StoreHandler.this.notifyComplete(config.getConfigName(), attrs, complete);
        }

        public synchronized void stop() {
            this.shutDown = true;
            this.notifyAll();
        }

        private long checkMessageStates() {
            return (Long)StoreHandler.this.osManager.executeWithLock(StoreHandler.MESSAGE_STORE_NAME, deliveryObjectStore -> {
                long nextwait = 10000L;
                ArrayList<MessageState> fails = new ArrayList<MessageState>();
                ArrayList<MessageState> retries = new ArrayList<MessageState>();
                ArrayList states = StoreHandler.this.loadState(deliveryObjectStore);
                logger.trace("Beginning timeout scan with " + states.size() + " messages in store");
                if (!states.isEmpty()) {
                    long timenow = System.currentTimeMillis();
                    ListIterator iter = states.listIterator();
                    while (iter.hasNext()) {
                        MessageState msg = (MessageState)iter.next();
                        long bias = msg.getNextTimeout() - timenow;
                        if (bias <= 0L) {
                            RosettaNetConfig config = (RosettaNetConfig)StoreHandler.this.configurations.get(msg.getConfigName());
                            if (config == null) {
                                logger.error("Configuration [" + msg.getConfigName() + "] not found for timeout on sent message [" + msg.getMessageId() + "] - message discarded");
                                iter.remove();
                                continue;
                            }
                            if (msg.getSendCount() > config.getPipDetail().getRetryCount()) {
                                fails.add(msg);
                                iter.remove();
                                continue;
                            }
                            retries.add(msg);
                            continue;
                        }
                        if (nextwait <= bias) continue;
                        nextwait = bias;
                    }
                    if (!fails.isEmpty()) {
                        StoreHandler.this.saveState(states, deliveryObjectStore);
                    }
                }
                if (fails.size() > 0 || retries.size() > 0) {
                    logger.debug("Found [" + fails.size() + "] failures and [" + retries.size() + "] retries for [" + states.size() + "] messages waiting (next ready in [" + (nextwait + 500L) / 1000L + "] seconds)");
                    for (MessageState msg : fails) {
                        if (this.shutDown) break;
                        RosettaNetConfig config = (RosettaNetConfig)StoreHandler.this.configurations.get(msg.getConfigName());
                        MessageDetail detail = StoreHandler.this.loadMessage(msg.getMessageId(), deliveryObjectStore);
                        if (detail == null) {
                            logger.warn("Failed message [" + msg.getMessageId() + "] not found in store");
                            continue;
                        }
                        this.failMessage(detail, config);
                    }
                    for (MessageState msg : retries) {
                        if (!this.shutDown) {
                            MessageDetail detail = StoreHandler.this.loadMessage(msg.getMessageId(), deliveryObjectStore);
                            if (detail == null) {
                                logger.warn("Retransmit message [" + msg.getMessageId() + "] not found in store");
                                continue;
                            }
                            RosettaNetConfig config = (RosettaNetConfig)StoreHandler.this.configurations.get(msg.getConfigName());
                            StoreHandler.this.sendAction(msg, detail, config, states, deliveryObjectStore);
                            continue;
                        }
                        break;
                    }
                } else if (logger.isTraceEnabled()) {
                    logger.debug("No messages require action");
                }
                return nextwait;
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long waittime = 30000L + (long)(this.hashCode() >> 8 & 0xFF) * 30000L / 256L;
            logger.info("Initializing retry handler with delay of " + waittime);
            while (!this.shutDown) {
                RetryHandler retryHandler = this;
                synchronized (retryHandler) {
                    try {
                        logger.trace("Retry handler thread entering wait for " + waittime + " ms.");
                        this.wait(waittime);
                    }
                    catch (InterruptedException e) {
                        logger.error("Unexpected wake from wait", (Throwable)e);
                    }
                }
                if (this.shutDown) break;
                waittime = this.checkMessageStates();
            }
        }
    }
}

