/*
 * Decompiled with CFR 0.152.
 */
package com.kdgregory.logging.aws.facade.v2;

import com.kdgregory.logging.aws.facade.KinesisFacade;
import com.kdgregory.logging.aws.facade.KinesisFacadeException;
import com.kdgregory.logging.aws.facade.v2.internal.ClientFactory;
import com.kdgregory.logging.aws.internal.AbstractWriterConfig;
import com.kdgregory.logging.aws.kinesis.KinesisConstants;
import com.kdgregory.logging.aws.kinesis.KinesisWriterConfig;
import com.kdgregory.logging.common.LogMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;

public class KinesisFacadeImpl
implements KinesisFacade {
    private static final Map<String, KinesisConstants.StreamStatus> STATUS_LOOKUP = new HashMap<String, KinesisConstants.StreamStatus>();
    private KinesisWriterConfig config;
    private KinesisClient client;

    public KinesisFacadeImpl(KinesisWriterConfig config) {
        this.config = config;
    }

    public KinesisConstants.StreamStatus retrieveStreamStatus() {
        try {
            DescribeStreamSummaryRequest request = (DescribeStreamSummaryRequest)DescribeStreamSummaryRequest.builder().streamName(this.config.getStreamName()).build();
            DescribeStreamSummaryResponse response = this.client().describeStreamSummary(request);
            return STATUS_LOOKUP.get(response.streamDescriptionSummary().streamStatusAsString());
        }
        catch (ResourceNotFoundException ex) {
            return KinesisConstants.StreamStatus.DOES_NOT_EXIST;
        }
        catch (LimitExceededException ex) {
            return null;
        }
        catch (Exception ex) {
            throw this.transformException("retrieveStreamStatus", ex);
        }
    }

    public void createStream() {
        try {
            CreateStreamRequest request = (CreateStreamRequest)CreateStreamRequest.builder().streamName(this.config.getStreamName()).shardCount(Integer.valueOf(this.config.getShardCount())).build();
            this.client().createStream(request);
        }
        catch (ResourceInUseException ex) {
            return;
        }
        catch (Exception ex) {
            throw this.transformException("createStream", ex);
        }
    }

    public void setRetentionPeriod() {
        if (this.config.getRetentionPeriod() == null) {
            return;
        }
        try {
            IncreaseStreamRetentionPeriodRequest request = (IncreaseStreamRetentionPeriodRequest)IncreaseStreamRetentionPeriodRequest.builder().streamName(this.config.getStreamName()).retentionPeriodHours(this.config.getRetentionPeriod()).build();
            this.client().increaseStreamRetentionPeriod(request);
        }
        catch (Exception ex) {
            throw this.transformException("setRetentionPeriod", ex);
        }
    }

    public List<LogMessage> putRecords(List<LogMessage> batch) {
        if (batch.isEmpty()) {
            return batch;
        }
        try {
            PutRecordsRequest request = this.createPutRecordsRequest(batch);
            PutRecordsResponse response = this.client().putRecords(request);
            return this.extractPutRecordsFailures(batch, response);
        }
        catch (Exception ex) {
            throw this.transformException("putRecords", ex);
        }
    }

    public void shutdown() {
        this.client().close();
    }

    protected KinesisClient client() {
        if (this.client == null) {
            this.client = new ClientFactory<KinesisClient>(KinesisClient.class, (AbstractWriterConfig<?>)this.config).create();
        }
        return this.client;
    }

    private KinesisFacadeException transformException(String functionName, Exception ex) {
        boolean isRetryable;
        KinesisFacadeException.ReasonCode reason;
        String message;
        if (ex instanceof ProvisionedThroughputExceededException) {
            message = "throttled";
            reason = KinesisFacadeException.ReasonCode.THROTTLING;
            isRetryable = true;
        } else if (ex instanceof LimitExceededException) {
            message = "limit exceeded";
            reason = KinesisFacadeException.ReasonCode.LIMIT_EXCEEDED;
            isRetryable = true;
        } else if (ex instanceof ResourceInUseException) {
            message = "stream not active";
            reason = KinesisFacadeException.ReasonCode.INVALID_STATE;
            isRetryable = true;
        } else {
            message = "unexpected exception: " + ex.getMessage();
            reason = KinesisFacadeException.ReasonCode.UNEXPECTED_EXCEPTION;
            isRetryable = false;
        }
        return new KinesisFacadeException(message, (Throwable)ex, reason, isRetryable, functionName, new Object[]{this.config.getStreamName()});
    }

    private PutRecordsRequest createPutRecordsRequest(List<LogMessage> batch) {
        ArrayList<PutRecordsRequestEntry> requestRecords = new ArrayList<PutRecordsRequestEntry>(batch.size());
        for (LogMessage message : batch) {
            PutRecordsRequestEntry entry = (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().partitionKey(this.config.getPartitionKeyHelper().getValue()).data(SdkBytes.fromByteArray((byte[])message.getBytes())).build();
            requestRecords.add(entry);
        }
        return (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.config.getStreamName()).records(requestRecords).build();
    }

    private List<LogMessage> extractPutRecordsFailures(List<LogMessage> batch, PutRecordsResponse response) {
        ArrayList<LogMessage> result = new ArrayList<LogMessage>(batch.size());
        if (response.failedRecordCount() == null || response.failedRecordCount() == 0) {
            return result;
        }
        Iterator<LogMessage> lmItx = batch.iterator();
        Iterator rspItx = response.records().iterator();
        while (lmItx.hasNext() && rspItx.hasNext()) {
            LogMessage logMessage = lmItx.next();
            PutRecordsResultEntry entry = (PutRecordsResultEntry)rspItx.next();
            if (entry.errorCode() == null || entry.errorCode().isEmpty()) continue;
            result.add(logMessage);
        }
        return result;
    }

    static {
        STATUS_LOOKUP.put("ACTIVE", KinesisConstants.StreamStatus.ACTIVE);
        STATUS_LOOKUP.put("CREATING", KinesisConstants.StreamStatus.CREATING);
        STATUS_LOOKUP.put("DELETING", KinesisConstants.StreamStatus.DELETING);
        STATUS_LOOKUP.put("UPDATING", KinesisConstants.StreamStatus.UPDATING);
    }
}

