/*
 * Decompiled with CFR 0.152.
 */
package org.cibseven.bpm.client.topic.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.cibseven.bpm.client.backoff.BackoffStrategy;
import org.cibseven.bpm.client.backoff.ErrorAwareBackoffStrategy;
import org.cibseven.bpm.client.exception.ExternalTaskClientException;
import org.cibseven.bpm.client.impl.EngineClient;
import org.cibseven.bpm.client.impl.EngineClientException;
import org.cibseven.bpm.client.impl.ExternalTaskClientLogger;
import org.cibseven.bpm.client.task.ExternalTask;
import org.cibseven.bpm.client.task.ExternalTaskHandler;
import org.cibseven.bpm.client.task.impl.ExternalTaskImpl;
import org.cibseven.bpm.client.task.impl.ExternalTaskServiceImpl;
import org.cibseven.bpm.client.topic.TopicSubscription;
import org.cibseven.bpm.client.topic.impl.TopicSubscriptionImpl;
import org.cibseven.bpm.client.topic.impl.TopicSubscriptionManagerLogger;
import org.cibseven.bpm.client.topic.impl.dto.FetchAndLockResponseDto;
import org.cibseven.bpm.client.topic.impl.dto.TopicRequestDto;
import org.cibseven.bpm.client.variable.impl.TypedValueField;
import org.cibseven.bpm.client.variable.impl.TypedValues;
import org.cibseven.bpm.client.variable.impl.VariableValue;

