/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.model.consumer;

import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationInterruptedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPoolClosedException;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.core.api.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultConsumerPool
implements ConsumerPool {
    private static final Logger logger = LoggerFactory.getLogger(DefaultConsumerPool.class);
    private Map<MuleConsumer, Semaphore> consumerMap;
    private List<Map.Entry<MuleConsumer, Semaphore>> consumerIndexes;
    private AtomicInteger currentPosition = new AtomicInteger();
    private AtomicBoolean isValid = new AtomicBoolean(true);
    private Semaphore consumerAvailabilitySemaphore;

    public DefaultConsumerPool(Set<MuleConsumer> consumerSet) {
        this.consumerMap = consumerSet.stream().peek(consumer -> consumer.setPool(this)).collect(Collectors.toMap(Function.identity(), consumer -> new Semaphore(1, true)));
        this.consumerAvailabilitySemaphore = new Semaphore(consumerSet.size(), true);
        this.consumerIndexes = new ArrayList<Map.Entry<MuleConsumer, Semaphore>>(this.consumerMap.entrySet());
    }

    @Override
    public MuleConsumer checkOut(Duration timeout) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> muleConsumer = this.checkOut(true, timeout);
        if (!muleConsumer.isPresent()) {
            muleConsumer = this.checkOut(false, timeout);
        }
        return muleConsumer.orElseThrow(() -> new OperationTimeoutException(String.format("Unable to checkout a consumer from the consumer using timeout of %d ms", timeout.toMillis()), timeout.toMillis()));
    }

    @Override
    public MuleConsumer checkOut(String topic, int partition, Duration timeout) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> muleConsumer = this.checkOutConsumer(Optional.of(consumer -> consumer.assignment().stream().anyMatch(assignment -> assignment.getTopic().equals(topic) && assignment.getPartition() == partition)), timeout);
        MuleConsumer result = muleConsumer.orElseThrow(() -> new NotFoundException(String.format("There is no consumer for the topic: %s and partition %d", topic, partition)));
        this.checkPoolIsValid(muleConsumer);
        return result;
    }

    @Override
    public Set<MuleConsumer> checkoutAll(Duration timeout) throws ConsumerPoolClosedException {
        long startTime = System.currentTimeMillis();
        long finishTime = startTime + timeout.toMillis();
        ArrayList<Semaphore> acquiredSemaphores = new ArrayList<Semaphore>(this.consumerMap.size());
        int consumerAvailabilitySemaphoreAcquired = 0;
        try {
            int counter = 0;
            for (Map.Entry<MuleConsumer, Semaphore> consumerEntry : this.consumerMap.entrySet()) {
                this.acquireSemaphore(this.consumerAvailabilitySemaphore, timeout, true);
                ++consumerAvailabilitySemaphoreAcquired;
                MuleConsumer muleConsumer = consumerEntry.getKey();
                Semaphore semaphore = consumerEntry.getValue();
                logger.trace("Checking out consumer {}/{}: {}.", new Object[]{++counter, this.consumerMap.size(), muleConsumer.getId()});
                if (timeout.isNegative() || timeout.isZero()) {
                    semaphore.acquire();
                } else {
                    long remainig = finishTime - System.currentTimeMillis();
                    if (!consumerEntry.getValue().tryAcquire(remainig, TimeUnit.MILLISECONDS)) {
                        throw new OperationTimeoutException(timeout.toMillis());
                    }
                }
                logger.trace("Consumer {} checked out.", (Object)muleConsumer.getId());
                acquiredSemaphores.add(semaphore);
            }
            Set<MuleConsumer> muleConsumers = this.consumerMap.keySet();
            this.checkPoolIsValid(muleConsumers);
            return muleConsumers;
        }
        catch (InterruptedException e) {
            acquiredSemaphores.stream().forEach(Semaphore::release);
            this.consumerAvailabilitySemaphore.release(consumerAvailabilitySemaphoreAcquired);
            throw new OperationInterruptedException(e);
        }
        catch (RuntimeException e) {
            acquiredSemaphores.stream().forEach(Semaphore::release);
            this.consumerAvailabilitySemaphore.release(consumerAvailabilitySemaphoreAcquired);
            throw e;
        }
    }

    @Override
    public void checkIn(MuleConsumer consumer) {
        logger.trace("Checking in consumer {}.", (Object)consumer.getId());
        this.consumerMap.get(consumer).release();
        this.consumerAvailabilitySemaphore.release();
        logger.trace("Consumer {} checked in.", (Object)consumer.getId());
    }

    @Override
    public boolean isValid() {
        return this.isValid.get();
    }

    @Override
    public void invalidate() {
        this.isValid.set(false);
    }

    @Override
    public void close() {
        try {
            Set<MuleConsumer> muleConsumers = this.checkoutAll(Duration.ofMillis(-1L));
            muleConsumers.forEach(IOUtils::closeQuietly);
            muleConsumers.forEach(this::checkIn);
        }
        catch (ConsumerPoolClosedException e) {
            logger.debug("Could not close consumer pool, it is already closed");
            throw new MuleRuntimeException((Throwable)e);
        }
        this.invalidate();
    }

    private Optional<MuleConsumer> checkOutConsumer(Optional<Predicate<MuleConsumer>> consumerFilter, Duration timeout) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> result = Optional.empty();
        logger.debug("checkOutConsumer: Trying to checkout specific consumer from pool.");
        boolean acquirePermitForConsumerAvailabilitySemaphore = false;
        try {
            long sysTimeMillis = System.currentTimeMillis();
            acquirePermitForConsumerAvailabilitySemaphore = this.acquireSemaphore(this.consumerAvailabilitySemaphore, timeout, true);
            for (int i = 0; i < this.consumerIndexes.size(); ++i) {
                Map.Entry<MuleConsumer, Semaphore> consumerEntry = this.consumerIndexes.get(i);
                boolean semaphoreAcquired = false;
                MuleConsumer consumer = consumerEntry.getKey();
                Semaphore semaphore = consumerEntry.getValue();
                semaphoreAcquired = this.acquireSemaphore(semaphore, timeout, true);
                if (consumerFilter.isPresent() && !consumerFilter.get().test(consumer)) {
                    semaphore.release();
                    semaphoreAcquired = false;
                }
                if (!semaphoreAcquired) continue;
                result = Optional.of(consumer);
                break;
            }
            logger.debug("checkOutConsumer: WHILE LOOP took : {}", (Object)(System.currentTimeMillis() - sysTimeMillis));
        }
        catch (InterruptedException e) {
            if (acquirePermitForConsumerAvailabilitySemaphore) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new OperationInterruptedException(e);
        }
        catch (RuntimeException e) {
            if (acquirePermitForConsumerAvailabilitySemaphore) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw e;
        }
        this.checkPoolIsValid(result);
        logger.debug("checkOutConsumer returning");
        return result;
    }

    private Optional<MuleConsumer> checkOut(boolean checkSemaphoreAvailablePermits, Duration timeout) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> result = Optional.empty();
        logger.debug("checkOut: Trying to checkout consumer from pool");
        boolean acquirePermitForConsumerAvailabilitySemaphore = false;
        try {
            long sysTimeMilis = System.currentTimeMillis();
            acquirePermitForConsumerAvailabilitySemaphore = this.acquireSemaphore(this.consumerAvailabilitySemaphore, timeout, true);
            while (!result.isPresent() && this.isValid.get()) {
                Map.Entry<MuleConsumer, Semaphore> consumerEntry = this.consumerIndexes.get(this.currentPosition.getAndUpdate(currentValue -> (currentValue + 1) % this.consumerMap.size()));
                MuleConsumer consumer = consumerEntry.getKey();
                Semaphore semaphore = consumerEntry.getValue();
                boolean shouldReturnConsumer = !checkSemaphoreAvailablePermits || consumerEntry.getValue().availablePermits() != 0;
                if (!shouldReturnConsumer || !this.acquireSemaphore(semaphore, timeout, false)) continue;
                result = Optional.of(consumer);
            }
            logger.debug("WHILE LOOP took : {}", (Object)(System.currentTimeMillis() - sysTimeMilis));
            logger.debug("checkOut returning");
        }
        catch (InterruptedException e) {
            if (acquirePermitForConsumerAvailabilitySemaphore) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new OperationInterruptedException(e);
        }
        this.checkPoolIsValid(result);
        return result;
    }

    @Override
    public void checkPoolIsValid(Set<MuleConsumer> muleConsumers) throws ConsumerPoolClosedException {
        if (!this.isValid()) {
            muleConsumers.forEach(this::checkIn);
            throw new ConsumerPoolClosedException();
        }
    }

    @Override
    public void checkPoolIsValid(Optional<MuleConsumer> result) throws ConsumerPoolClosedException {
        if (!this.isValid()) {
            result.ifPresent(this::checkIn);
            if (!result.isPresent()) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new ConsumerPoolClosedException();
        }
        if (!result.isPresent()) {
            this.consumerAvailabilitySemaphore.release();
        }
    }

    private boolean acquireSemaphore(Semaphore semaphore, Duration timeout, boolean forceAcquire) throws InterruptedException {
        if (timeout.isNegative() || timeout.isZero()) {
            if (forceAcquire) {
                return this.acquire(semaphore);
            }
            return semaphore.tryAcquire();
        }
        if (!semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new OperationTimeoutException(timeout.toMillis());
        }
        return true;
    }

    private boolean acquire(Semaphore semaphore) throws InterruptedException {
        semaphore.acquire();
        return true;
    }
}

