/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connector.amazon.kinesis.internal.source.processor;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.mulesoft.connector.amazon.kinesis.api.RecordAttributes;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.DynamoDbThroughputExceededException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.IllegalStateException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.Consumer;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.ConsumerImpl;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisRecordProcessor
implements ShardRecordProcessor {
    public static final String CONTEXT_SHARD_ID_VARIABLE = "SHARD_ID";
    private static final Logger logger = LoggerFactory.getLogger(KinesisRecordProcessor.class);
    public static final String CONTEXT_SEQUENCE_NUMBER_VARIABLE = "SEQUENCE_NUMBER";
    public static final String CONTEXT_SUBSEQUENCE_NUMBER_VARIABLE = "SUB_SEQUENCE_NUMBER";
    private final SourceCallback<List<Result<InputStream, RecordAttributes>>, Void> sourceCallback;
    private final Consumer consumer;
    private volatile RecordProcessorCheckpointer checkpointer;
    private String shardId;

    public KinesisRecordProcessor(SourceCallback<List<Result<InputStream, RecordAttributes>>, Void> sourceCallback, Consumer consumer) {
        this.sourceCallback = sourceCallback;
        this.consumer = consumer;
    }

    public void initialize(InitializationInput initializationInput) {
        this.shardId = initializationInput.shardId();
        logger.debug("Initialized record processor for shard: {}", (Object)this.shardId);
        ((ConsumerImpl)this.consumer).subscribeRecordProcessor(this.shardId, this);
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        this.checkpointer = processRecordsInput.checkpointer();
        Result processedRecord = Result.builder().output(processRecordsInput.records().stream().map(rec -> Result.builder().output(rec.data() != null ? new ByteBufferBackedInputStream(rec.data()) : new ByteArrayInputStream(new byte[0])).attributes((Object)this.createResponseAttributes((KinesisClientRecord)rec)).build()).collect(Collectors.toList())).build();
        SourceCallbackContext callbackContext = this.sourceCallback.createContext();
        List sequenceNumbers = ((List)processedRecord.getOutput()).stream().map(rec -> rec.getAttributes().flatMap(attributes -> Optional.ofNullable(attributes.getSequenceNumber()))).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
        callbackContext.addVariable(CONTEXT_SHARD_ID_VARIABLE, (Object)this.shardId);
        callbackContext.addVariable(CONTEXT_SEQUENCE_NUMBER_VARIABLE, sequenceNumbers);
        this.sourceCallback.handle(processedRecord, callbackContext);
    }

    public void leaseLost(LeaseLostInput leaseLostInput) {
        this.checkpointer = null;
        logger.debug("Lost lease, terminating record processor for shard ID: '{}'.", (Object)this.shardId);
    }

    public void shardEnded(ShardEndedInput shardEndedInput) {
        logger.debug("Shard '{}' ended, checkpointing the position.", (Object)this.shardId);
        try {
            shardEndedInput.checkpointer().checkpoint();
        }
        catch (InvalidStateException | ShutdownException e) {
            logger.warn(String.format("Error occurred when trying to checkpoint the processor's position after its shard '%s' ended.", this.shardId), e);
        }
    }

    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        logger.debug("Scheduler for shard '{}' is shutting down.", (Object)this.shardId);
        this.consumer.unsubscribeRecordProcessor(this.shardId);
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        }
        catch (InvalidStateException | ShutdownException e) {
            logger.warn("Exception while checkpointing at requested shutdown. Giving up.", e);
        }
    }

    public void checkpoint() {
        if (this.checkpointer == null) {
            logger.warn("Unable to checkpoint the record processor related to shard '{}' because it's not actively processing records from that shard.", (Object)this.shardId);
            return;
        }
        this.checkpoint(null, null);
    }

    public void checkpoint(String sequenceNumber, Long subsequenceNumber) {
        try {
            if (this.checkpointer == null) {
                throw new IllegalStateException("Attempted to checkpoint the position on a record processor which has not consumed any records yet.");
            }
            if (subsequenceNumber != null) {
                this.checkpointer.checkpoint(sequenceNumber, subsequenceNumber.longValue());
            } else if (sequenceNumber != null) {
                this.checkpointer.checkpoint(sequenceNumber);
            } else {
                this.checkpointer.checkpoint();
            }
        }
        catch (ThrottlingException e) {
            throw new DynamoDbThroughputExceededException(String.format("DynamoDB write limit reached when checkpointing for shard '%s'. %s", this.shardId, e.getMessage()), e);
        }
        catch (InvalidStateException | ShutdownException e) {
            throw new IllegalStateException(String.format("Record processor consuming shard '%s' cannot be checkpointed currently. %s", this.shardId, e.getMessage()), e);
        }
        catch (IllegalArgumentException e) {
            if (subsequenceNumber == null) {
                throw new InvalidArgumentException(String.format("The sequence number '%s' is not a valid sequence number for a record in the shard '%s'.", sequenceNumber, this.shardId), e);
            }
            throw new InvalidArgumentException(String.format("Unable to checkpoint the reading position in shard '%s'. %s", this.shardId, e.getMessage()), e);
        }
    }

    private RecordAttributes createResponseAttributes(KinesisClientRecord rec) {
        return new RecordAttributes(rec.sequenceNumber(), rec.subSequenceNumber(), rec.approximateArrivalTimestamp().toString(), rec.partitionKey(), rec.encryptionType() != null ? rec.encryptionType().toString() : null, rec.aggregated(), this.shardId);
    }

    public boolean hasCheckpointer() {
        return this.checkpointer != null;
    }
}

