/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mule.runtime.module.batch.internal.engine.queue;

import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.buffer.BatchContextTransactionManager;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.internal.streaming.SerializationAwareQueueProducer;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.serialization.ObjectSerializer;
import org.mule.runtime.api.serialization.SerializationException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.QueueProfile;
import org.mule.runtime.core.api.streaming.iterator.ConsumerStreamingIterator;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.core.api.util.queue.QueueManager;
import org.mule.runtime.core.internal.streaming.object.iterator.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link BatchQueueDelegate} that stores batches of {@link Record}s as serialized
 * {@code byte[]} payloads in a queue obtained from the current {@link BatchTransactionContext}.
 *
 * <p>The queue name is supplied by subclasses through {@link #buildQueueName()} and cached after
 * the first call to {@link #getQueueName()}. The queue is configured against the
 * {@link QueueManager} with a persistent-store {@link QueueProfile} the first time it is requested.
 */
public abstract class AbstractBatchQueueDelegate
implements BatchQueueDelegate {
    private static final Logger logger = LoggerFactory.getLogger(AbstractBatchQueueDelegate.class);
    /** Lazily resolved queue name; see {@link #getQueueName()}. */
    private String name;
    /** Timeout handed to {@code Queue.offer} and {@code Queue.poll}. */
    private final long queueTimeout;
    private final QueueManager queueManager;
    private final ObjectSerializer serializer;
    private final QueueProfile queueProfile;
    /** Names of the queues whose profile has already been applied to the queue manager. */
    private final Set<String> configuredQueueNames;
    private final MuleContext muleContext;
    protected final BatchJobInstanceAdapter jobInstance;

    /**
     * Creates a delegate bound to the given job instance.
     *
     * @param jobInstance the job instance this delegate works for
     * @param queueManager the manager against which queues get configured
     * @param queueTimeout the timeout used when offering to and polling from the queue
     * @param serializer the serializer whose internal protocol (de)serializes record batches
     * @param muleContext the context consulted to verify that Mule is started
     */
    public AbstractBatchQueueDelegate(BatchJobInstanceAdapter jobInstance, QueueManager queueManager, long queueTimeout, ObjectSerializer serializer, MuleContext muleContext) {
        this.jobInstance = jobInstance;
        this.queueManager = queueManager;
        this.queueTimeout = queueTimeout;
        this.serializer = serializer;
        this.muleContext = muleContext;
        this.queueProfile = QueueProfile.newInstanceWithPersistentQueueStore();
        this.configuredQueueNames = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    }

    /**
     * Verifies that the Mule context is started. If it is not, the given transaction context is
     * rolled back before failing.
     *
     * @param ctx the transaction context to roll back when Mule is not started
     * @throws IllegalStateException if the Mule context is not started
     */
    private void assertMuleStarted(BatchTransactionContext ctx) {
        if (this.muleContext.isStarted()) {
            return;
        }
        BatchUtils.rollback(ctx);
        throw new IllegalStateException(String.format("Mule Context is Stopped or stopping. Execution context '%s' for instance '%s' of job '%s' has been rolled back", ctx.getId(), ctx.getJobInstance().getId(), ctx.getJob().getName()));
    }

    /**
     * Serializes the given records as a single payload and offers it to the queue.
     * A {@code null} or empty collection is a no-op.
     *
     * @param ctx the current transaction context
     * @param records the records to dispatch
     * @throws MuleException if the queue rejects the payload within the timeout, if serialization
     *         fails, or if the thread is interrupted while offering
     * @throws IllegalStateException if the Mule context is not started
     */
    @Override
    public void dispatch(BatchTransactionContext ctx, Collection<Record> records) throws MuleException {
        this.assertMuleStarted(ctx);
        if (CollectionUtils.isEmpty(records)) {
            return;
        }
        Queue queue = this.getQueue(ctx);
        try {
            byte[] bytes = this.serializer.getInternalProtocol().serialize(records);
            if (!queue.offer(bytes, this.queueTimeout)) {
                throw new DefaultMuleException(I18nMessageFactory.createStaticMessage(String.format("Could not dispatch records to batch queue %s (%d elements already queued)", queue.getName(), queue.size())));
            }
        }
        catch (SerializationException e) {
            throw new DefaultMuleException(String.format("Could not dispatch records to batch queue %s due to Serialization Exception", ctx.getJobInstance().getQueueName()), e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new DefaultMuleException(String.format("Could not dispatch records to batch queue %s due to Thread interruption Exception", ctx.getJobInstance().getQueueName()), e);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("dispatched {} records to Batch Queue {}", records.size(), ctx.getJobInstance().getQueueName());
        }
    }

    /**
     * Dispatches the records and commits the transaction context. If the dispatch or the commit
     * fails with a {@link MuleException}, the failure is handed to the manager's rollback.
     *
     * @param ctx the current transaction context
     * @param jobInstance the job instance passed along to the manager on rollback
     * @param records the records to dispatch
     * @param manager the transaction manager used to commit or roll back
     */
    @Override
    public void dispatchAndCommit(BatchTransactionContext ctx, BatchJobInstanceAdapter jobInstance, List<Record> records, BatchContextTransactionManager manager) {
        try {
            this.dispatch(ctx, records);
            manager.commit(ctx);
        }
        catch (MuleException e) {
            manager.rollback(ctx, jobInstance, records, e);
        }
    }

    /**
     * Takes the next serialized batch from the queue and deserializes it.
     *
     * @param ctx the current transaction context
     * @return the next batch of records, or {@code null} if the queue yielded nothing within the
     *         timeout
     * @throws MuleException if the thread is interrupted while polling
     * @throws IllegalStateException if the Mule context is not started
     */
    @Override
    public List<Record> poll(BatchTransactionContext ctx) throws MuleException {
        this.assertMuleStarted(ctx);
        Queue queue = this.getQueue(ctx);
        byte[] bytes;
        try {
            bytes = (byte[]) queue.poll(this.queueTimeout);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag, consistent with dispatch().
            Thread.currentThread().interrupt();
            throw new DefaultMuleException(String.format("Thread interrupted while trying to read from queue %s", queue.getName()), e);
        }
        if (bytes == null) {
            return null;
        }
        @SuppressWarnings("unchecked")
        List<Record> records = (List<Record>) this.serializer.getInternalProtocol().deserialize(bytes);
        return records;
    }

    /**
     * Returns the number of payloads currently held by the queue.
     *
     * @param ctx the current transaction context
     * @return the queue size, or {@code 0} if the context yields no queue
     * @throws MuleException declared by the interface
     * @throws IllegalStateException if the Mule context is not started
     */
    @Override
    public long size(BatchTransactionContext ctx) throws MuleException {
        this.assertMuleStarted(ctx);
        Queue queue = this.getQueue(ctx);
        return queue != null ? queue.size() : 0;
    }

    /**
     * Returns a streaming iterator over the batches in the queue, backed by a
     * {@link SerializationAwareQueueProducer}.
     *
     * @param ctx the current transaction context
     * @return an iterator of record batches
     * @throws MuleException declared by the interface
     * @throws IllegalStateException if the Mule context is not started
     */
    @Override
    public Iterator<List<Record>> iterator(BatchTransactionContext ctx) throws MuleException {
        this.assertMuleStarted(ctx);
        SerializationAwareQueueProducer producer = new SerializationAwareQueueProducer(this.getQueue(ctx), this.serializer);
        SimpleConsumer consumer = new SimpleConsumer(producer);
        return new ConsumerStreamingIterator<List<Record>>(consumer);
    }

    /**
     * Disposes the queue on a best-effort basis: any failure is logged and not propagated.
     *
     * @param ctx the current transaction context
     */
    @Override
    public void dispose(BatchTransactionContext ctx) {
        try {
            this.getQueue(ctx).dispose();
        }
        catch (Exception e) {
            logger.error(String.format("Exception found while trying to dispose queue %s for instance '%s' of job '%s'", this.getQueueName(), ctx.getJobInstance().getId(), ctx.getJobInstance().getOwnerJobName()), e);
        }
    }

    /**
     * Resolves the queue from the transaction context, applying the queue profile to the queue
     * manager the first time this queue name is seen.
     *
     * @param ctx the transaction context that provides the queue
     * @return the queue returned by the context for this delegate's queue name
     * @throws MuleRuntimeException if the queue cannot be configured
     */
    private Queue getQueue(BatchTransactionContext ctx) {
        String queueName = this.getQueueName();
        if (this.configuredQueueNames.add(queueName)) {
            try {
                this.queueProfile.configureQueue(queueName, this.queueManager);
            }
            catch (InitialisationException e) {
                // Forget the name so the next call retries the configuration instead of
                // silently skipping it for a queue that was never configured.
                this.configuredQueueNames.remove(queueName);
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Could not configure queue " + queueName), e);
            }
        }
        return ctx.getQueue(queueName);
    }

    /**
     * Returns the queue name, building it through {@link #buildQueueName()} on first use and
     * caching the result. The lazy initialisation is not synchronized, so concurrent first calls
     * may each invoke {@link #buildQueueName()}.
     *
     * @return the queue name
     */
    @Override
    public final String getQueueName() {
        if (this.name == null) {
            this.name = this.buildQueueName();
        }
        return this.name;
    }

    /**
     * Builds the name of the queue this delegate operates on.
     *
     * @return the queue name
     */
    protected abstract String buildQueueName();
}

