/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connector.amazon.kinesis.internal.source;

import com.mulesoft.connector.amazon.kinesis.api.RecordAttributes;
import com.mulesoft.connector.amazon.kinesis.api.StreamInitialPosition;
import com.mulesoft.connector.amazon.kinesis.internal.connection.KinesisConnection;
import com.mulesoft.connector.amazon.kinesis.internal.connection.provider.parameter.CommonListenerParameters;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.IllegalStateException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.Consumer;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.ConsumerManager;
import java.io.InputStream;
import java.util.List;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="listener")
@DisplayName(value="Listener")
@ClusterSupport(value=SourceClusterSupport.DEFAULT_ALL_NODES)
@BackPressure(defaultMode=BackPressureMode.WAIT, supportedModes={BackPressureMode.WAIT, BackPressureMode.DROP})
public class KinesisSource
extends Source<List<Result<InputStream, RecordAttributes>>, Void> {
    private static final Logger logger = LoggerFactory.getLogger(KinesisSource.class);
    @ParameterGroup(name="Listener Parameters")
    private CommonListenerParameters commonParams;
    @ParameterGroup(name="Stream Initial Position")
    private StreamInitialPosition initialPosition;
    @Connection
    private ConnectionProvider<KinesisConnection> connectionProvider;
    @Inject
    private ConsumerManager consumerManager;
    private Consumer consumer;

    public void onStart(SourceCallback<List<Result<InputStream, RecordAttributes>>, Void> sourceCallback) throws MuleException {
        if (this.commonParams.getMaxBatchSize() < 1 || this.commonParams.getMaxBatchSize() > 10000) {
            throw new InvalidArgumentException("Maximum batch size must be between 1 and 10000 (inclusive).");
        }
        KinesisConnection connection = (KinesisConnection)this.connectionProvider.connect();
        this.consumer = this.consumerManager.addConsumer(connection.createConsumer(this.consumerManager.getConsumerId(this.commonParams.getApplicationName(), this.commonParams.getStreamName()), this.commonParams, this.initialPosition), sourceCallback);
    }

    public void onStop() {
        this.consumerManager.removeConsumer(this.consumer);
    }

    @OnError
    public void onError() {
        logger.debug("Kinesis records processed with errors.");
    }

    @OnSuccess
    public void onSuccess() {
        logger.debug("Kinesis records processed successfully.");
    }

    @OnTerminate
    public void onTerminate(SourceResult sourceResult) {
        if (this.commonParams.isCheckpointOnComplete()) {
            logger.debug("Checkpointing the record processor position.");
            String shardId = (String)sourceResult.getSourceCallbackContext().getVariable("SHARD_ID").orElseThrow(() -> new IllegalStateException("Shard identifier not found in the consumer mule message."));
            List sequenceNumber = (List)sourceResult.getSourceCallbackContext().getVariable("SEQUENCE_NUMBER").orElseThrow(() -> new IllegalStateException("Sequence number not found in the consumer mule message."));
            try {
                for (String seqNumber : sequenceNumber) {
                    logger.debug("Checkpointing the record processor position. '{}' : '{}'", (Object)shardId, (Object)sequenceNumber);
                    this.consumer.checkpoint(shardId, seqNumber);
                }
            }
            catch (Exception e) {
                logger.warn("An error occurred during the automatic checkpointing of the current record processor position: {}\n Continuing...", (Object)e.getMessage());
            }
        }
    }
}

