/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.connection;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.api.source.TopicPartition;
import com.mulesoft.connectors.kafka.internal.error.KafkaErrorType;
import com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException;
import com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidAckModeException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidOffsetException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicNameException;
import com.mulesoft.connectors.kafka.internal.error.exception.NegativeDurationException;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationInterruptedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.error.exception.SessionNotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.TimeoutTooLargeException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnassignedConsumerException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnexpectedException;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPoolClosedException;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import com.mulesoft.connectors.kafka.internal.model.consumer.Session;
import com.mulesoft.connectors.kafka.internal.source.PollingTask;
import java.io.Closeable;
import java.io.InputStream;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionValidationResult;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.error.ErrorTypeDefinition;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.FlowListener;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerConnection
implements ConnectorConnection {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConnection.class);
    /** Format of the keys stored in {@code openSessions}: the ack mode, a dash, then the session id. */
    private static final String SESSION_KEY_TEMPLATE = "%s-%s";
    /** Pool that consumers are checked out of and checked back into. */
    private final ConsumerPool consumerPool;
    /** Scheduler that polling tasks are submitted to. */
    private final Scheduler workerScheduler;
    /** Sessions registered under a {@code SESSION_KEY_TEMPLATE} key so later calls can look them up by id. */
    private final Map<String, Session<MuleConsumer>> openSessions = new ConcurrentHashMap<String, Session<MuleConsumer>>();

    /**
     * Creates a connection that works against the given consumer pool.
     *
     * @param consumerPool    pool that consumers are checked out of and returned to
     * @param workerScheduler scheduler that polling tasks are submitted to
     */
    public ConsumerConnection(ConsumerPool consumerPool, Scheduler workerScheduler) {
        this.workerScheduler = workerScheduler;
        this.consumerPool = consumerPool;
    }

    /**
     * Checks out a consumer for the given topic and partition and seeks it to {@code offset}.
     * The consumer is checked back into the pool when the session is closed at the end of
     * the try-with-resources block.
     *
     * @param topic            topic to seek on
     * @param partition        partition to seek on
     * @param offset           offset to move to
     * @param operationTimeout timeout handed to the pool check-out
     * @throws InvalidOffsetException when the seek raises an {@link IllegalArgumentException}
     * @throws NotFoundException      when the seek raises an {@link IllegalStateException}
     * @throws ConnectionException    when the consumer pool has been closed
     */
    public void seek(String topic, int partition, long offset, Duration operationTimeout) throws ConnectionException {
        try (Session<MuleConsumer> seekSession = new Session<MuleConsumer>(this.consumerPool.checkOut(topic, partition, operationTimeout), this.consumerPool::checkIn)) {
            seekSession.run(consumer -> consumer.seek(topic, partition, offset));
        }
        catch (IllegalArgumentException invalidOffset) {
            String message = String.format("The seek operation used an invalid offset %s", offset);
            throw new InvalidOffsetException(message, offset, (Throwable)invalidOffset);
        }
        catch (IllegalStateException notAssigned) {
            String message = String.format("The topic:%s partition:%d is not currently assigned to this consumer", topic, partition);
            throw new NotFoundException(message, notAssigned);
        }
        catch (ConsumerPoolClosedException poolClosed) {
            throw new ConnectionException("The consumer Pool was closed when trying to execute the seek operation", (Throwable)poolClosed, null, (Object)this);
        }
    }

    /**
     * Polls a single record with a freshly checked-out consumer and converts it with
     * {@code outputParser}.
     *
     * <p>The session is closed here when no record was polled or when the ack mode is
     * {@code IMMEDIATE} or {@code DUPS_OK}. For the other ack modes the session is left open and
     * is closed by the flow-listener callbacks registered in
     * {@code handleConsumeResultAccordingToAckMode}.
     *
     * @param outputParser     builds the operation result from the session id and the polled record
     * @param pollTimeout      timeout passed to the single-element poll
     * @param operationTimeout timeout passed to the pool check-out
     * @param ackMode          acknowledgement mode that drives commit and session handling
     * @param flowListener     listener on which success/error/complete callbacks are registered
     * @return the value produced by {@code outputParser}
     * @throws ConnectionException when the pool is closed or when a commit fails
     */
    public Result<InputStream, KafkaRecordAttributes> consume(BiFunction<String, ConsumerRecord<InputStream, InputStream>, Result<InputStream, KafkaRecordAttributes>> outputParser, Duration pollTimeout, Duration operationTimeout, AckMode ackMode, FlowListener flowListener) throws ConnectionException {
        Session<MuleConsumer> session = this.createSession(operationTimeout);
        AtomicReference<ConsumerRecord<InputStream, InputStream>> result = new AtomicReference<>();
        try {
            return session.apply(muleConsumer -> {
                try {
                    result.set(muleConsumer.singleElementPoll(pollTimeout));
                    String sessionId = this.handleConsumeResultAccordingToAckMode(ackMode, flowListener, session, result, muleConsumer);
                    return outputParser.apply(sessionId, result.get());
                }
                catch (org.apache.kafka.clients.consumer.InvalidOffsetException e) {
                    throw new InvalidOffsetException(e);
                }
                catch (AuthorizationException e) {
                    throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
                }
                catch (org.apache.kafka.common.errors.AuthenticationException e) {
                    throw new AuthenticationException(e, this);
                }
                catch (IllegalArgumentException e) {
                    throw new NegativeDurationException(pollTimeout, (Throwable)e);
                }
                catch (IllegalStateException e) {
                    throw new UnassignedConsumerException(e);
                }
                catch (ArithmeticException e) {
                    throw new TimeoutTooLargeException(pollTimeout, (Throwable)e);
                }
                catch (org.apache.kafka.clients.consumer.CommitFailedException exception) {
                    // Chain the Kafka exception itself; passing exception.getCause() dropped the original failure.
                    throw new CommitFailedException(exception.getMessage(), KafkaErrorType.COMMIT_FAILED, exception);
                }
                catch (InvalidTopicException e) {
                    throw new InvalidTopicNameException(e);
                }
                catch (InterruptException e) {
                    throw new OperationInterruptedException(e);
                }
                catch (KafkaException e) {
                    throw new UnexpectedException(e);
                }
            });
        }
        catch (CommitFailedException e) {
            // The connection fills the placeholder; the exception goes last so SLF4J logs it as the throwable.
            logger.debug("Throwing ConnectionException because commit failed for connection: {}.", (Object)this, (Object)e);
            // NOTE(review): for IMMEDIATE/DUPS_OK the finally block below closes the session a second
            // time; presumably Session.close() tolerates repeated calls - confirm.
            session.close();
            throw new ConnectionException((Throwable)((Object)e), (Object)this);
        }
        finally {
            if (result.get() == null || AckMode.IMMEDIATE == ackMode || AckMode.DUPS_OK == ackMode) {
                session.close();
            }
        }
    }

    /**
     * Applies the acknowledgement strategy for a record obtained by {@code consume}. Nothing is
     * done when {@code result} holds no record.
     *
     * <ul>
     *   <li>{@code AUTO}: registers callbacks that commit on flow success, or reset the consumer
     *       buffer on flow error, and close the session in both cases.</li>
     *   <li>{@code IMMEDIATE}: commits synchronously.</li>
     *   <li>{@code DUPS_OK}: commits asynchronously.</li>
     *   <li>{@code MANUAL}: registers the session so it can be looked up by id, resets the buffer
     *       and unregisters the session on flow error, and closes the session on flow completion.</li>
     * </ul>
     *
     * @param ackMode      acknowledgement mode to apply
     * @param flowListener listener on which the callbacks are registered
     * @param session      session that holds the checked-out consumer
     * @param result       holder of the polled record, possibly empty
     * @param muleConsumer consumer that produced the record
     * @return the session id as a string
     */
    private String handleConsumeResultAccordingToAckMode(AckMode ackMode, FlowListener flowListener, Session<MuleConsumer> session, AtomicReference<ConsumerRecord<InputStream, InputStream>> result, MuleConsumer muleConsumer) {
        if (result.get() != null) {
            switch (ackMode) {
                case AUTO: {
                    flowListener.onSuccess(message -> {
                        try {
                            logger.debug("Flow finished with success, about to commit offset for processed messages.");
                            muleConsumer.commit();
                            logger.debug("Successfully committed offset for processed messages.");
                        }
                        finally {
                            // Close even when the commit throws, otherwise the session is never closed.
                            session.close();
                        }
                    });
                    flowListener.onError(exc -> {
                        try {
                            logger.debug("Flow execution resulted in error. Same message will be consumed again.");
                            muleConsumer.resetBuffer();
                        }
                        finally {
                            session.close();
                        }
                    });
                    break;
                }
                case IMMEDIATE: {
                    muleConsumer.commit();
                    break;
                }
                case DUPS_OK: {
                    muleConsumer.asyncCommit();
                    break;
                }
                case MANUAL: {
                    String sessionId = session.getId().toString();
                    this.addSession(ackMode, sessionId, session);
                    flowListener.onError(e -> {
                        try {
                            logger.debug("Flow execution resulted in error. Same message will be consumed again.");
                            muleConsumer.resetBuffer();
                        }
                        finally {
                            // Unregister even when the buffer reset fails so the entry does not linger.
                            this.removeSession(ackMode, sessionId);
                        }
                    });
                    // NOTE(review): on a successful flow the session is closed here but stays registered
                    // in openSessions unless release(...) is called elsewhere - confirm.
                    flowListener.onComplete(() -> {
                        logger.debug("Flow has finished, closing consumer session.");
                        session.close();
                    });
                }
            }
        }
        return session.getId().toString();
    }

    /**
     * Commits the offsets of the consumer held by the registered session {@code sessionId}.
     *
     * @param ackMode   ack mode the session was registered under
     * @param sessionId id of the session whose consumer commits
     * @throws ConnectionException declared by the signature; not thrown directly by this method
     */
    public void commit(AckMode ackMode, String sessionId) throws ConnectionException {
        try {
            this.getSession(ackMode, sessionId).run(MuleConsumer::commit);
        }
        catch (org.apache.kafka.common.errors.AuthenticationException e) {
            throw new AuthenticationException(e, this);
        }
        catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
        catch (TimeoutException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            logger.warn("The commit timed out for sessionId {}", (Object)sessionId, (Object)e);
            throw new UnexpectedException(e);
        }
        catch (org.apache.kafka.clients.consumer.CommitFailedException exception) {
            // Chain the Kafka exception itself; passing exception.getCause() dropped the original failure.
            throw new CommitFailedException(exception.getMessage(), KafkaErrorType.COMMIT_FAILED, exception);
        }
        catch (InterruptException e) {
            throw new OperationInterruptedException(e);
        }
        catch (IllegalArgumentException | KafkaException e) {
            logger.debug("There was an unexpected exception while doing a commit of the sessionId {}", (Object)sessionId, (Object)e);
            throw new UnexpectedException(e);
        }
    }

    /**
     * Checks out every consumer of the pool and subscribes each one to the given patterns.
     * All consumers are checked back in when the session closes.
     *
     * @param operationTimeout     timeout passed to the pool check-out
     * @param subscriptionPatterns patterns handed to each consumer's {@code subscribe}
     * @throws ConnectionException when the consumer pool is closed
     */
    public void subscribe(Duration operationTimeout, List<String> subscriptionPatterns) throws ConnectionException {
        // Typed as Set<MuleConsumer> instead of a raw Set so the lambdas below are type-checked.
        try (Session<Set<MuleConsumer>> session = new Session<Set<MuleConsumer>>(this.consumerPool.checkoutAll(operationTimeout), consumerSet -> consumerSet.forEach(this.consumerPool::checkIn))) {
            session.run(muleConsumers -> muleConsumers.forEach(consumer -> consumer.subscribe(subscriptionPatterns)));
        }
        catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
        catch (org.apache.kafka.common.errors.AuthenticationException e) {
            throw new AuthenticationException(e, this);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("subscribe", e);
        }
        catch (ConsumerPoolClosedException e) {
            throw new ConnectionException("The consumer Pool is closed, can't subscribe", (Throwable)e, null, (Object)this);
        }
    }

    /**
     * Checks out every consumer of the pool and spreads the given topic partitions across them:
     * the assignments are split into one list per consumer and each consumer is assigned its list.
     * All consumers are checked back in when the session closes.
     *
     * @param operationTimeout timeout passed to the pool check-out
     * @param assignments      topic partitions to distribute among the consumers
     * @throws ConnectionException when the consumer pool is closed
     */
    public void assign(Duration operationTimeout, List<TopicPartition> assignments) throws ConnectionException {
        // Typed as Set<MuleConsumer> instead of a raw Set so the lambdas below are type-checked.
        try (Session<Set<MuleConsumer>> session = new Session<Set<MuleConsumer>>(this.consumerPool.checkoutAll(operationTimeout), consumerSet -> consumerSet.forEach(this.consumerPool::checkIn))) {
            session.run(muleConsumers -> {
                // One list is produced per consumer, so next() is called exactly once per list.
                Iterator<List<TopicPartition>> assignmentIterator = this.dividePartitions(assignments, muleConsumers.size()).iterator();
                muleConsumers.forEach(consumer -> consumer.assign(assignmentIterator.next()));
            });
        }
        catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
        catch (org.apache.kafka.common.errors.AuthenticationException e) {
            throw new AuthenticationException(e, this);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("assignment", e);
        }
        catch (ConsumerPoolClosedException e) {
            throw new ConnectionException("The consumer Pool was closed when trying to execute the assign operation", (Throwable)e, null, (Object)this);
        }
    }

    /**
     * Submits the polling task to the worker scheduler.
     *
     * @param pollingTask task to run
     * @return the future returned by the scheduler for the submitted task
     */
    public Future<?> startPolling(PollingTask<?, ?, ?> pollingTask) {
        Future<?> submitted = this.workerScheduler.submit(pollingTask);
        return submitted;
    }

    /**
     * Runs {@code pollOperation} with a freshly checked-out consumer and post-processes the
     * outcome according to {@code ackMode}.
     *
     * <p>Every failure branch except the interrupt branch closes the session before rethrowing.
     * A {@link NotFoundException} is not treated as a failure: the session is closed and
     * {@code null} is returned.
     *
     * @param ackMode       acknowledgement mode applied to a non-null poll result
     * @param pollTimeout   timeout handed to {@code pollOperation}
     * @param pollOperation function that performs the actual poll on the consumer
     * @param <T>           type produced by the poll operation
     * @return an entry of session key and poll result, or {@code null} when nothing was polled
     * @throws ConnectionException when the consumer pool is closed
     */
    public <T> Map.Entry<String, T> poll(AckMode ackMode, Duration pollTimeout, BiFunction<MuleConsumer, Duration, T> pollOperation) throws ConnectionException {
        Duration checkOutTimeout = Duration.ofMillis(-1L);
        Session<MuleConsumer> pollSession = this.createSession(checkOutTimeout);
        return pollSession.apply(consumer -> {
            try {
                T polled = pollOperation.apply(consumer, pollTimeout);
                return this.handlePollResultAccordingToAckMode(ackMode, pollSession, consumer, polled);
            }
            catch (AuthorizationException unauthorized) {
                throw this.ensureClosedSession(new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(unauthorized), pollSession);
            }
            catch (org.apache.kafka.common.errors.AuthenticationException unauthenticated) {
                throw this.ensureClosedSession(new AuthenticationException(unauthenticated, this), pollSession);
            }
            catch (org.apache.kafka.clients.consumer.InvalidOffsetException badOffset) {
                throw this.ensureClosedSession(new InvalidOffsetException(badOffset), pollSession);
            }
            catch (InvalidTopicException badTopic) {
                throw this.ensureClosedSession(new com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicException("An invalid topic name was provided ", badTopic), pollSession);
            }
            catch (IllegalStateException unassigned) {
                throw this.ensureClosedSession(new UnassignedConsumerException(unassigned), pollSession);
            }
            catch (NotFoundException nothingPolled) {
                logger.trace("No messages found.");
                return this.ensureClosedSession(null, pollSession);
            }
            catch (InterruptException interrupted) {
                // NOTE(review): unlike the other branches the session is not closed here - confirm this is intended.
                throw new OperationInterruptedException(interrupted);
            }
            catch (RuntimeException unexpected) {
                throw this.ensureClosedSession(unexpected, pollSession);
            }
        });
    }

    /**
     * Closes {@code session} and hands back {@code object} unchanged, so a session can be
     * closed inline in a {@code throw} or {@code return} expression.
     *
     * @param object  value to pass through, may be {@code null}
     * @param session session to close
     * @return {@code object}
     */
    private <T, E> E ensureClosedSession(E object, Session<T> session) {
        try {
            return object;
        }
        finally {
            session.close();
        }
    }

    /**
     * Applies the acknowledgement strategy to the outcome of a poll.
     *
     * <p>A {@code null} result closes the session and yields {@code null}. Otherwise
     * {@code AUTO} and {@code MANUAL} register the session and report its id as the entry key,
     * while {@code DUPS_OK} (async commit) and {@code IMMEDIATE} (sync commit) commit, close the
     * session and leave the key {@code null}.
     *
     * @param ackMode      acknowledgement mode to apply
     * @param session      session that holds the consumer
     * @param muleConsumer consumer that produced the result
     * @param result       poll outcome, possibly {@code null}
     * @return entry of session key (possibly {@code null}) and result, or {@code null} when there is no result
     */
    private <T> AbstractMap.SimpleEntry<String, T> handlePollResultAccordingToAckMode(AckMode ackMode, Session<MuleConsumer> session, MuleConsumer muleConsumer, T result) {
        if (result == null) {
            session.close();
            return null;
        }
        String registeredKey = null;
        switch (ackMode) {
            case AUTO:
            case MANUAL: {
                registeredKey = session.getId().toString();
                this.addSession(ackMode, registeredKey, session);
                break;
            }
            case DUPS_OK: {
                muleConsumer.asyncCommit();
                session.close();
                break;
            }
            case IMMEDIATE: {
                muleConsumer.commit();
                session.close();
                break;
            }
        }
        return new AbstractMap.SimpleEntry<String, T>(registeredKey, result);
    }

    /**
     * Quietly closes the consumer pool and logs the disconnection at debug level.
     */
    public void disconnect() {
        Closeable pool = (Closeable)this.consumerPool;
        IOUtils.closeQuietly(pool);
        if (!logger.isDebugEnabled()) {
            return;
        }
        logger.debug("Disconnected connection {}!", (Object)this);
    }

    /**
     * Checks that the consumer pool is still valid.
     *
     * @throws ModuleException with error type {@code INVALID_CONNECTION}, wrapping a
     *                         {@link ConnectionException}, when the pool reports itself invalid
     * @deprecated see {@code validateWithResult()}, which reports the outcome as a result object
     */
    @Deprecated
    public void validate() {
        if (logger.isDebugEnabled()) {
            logger.debug("Validating connection {}!", (Object)this);
        }
        boolean poolUsable = this.consumerPool.isValid();
        if (poolUsable) {
            return;
        }
        throw new ModuleException((ErrorTypeDefinition)KafkaErrorType.INVALID_CONNECTION, (Throwable)new ConnectionException(null, (Object)this));
    }

    /**
     * Distributes {@code assignments} round-robin over {@code size} buckets.
     *
     * @param assignments topic partitions to distribute
     * @param size        number of buckets to create
     * @return a list with exactly {@code size} buckets, some possibly empty; an empty list when
     *         {@code size} is zero or negative
     */
    private List<List<TopicPartition>> dividePartitions(List<TopicPartition> assignments, int size) {
        List<List<TopicPartition>> results = new ArrayList<>();
        if (size <= 0) {
            // With no buckets the original round-robin loop never consumed an element and spun forever.
            return results;
        }
        for (int i = 0; i < size; i++) {
            results.add(new ArrayList<>());
        }
        int bucket = 0;
        for (TopicPartition assignment : assignments) {
            results.get(bucket).add(assignment);
            bucket = (bucket + 1) % size;
        }
        return results;
    }

    /**
     * Closes the registered session {@code sessionId} and then unregisters it.
     *
     * @param ackMode   ack mode the session was registered under
     * @param sessionId id of the session to release
     */
    public void release(AckMode ackMode, String sessionId) {
        Session<MuleConsumer> releasing = this.getSession(ackMode, sessionId);
        releasing.close();
        this.removeSession(ackMode, sessionId);
    }

    /**
     * Drops the session registered under the key built from {@code ackMode} and {@code sessionId}.
     *
     * @param ackMode   ack mode part of the key
     * @param sessionId session id part of the key
     */
    private void removeSession(AckMode ackMode, String sessionId) {
        String key = String.format(SESSION_KEY_TEMPLATE, ackMode, sessionId);
        this.openSessions.remove(key);
    }

    /**
     * Registers {@code session} under the key built from {@code ackMode} and {@code sessionId}.
     *
     * @param ackMode   ack mode part of the key
     * @param sessionId session id part of the key
     * @param session   session to register
     */
    private void addSession(AckMode ackMode, String sessionId, Session<MuleConsumer> session) {
        String key = String.format(SESSION_KEY_TEMPLATE, ackMode, sessionId);
        this.openSessions.put(key, session);
    }

    /**
     * Resets the buffer of the consumer held by the registered session {@code sessionId}.
     *
     * @param ackMode   ack mode the session was registered under
     * @param sessionId id of the session whose consumer buffer is reset
     */
    public void refreshBuffer(AckMode ackMode, String sessionId) {
        this.getSession(ackMode, sessionId).apply(consumer -> {
            consumer.resetBuffer();
            return null;
        });
    }

    /**
     * Looks up the session registered for {@code sessionId} under {@code ackMode}.
     *
     * <p>The lookup uses the exact registration key. The previous suffix scan
     * ({@code endsWith(sessionId)}) could pick an unrelated session whose id merely ended with
     * the supplied text.
     *
     * @param ackMode   ack mode the session is expected to be registered under
     * @param sessionId id of the session
     * @return the registered session
     * @throws SessionNotFoundException when {@code sessionId} is blank or no session is registered for it
     * @throws InvalidAckModeException  when the session is registered under a different ack mode
     */
    private Session<MuleConsumer> getSession(AckMode ackMode, String sessionId) {
        if (StringUtils.isBlank((String)sessionId)) {
            throw new SessionNotFoundException(ackMode, sessionId);
        }
        Session<MuleConsumer> session = this.openSessions.get(String.format(SESSION_KEY_TEMPLATE, ackMode, sessionId));
        if (session != null) {
            return session;
        }
        // Not registered under the requested mode: report a mode mismatch when the same id is
        // registered under any other mode, otherwise the session does not exist.
        for (AckMode otherMode : AckMode.values()) {
            if (otherMode != ackMode && this.openSessions.containsKey(String.format(SESSION_KEY_TEMPLATE, otherMode, sessionId))) {
                throw new InvalidAckModeException(ackMode);
            }
        }
        throw new SessionNotFoundException(ackMode, sessionId);
    }

    /**
     * Checks a consumer out of the pool and wraps it in a session that checks it back in
     * through {@code consumerPool::checkIn}.
     *
     * @param operationTimeout timeout passed to the pool check-out
     * @return a new session around the checked-out consumer
     * @throws ConnectionException when the consumer pool is closed
     */
    private Session<MuleConsumer> createSession(Duration operationTimeout) throws ConnectionException {
        Session<MuleConsumer> created;
        try {
            created = new Session<MuleConsumer>(this.consumerPool.checkOut(operationTimeout), this.consumerPool::checkIn);
        }
        catch (ConsumerPoolClosedException poolClosed) {
            throw new ConnectionException((Throwable)poolClosed, (Object)this);
        }
        return created;
    }

    /**
     * Reports whether the consumer pool is still valid.
     *
     * @return a success result when the pool is valid, otherwise a failure result carrying a
     *         {@link ConnectionException} with the message "Invalid Connection"
     */
    public ConnectionValidationResult validateWithResult() {
        if (logger.isDebugEnabled()) {
            logger.debug("Validating connection {}!", (Object)this);
        }
        if (this.consumerPool.isValid()) {
            return ConnectionValidationResult.success();
        }
        ConnectionException invalid = new ConnectionException("Invalid Connection", null, null, (Object)this);
        return ConnectionValidationResult.failure((String)"Invalid Connection", (Exception)((Object)invalid));
    }
}