public class TopicSubscriptionManager
implements Runnable {
    protected static final TopicSubscriptionManagerLogger LOG = ExternalTaskClientLogger.TOPIC_SUBSCRIPTION_MANAGER_LOGGER;
    protected ReentrantLock ACQUISITION_MONITOR = new ReentrantLock(false);
    protected Condition IS_WAITING = this.ACQUISITION_MONITOR.newCondition();
    protected AtomicBoolean isRunning = new AtomicBoolean(false);
    protected ExternalTaskServiceImpl externalTaskService;
    protected EngineClient engineClient;
    protected CopyOnWriteArrayList<TopicSubscription> subscriptions;
    protected List<TopicRequestDto> taskTopicRequests;
    protected Map<String, ExternalTaskHandler> externalTaskHandlers;
    protected Thread thread;
    protected BackoffStrategy backoffStrategy;
    protected AtomicBoolean isBackoffStrategyDisabled;
    protected TypedValues typedValues;
    protected long clientLockDuration;

    public TopicSubscriptionManager(EngineClient engineClient, TypedValues typedValues, long clientLockDuration) {
        this.engineClient = engineClient;
        this.subscriptions = new CopyOnWriteArrayList();
        this.taskTopicRequests = new ArrayList<TopicRequestDto>();
        this.externalTaskHandlers = new HashMap<String, ExternalTaskHandler>();
        this.clientLockDuration = clientLockDuration;
        this.typedValues = typedValues;
        this.externalTaskService = new ExternalTaskServiceImpl(engineClient);
        this.isBackoffStrategyDisabled = new AtomicBoolean(false);
    }

    @Override
    public void run() {
        while (this.isRunning.get()) {
            try {
                this.acquire();
            }
            catch (Throwable e) {
                LOG.exceptionWhileAcquiringTasks(e);
            }
        }
    }

    protected void acquire() {
        this.taskTopicRequests.clear();
        this.externalTaskHandlers.clear();
        this.subscriptions.forEach(this::prepareAcquisition);
        if (!this.taskTopicRequests.isEmpty()) {
            FetchAndLockResponseDto fetchAndLockResponse = this.fetchAndLock(this.taskTopicRequests);
            fetchAndLockResponse.getExternalTasks().forEach(externalTask -> {
                String topicName = externalTask.getTopicName();
                ExternalTaskHandler taskHandler = this.externalTaskHandlers.get(topicName);
                if (taskHandler != null) {
                    this.handleExternalTask((ExternalTask)externalTask, taskHandler);
                } else {
                    LOG.taskHandlerIsNull(topicName);
                }
            });
            if (!this.isBackoffStrategyDisabled.get()) {
                this.runBackoffStrategy(fetchAndLockResponse);
            }
        }
    }

    protected void prepareAcquisition(TopicSubscription subscription) {
        TopicRequestDto taskTopicRequest = TopicRequestDto.fromTopicSubscription(subscription, this.clientLockDuration);
        this.taskTopicRequests.add(taskTopicRequest);
        String topicName = subscription.getTopicName();
        ExternalTaskHandler externalTaskHandler = subscription.getExternalTaskHandler();
        this.externalTaskHandlers.put(topicName, externalTaskHandler);
    }

    protected FetchAndLockResponseDto fetchAndLock(List<TopicRequestDto> subscriptions) {
        List<ExternalTask> externalTasks = null;
        try {
            LOG.fetchAndLock(subscriptions);
            externalTasks = this.engineClient.fetchAndLock(subscriptions);
        }
        catch (EngineClientException ex) {
            LOG.exceptionWhilePerformingFetchAndLock(ex);
            return new FetchAndLockResponseDto(LOG.handledEngineClientException("fetching and locking task", ex));
        }
        return new FetchAndLockResponseDto(externalTasks);
    }

    protected void handleExternalTask(ExternalTask externalTask, ExternalTaskHandler taskHandler) {
        ExternalTaskImpl task = (ExternalTaskImpl)externalTask;
        Map<String, TypedValueField> variables = task.getVariables();
        Map<String, VariableValue> wrappedVariables = this.typedValues.wrapVariables(task, variables);
        task.setReceivedVariableMap(wrappedVariables);
        try {
            taskHandler.execute(task, this.externalTaskService);
        }
        catch (ExternalTaskClientException e) {
            LOG.exceptionOnExternalTaskServiceMethodInvocation(task.getTopicName(), e);
        }
        catch (Throwable e) {
            LOG.exceptionWhileExecutingExternalTaskHandler(task.getTopicName(), e);
        }
    }

    public synchronized void stop() {
        if (this.isRunning.compareAndSet(true, false)) {
            this.resume();
            try {
                this.thread.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.exceptionWhileShuttingDown(e);
            }
        }
    }

    public synchronized void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            this.thread = new Thread((Runnable)this, TopicSubscriptionManager.class.getSimpleName());
            this.thread.start();
        }
    }

    protected void subscribe(TopicSubscription subscription) {
        if (!this.subscriptions.addIfAbsent(subscription)) {
            String topicName = subscription.getTopicName();
            throw LOG.topicNameAlreadySubscribedException(topicName);
        }
        this.resume();
    }

    protected void unsubscribe(TopicSubscriptionImpl subscription) {
        this.subscriptions.remove(subscription);
    }

    public EngineClient getEngineClient() {
        return this.engineClient;
    }

    public List<TopicSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    public boolean isRunning() {
        return this.isRunning.get();
    }

    public void setBackoffStrategy(BackoffStrategy backOffStrategy) {
        this.backoffStrategy = backOffStrategy;
    }

    protected void runBackoffStrategy(FetchAndLockResponseDto fetchAndLockResponse) {
        try {
            List<ExternalTask> externalTasks = fetchAndLockResponse.getExternalTasks();
            if (this.backoffStrategy instanceof ErrorAwareBackoffStrategy) {
                ErrorAwareBackoffStrategy errorAwareBackoffStrategy = (ErrorAwareBackoffStrategy)this.backoffStrategy;
                ExternalTaskClientException exception = fetchAndLockResponse.getError();
                errorAwareBackoffStrategy.reconfigure(externalTasks, exception);
            } else {
                this.backoffStrategy.reconfigure(externalTasks);
            }
            long waitTime = this.backoffStrategy.calculateBackoffTime();
            this.suspend(waitTime);
        }
        catch (Throwable e) {
            LOG.exceptionWhileExecutingBackoffStrategyMethod(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void suspend(long waitTime) {
        if (waitTime > 0L && this.isRunning.get()) {
            this.ACQUISITION_MONITOR.lock();
            try {
                if (this.isRunning.get()) {
                    this.IS_WAITING.await(waitTime, TimeUnit.MILLISECONDS);
                }
            }
            catch (InterruptedException e) {
                LOG.exceptionWhileExecutingBackoffStrategyMethod(e);
            }
            finally {
                this.ACQUISITION_MONITOR.unlock();
            }
        }
    }

    protected void resume() {
        this.ACQUISITION_MONITOR.lock();
        try {
            this.IS_WAITING.signal();
        }
        finally {
            this.ACQUISITION_MONITOR.unlock();
        }
    }

    public void disableBackoffStrategy() {
        this.isBackoffStrategyDisabled.set(true);
    }
}

