/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.source;

import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.internal.config.ConsumerConfiguration;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import com.mulesoft.connectors.kafka.internal.source.PollingTask;
import com.mulesoft.connectors.kafka.internal.source.SourceCallbackWrapper;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Kafka message listener sources.
 *
 * <p>On start, the source obtains a {@code ConsumerConnection} from the injected connection
 * provider, asks the concrete subclass to build {@code parallelConsumersAmount} polling tasks
 * and hands each task to the connection through {@code startPolling}. The flow-completion
 * callbacks ({@code onSuccess}, {@code onFailure}, {@code onTerminate}) then commit, refresh or
 * release on that same connection, keyed by the {@code "sessionKey"} callback-context variable
 * and gated on the configured acknowledgement mode.
 *
 * @param <P> payload type of the messages pushed to the flow
 * @param <A> attributes type of the messages pushed to the flow
 */
public abstract class MessageListenerSource<P, A>
extends Source<P, A> {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerSource.class);
    @Config
    protected ConsumerConfiguration config;
    @Connection
    private ConnectionProvider<ConsumerConnection> connectionProvider;
    @Summary(value="The timeout for the poll")
    @ConfigOverride
    @Optional
    @Parameter
    @DisplayName(value="Poll timeout")
    private int pollTimeout;
    @Summary(value="Time unit for poll timeout.")
    @ConfigOverride
    @Optional
    @Parameter
    @DisplayName(value="Poll timeout time unit")
    private TimeUnit pollTimeoutTimeUnit;
    @Summary(value="Declares the kind of Acknowledgement mode supported.")
    @ConfigOverride
    @Optional
    @Parameter
    @DisplayName(value="Acknowledgement mode")
    private AckMode ackMode;
    @Parameter
    @DisplayName(value="Amount of parallel consumers")
    @Optional(defaultValue="1")
    private int parallelConsumersAmount;
    /** Tasks created in {@link #onStart}; stays {@code null} until the source has been started. */
    private List<PollingTask<P, A, ?>> pollingTasks;
    /** Connection obtained from the provider in {@link #onStart} and used by every callback. */
    private ConsumerConnection connection;

    /**
     * Connects through the injected provider, then creates and starts one polling task per
     * configured parallel consumer.
     *
     * @param sourceCallback callback supplied by the runtime; it is wrapped in a
     *        {@code SourceCallbackWrapper} before being handed to the polling tasks
     * @throws MuleException propagated from the connection provider or the connection
     */
    @Override
    public void onStart(SourceCallback<P, A> sourceCallback) throws MuleException {
        SourceCallbackWrapper<P, A> wrappedCallback = new SourceCallbackWrapper<>(sourceCallback);
        String pluralSuffix = this.parallelConsumersAmount == 1 ? "" : "s";
        logger.info("Starting {} polling task{} in {} mode. Polling timeout is {} ms.",
                this.parallelConsumersAmount, pluralSuffix, this.ackMode,
                this.pollTimeoutTimeUnit.toMillis(this.pollTimeout));

        this.connection = this.connectionProvider.connect();
        logger.debug("Message listener is using connection provider {}.", this.connectionProvider);
        logger.debug("Message listener is using connection {}.", this.connection);

        this.pollingTasks = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < this.parallelConsumersAmount; ++taskIndex) {
            logger.debug("Starting polling task {}.", taskIndex + 1);
            this.createPollingTask(this.pollingTasks::add, this.config, this.connection, wrappedCallback,
                    this.ackMode, this.config.asDuration(this.pollTimeout, this.pollTimeoutTimeUnit));
            // The task is read back by position, so createPollingTask must have registered
            // exactly one task through the consumer before returning.
            this.connection.startPolling(this.pollingTasks.get(taskIndex));
        }
    }

    /**
     * Builds one polling task and registers it through {@code taskRegistrar}.
     *
     * <p>{@link #onStart} calls this once per parallel consumer and immediately afterwards
     * fetches the task from its list by index, so an implementation has to invoke
     * {@code taskRegistrar} exactly once, synchronously, before returning.
     *
     * @param taskRegistrar receives the newly created task
     * @param config the source's consumer configuration
     * @param connection the connection obtained in {@code onStart}
     * @param sourceCallback the wrapped source callback
     * @param ackMode the configured acknowledgement mode
     * @param pollTimeout the poll timeout, as converted by {@code config.asDuration}
     */
    public abstract void createPollingTask(Consumer<PollingTask<P, A, ?>> taskRegistrar, ConsumerConfiguration config, ConsumerConnection connection, SourceCallback<P, A> sourceCallback, AckMode ackMode, Duration pollTimeout);

    /**
     * Success callback: in {@code AUTO} mode commits on the connection for the event's session
     * key. A failed commit is logged and reported to the source callback as a
     * {@code ConnectionException} instead of being rethrown.
     *
     * @param context callback context of the event that finished
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext context) {
        if (this.ackMode.equals(AckMode.AUTO)) {
            try {
                this.connection.commit(this.ackMode, context.getVariable("sessionKey").orElse(""));
            }
            catch (Exception e) {
                logger.error("Failed to commit offsets in source", e);
                context.getSourceCallback().onConnectionException(new ConnectionException(e, context.getConnection()));
            }
        }
        logger.debug("Finished the flow successfully for an event.");
    }

    /**
     * Error callback: in {@code MANUAL} and {@code AUTO} modes logs a warning and calls
     * {@code refreshBuffer} on the connection for the event's session key. Other modes only
     * produce the debug trace.
     *
     * @param context callback context of the event that failed
     */
    @OnError
    public void onFailure(SourceCallbackContext context) {
        boolean manual = this.ackMode.equals(AckMode.MANUAL);
        if (manual || this.ackMode.equals(AckMode.AUTO)) {
            logger.warn(manual
                    ? "The flow failed, the listener will consume the same messages if the commit was not invoked"
                    : "The flow failed, the listener will consume the same messages");
            this.connection.refreshBuffer(manual ? AckMode.MANUAL : AckMode.AUTO,
                    context.getVariable("sessionKey").orElse(""));
        }
        logger.debug("Finished the flow with an error for an event.");
    }

    /**
     * Terminate callback: in {@code MANUAL} and {@code AUTO} modes calls {@code release} on the
     * connection for the event's session key. Other modes only produce the debug trace.
     *
     * @param context callback context of the event that terminated
     */
    @OnTerminate
    public void onTerminate(SourceCallbackContext context) {
        boolean manual = this.ackMode.equals(AckMode.MANUAL);
        if (manual || this.ackMode.equals(AckMode.AUTO)) {
            logger.debug(manual
                    ? "The flow terminated, releasing consumer for MANUAL ack mode."
                    : "The flow terminated, releasing consumer for AUTO ack mode.");
            this.connection.release(this.ackMode, context.getVariable("sessionKey").orElse(""));
        }
        logger.debug("Terminated flow with context {}", context);
    }

    /**
     * Quietly closes every polling task created by {@link #onStart}; does nothing beyond
     * logging when the source was never started.
     */
    @Override
    public void onStop() {
        String listenerName = this.getClass().getSimpleName();
        logger.debug("Stopping MessageListener {}", listenerName);
        if (this.pollingTasks != null) {
            logger.debug("Stopping all polling tasks");
            this.pollingTasks.forEach(IOUtils::closeQuietly);
        }
        // NOTE(review): the connection obtained in onStart is not handed back to the
        // connection provider here - confirm that closing the polling tasks is sufficient.
        logger.debug("Stopped MessageListener {}", listenerName);
    }
}

