/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.source;

import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.WakeupException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollingTask<P, A, I>
implements Runnable,
Closeable {
    private static final Logger logger = LoggerFactory.getLogger(PollingTask.class);
    public static final String SESSION_KEY = "sessionKey";
    public static final String UNRECOVERABLE_EXCEPTION_MESSAGE = "Got an unrecoverable exception while running the polling task";
    public static final String RECOVERABLE_EXCEPTION_MESSAGE = "Got a recoverable exception while running the polling task ({}})";
    private final SourceCallback<P, A> sourceCallback;
    private final BiFunction<String, I, Result<P, A>> parser;
    private final BiFunction<MuleConsumer, Duration, I> pollOperation;
    private final ConsumerConnection consumerConnection;
    private final AckMode ackMode;
    private final Duration pollTimeout;
    private boolean running = true;
    private CountDownLatch closeCountDown = new CountDownLatch(1);

    public PollingTask(ConsumerConnection consumerConnection, AckMode ackMode, Duration pollTimeout, BiFunction<MuleConsumer, Duration, I> pollOperation, BiFunction<String, I, Result<P, A>> parser, SourceCallback<P, A> sourceCallback) {
        this.consumerConnection = consumerConnection;
        this.ackMode = ackMode;
        this.pollTimeout = pollTimeout;
        this.parser = parser;
        this.pollOperation = pollOperation;
        this.sourceCallback = sourceCallback;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        logger.info("Starting the PollingTask with ackMode={}, pollTimeout={}", (Object)this.ackMode, (Object)this.pollTimeout);
        try {
            while (this.running) {
                try {
                    Map.Entry<String, I> entry;
                    logger.trace("Listening for messages.");
                    if (logger.isDebugEnabled()) {
                        logger.debug("Using source connection {}.", (Object)this.consumerConnection);
                    }
                    if ((entry = this.consumerConnection.poll(this.ackMode, this.pollTimeout, this.pollOperation)) == null) continue;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Polled returned a value {}", entry);
                    }
                    String key = entry.getKey();
                    SourceCallbackContext callbackContext = this.sourceCallback.createContext();
                    if (key != null) {
                        callbackContext.addVariable(SESSION_KEY, (Object)key);
                    }
                    this.sourceCallback.handle(this.parser.apply(key, entry.getValue()), callbackContext);
                    if (key == null) {
                        logger.trace("Message(s) sent to flow.");
                        continue;
                    }
                    logger.trace("Message(s) with key '{}' sent to flow.", (Object)key);
                }
                catch (WakeupException e) {
                    if (!logger.isDebugEnabled()) continue;
                    logger.debug(RECOVERABLE_EXCEPTION_MESSAGE, (Object)e, (Object)"Wakeup");
                }
                catch (OperationTimeoutException e) {
                    if (!logger.isDebugEnabled()) continue;
                    logger.debug(RECOVERABLE_EXCEPTION_MESSAGE, (Object)e, (Object)"Operation Timed out");
                }
                catch (NotFoundException e) {
                    if (!logger.isTraceEnabled()) continue;
                    logger.trace("Did not get any results from Kafka for the last poll invocation");
                }
                catch (ConnectionException e) {
                    logger.info(UNRECOVERABLE_EXCEPTION_MESSAGE, (Throwable)e);
                    this.running = false;
                    this.sourceCallback.onConnectionException(e);
                }
                catch (RuntimeException e) {
                    logger.info(UNRECOVERABLE_EXCEPTION_MESSAGE, (Throwable)e);
                    this.running = false;
                    this.sourceCallback.onConnectionException(new ConnectionException(UNRECOVERABLE_EXCEPTION_MESSAGE, (Throwable)e, null, (Object)this.consumerConnection));
                }
            }
            logger.info("Finished the PollingTask normally with ackMode={}, pollTimeout={}", (Object)this.ackMode, (Object)this.pollTimeout);
            return;
        }
        finally {
            this.closeCountDown.countDown();
        }
    }

    @Override
    public void close() {
        logger.debug("Stopping running state {}", (Object)this.running);
        this.running = false;
        try {
            if (!this.closeCountDown.await(30L, TimeUnit.SECONDS)) {
                logger.warn("Polling task timeout while waiting to be closed");
            }
        }
        catch (InterruptedException e) {
            logger.warn("PollingTask was interrupted while closing");
        }
        logger.debug("Stopped");
    }
}

