/*
 * Decompiled with CFR 0.152.
 */
package com.kdgregory.log4j.aws.internal.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.kdgregory.log4j.aws.internal.kinesis.KinesisAppenderStatistics;
import com.kdgregory.log4j.aws.internal.kinesis.KinesisWriterConfig;
import com.kdgregory.log4j.aws.internal.shared.AbstractLogWriter;
import com.kdgregory.log4j.aws.internal.shared.LogMessage;
import com.kdgregory.log4j.aws.internal.shared.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;
import org.apache.log4j.helpers.LogLog;

/**
 * Log writer that sends batches of {@link LogMessage}s to a Kinesis stream
 * using {@code PutRecords}.
 * <p>
 * During initialization the writer validates the configured stream name and
 * partition key, optionally creates the stream (when auto-create is enabled),
 * and waits for the stream to become active. Batches are then converted into
 * a single {@code PutRecordsRequest}; records that the service rejects (or all
 * records, if the request itself repeatedly fails) are returned to the caller
 * of {@link #processBatch}.
 */
public class KinesisLogWriter
extends AbstractLogWriter {
    /** Maximum number of describe-stream attempts when the service reports throttling. */
    private static final int DESCRIBE_TRIES = 300;
    /** Milliseconds to sleep between throttled describe-stream attempts. */
    private static final int DESCRIBE_SLEEP = 100;
    /** Maximum number of status polls while waiting for the stream to become active. */
    private static final int STREAM_ACTIVE_TRIES = 240;
    /** Milliseconds to sleep between stream-status polls. */
    private static final long STREAM_ACTIVE_SLEEP = 250L;
    /** Number of times a PutRecords call is attempted before the whole batch is failed. */
    private static final int SEND_RETRY_LIMIT = 3;
    /** Base sleep (milliseconds) after a failed send; multiplied by the attempt number. */
    private static final int SEND_RETRY_BASE_SLEEP = 250;
    /** Maximum number of create-stream attempts when the service reports a limit violation. */
    private static final int CREATE_RETRY_LIMIT = 12;
    /** Milliseconds to sleep between throttled create-stream attempts. */
    private static final int CREATE_RETRY_SLEEP = 5000;
    /** Exclusive upper bound on the bytes in a batch (5 MiB). */
    private static final int MAX_BATCH_BYTES = 5 * 1024 * 1024;
    /** Exclusive upper bound on the number of messages in a batch. */
    private static final int MAX_BATCH_COUNT = 500;
    /** Accepted stream names: 1-128 characters from letters, digits, underscore, period, hyphen. */
    private static final Pattern STREAM_NAME_PATTERN = Pattern.compile("[a-zA-Z0-9_.-]{1,128}");
    /** Maximum accepted length of a configured partition key. */
    private static final int MAX_PARTITION_KEY_LENGTH = 256;

    private final KinesisWriterConfig config;
    private final KinesisAppenderStatistics stats;
    /** Service client; assigned by {@link #createAWSClient()}. */
    protected AmazonKinesis client;
    /** Source of digits for randomly generated partition keys. */
    private final Random rnd = new Random();

    /**
     * Creates the writer and records the configured stream name in the statistics object.
     *
     * @param config writer configuration (stream name, partition key, batching settings)
     * @param stats  statistics object updated as messages are sent or errors occur
     */
    public KinesisLogWriter(KinesisWriterConfig config, KinesisAppenderStatistics stats) {
        super(stats, config.batchDelay, config.discardThreshold, config.discardAction);
        this.config = config;
        this.stats = stats;
        stats.setActualStreamName(config.streamName);
    }

    /**
     * Creates the Kinesis client, trying in order: the configured factory
     * method, the SDK's default client builder (only when no explicit endpoint
     * is configured), and finally the client constructor.
     */
    @Override
    protected void createAWSClient() {
        this.client = this.tryClientFactory(this.config.clientFactoryMethod, AmazonKinesis.class, true);
        if (this.client == null && this.config.clientEndpoint == null) {
            this.client = this.tryClientFactory("com.amazonaws.services.kinesis.AmazonKinesisClientBuilder.defaultClient", AmazonKinesis.class, false);
        }
        if (this.client == null) {
            LogLog.debug(this.getClass().getSimpleName() + ": creating service client via constructor");
            this.client = (AmazonKinesis)this.tryConfigureEndpointOrRegion(new AmazonKinesisClient(), this.config.clientEndpoint);
        }
    }

    /**
     * Validates the configuration and makes sure the destination stream exists
     * and is active, creating it if auto-create is enabled.
     *
     * @return {@code true} if the stream is ready; otherwise the result of
     *         reporting an initialization failure
     */
    @Override
    protected boolean ensureDestinationAvailable() {
        // a null name is reported as a failure rather than causing an NPE in the matcher
        if (this.config.streamName == null || !STREAM_NAME_PATTERN.matcher(this.config.streamName).matches()) {
            return this.initializationFailure("invalid stream name: " + this.config.streamName, null);
        }
        // note: an empty key passes this check; partitionKey() treats it as "generate a random key"
        if (this.config.partitionKey == null || this.config.partitionKey.length() > MAX_PARTITION_KEY_LENGTH) {
            return this.initializationFailure("invalid partition key: length must be 1-256", null);
        }
        try {
            String streamStatus = this.getStreamStatus();
            if (streamStatus == null) {
                // stream does not exist
                if (!this.config.autoCreate) {
                    return this.initializationFailure("stream \"" + this.config.streamName + "\" does not exist and auto-create not enabled", null);
                }
                this.createStream();
                this.waitForStreamToBeActive();
                this.setRetentionPeriodIfNeeded();
            } else if (!StreamStatus.ACTIVE.toString().equals(streamStatus)) {
                // stream exists but is in some other state; wait for it
                this.waitForStreamToBeActive();
            }
            return true;
        }
        catch (Exception ex) {
            return this.initializationFailure("unable to configure stream: " + this.config.streamName, ex);
        }
    }

    /**
     * Sends a batch of messages to the stream.
     *
     * @param currentBatch messages to send
     * @return the messages that were not accepted; empty if the batch was
     *         empty or every record was accepted
     */
    @Override
    protected List<LogMessage> processBatch(List<LogMessage> currentBatch) {
        PutRecordsRequest request = this.convertBatchToRequest(currentBatch);
        if (request == null) {
            return Collections.emptyList();
        }
        List<Integer> failures = this.attemptToSend(request);
        return this.extractFailures(currentBatch, failures);
    }

    /**
     * Returns the size of the message plus the configured partition key length.
     */
    @Override
    protected int effectiveSize(LogMessage message) {
        return message.size() + this.config.partitionKeyLength;
    }

    /**
     * Reports whether a batch of the given size is strictly below both the
     * byte limit and the message-count limit.
     */
    @Override
    protected boolean withinServiceLimits(int batchBytes, int numMessages) {
        return batchBytes < MAX_BATCH_BYTES && numMessages < MAX_BATCH_COUNT;
    }

    /**
     * Creates the configured stream, retrying when the service reports that a
     * limit was exceeded. A stream that is already in use is treated as success.
     *
     * @throws IllegalStateException if every attempt hit a service limit
     */
    private void createStream() {
        for (int retry = 0; retry < CREATE_RETRY_LIMIT; ++retry) {
            try {
                LogLog.debug("creating Kinesis stream: " + this.config.streamName + " with " + this.config.shardCount + " shards");
                CreateStreamRequest request = new CreateStreamRequest().withStreamName(this.config.streamName).withShardCount(Integer.valueOf(this.config.shardCount));
                this.client.createStream(request);
                return;
            }
            catch (ResourceInUseException ignored) {
                // the stream already exists (or is being created); nothing more to do here
                return;
            }
            catch (LimitExceededException ex) {
                Utils.sleepQuietly(CREATE_RETRY_SLEEP);
            }
        }
        throw new IllegalStateException("unable to create stream after " + CREATE_RETRY_LIMIT + " tries");
    }

    /**
     * Polls the stream status until it is active.
     *
     * @throws IllegalStateException if the stream is still not active after
     *         all polls have been made
     */
    private void waitForStreamToBeActive() {
        for (int ii = 0; ii < STREAM_ACTIVE_TRIES; ++ii) {
            if (StreamStatus.ACTIVE.toString().equals(this.getStreamStatus())) {
                return;
            }
            Utils.sleepQuietly(STREAM_ACTIVE_SLEEP);
        }
        // report the time actually spent sleeping (tries * sleep), not the number of tries
        long waitedSeconds = STREAM_ACTIVE_TRIES * STREAM_ACTIVE_SLEEP / 1000L;
        throw new IllegalStateException("stream did not become active within " + waitedSeconds + " seconds");
    }

    /**
     * Retrieves the current status of the configured stream, retrying when the
     * service reports that a limit was exceeded.
     *
     * @return the stream status string, or {@code null} if the stream does not exist
     * @throws IllegalStateException if every attempt hit a service limit
     */
    private String getStreamStatus() {
        for (int ii = 0; ii < DESCRIBE_TRIES; ++ii) {
            try {
                DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(this.config.streamName);
                DescribeStreamResult response = this.client.describeStream(request);
                return response.getStreamDescription().getStreamStatus();
            }
            catch (ResourceNotFoundException ex) {
                return null;
            }
            catch (LimitExceededException ex) {
                Utils.sleepQuietly(DESCRIBE_SLEEP);
            }
        }
        long waitedSeconds = (long)DESCRIBE_TRIES * DESCRIBE_SLEEP / 1000L;
        throw new IllegalStateException("unable to describe stream after " + waitedSeconds + " seconds");
    }

    /**
     * If a retention period is configured, asks the service to increase the
     * stream's retention period and then waits for the stream to be active again.
     */
    private void setRetentionPeriodIfNeeded() {
        if (this.config.retentionPeriod == null) {
            return;
        }
        try {
            this.client.increaseStreamRetentionPeriod(new IncreaseStreamRetentionPeriodRequest().withStreamName(this.config.streamName).withRetentionPeriodHours(this.config.retentionPeriod));
            this.waitForStreamToBeActive();
        }
        catch (InvalidArgumentException ignored) {
            // Deliberately best-effort: a rejected retention period does not fail initialization.
            // NOTE(review): presumably this covers the case where the stream already has an
            // equal or longer retention period -- confirm against the service documentation.
        }
    }

    /**
     * Builds a PutRecords request containing one entry per message.
     *
     * @param batch messages to send
     * @return the request, or {@code null} if the batch is empty
     */
    private PutRecordsRequest convertBatchToRequest(List<LogMessage> batch) {
        if (batch.isEmpty()) {
            return null;
        }
        List<PutRecordsRequestEntry> requestRecords = new ArrayList<PutRecordsRequestEntry>(batch.size());
        for (LogMessage message : batch) {
            requestRecords.add(new PutRecordsRequestEntry().withPartitionKey(this.partitionKey()).withData(ByteBuffer.wrap(message.getBytes())));
        }
        return new PutRecordsRequest().withStreamName(this.config.streamName).withRecords(requestRecords);
    }

    /**
     * Sends the request, retrying with an increasing sleep when the call throws.
     *
     * @param request the request to send
     * @return indexes (into the request's record list) of records that were
     *         not accepted; every index if all attempts threw
     */
    private List<Integer> attemptToSend(PutRecordsRequest request) {
        List<Integer> failures = new ArrayList<Integer>(request.getRecords().size());
        for (int attempt = 0; attempt < SEND_RETRY_LIMIT; ++attempt) {
            try {
                PutRecordsResult response = this.client.putRecords(request);
                // result entries are walked in order; an entry with an error code marks that position as failed
                int ii = 0;
                for (PutRecordsResultEntry entry : response.getRecords()) {
                    if (entry.getErrorCode() != null) {
                        failures.add(ii);
                    }
                    ++ii;
                }
                this.stats.updateMessagesSent(request.getRecords().size() - failures.size());
                return failures;
            }
            catch (Exception ex) {
                this.stats.setLastError(null, ex);
                Utils.sleepQuietly(SEND_RETRY_BASE_SLEEP * (attempt + 1));
            }
        }
        LogLog.error("failed to send batch after " + SEND_RETRY_LIMIT + " retries", (Throwable)this.stats.getLastError());
        // the request never succeeded, so every record is reported as failed
        for (int ii = 0; ii < request.getRecords().size(); ++ii) {
            failures.add(ii);
        }
        return failures;
    }

    /**
     * Selects the messages at the given indexes from the batch.
     *
     * @param currentBatch   the batch that was sent
     * @param failureIndexes indexes of the messages that failed
     * @return the failed messages, in index-list order; empty if there were none
     */
    private List<LogMessage> extractFailures(List<LogMessage> currentBatch, List<Integer> failureIndexes) {
        if (failureIndexes.isEmpty()) {
            return Collections.emptyList();
        }
        List<LogMessage> messages = new ArrayList<LogMessage>(failureIndexes.size());
        for (Integer idx : failureIndexes) {
            messages.add(currentBatch.get(idx));
        }
        return messages;
    }

    /**
     * Returns the partition key for a record: the configured key, or, when the
     * configured key is the empty string, a freshly generated string of random
     * decimal digits whose length is the configured partition key length.
     */
    private String partitionKey() {
        if (!"".equals(this.config.partitionKey)) {
            return this.config.partitionKey;
        }
        StringBuilder sb = new StringBuilder(16);
        for (int ii = 0; ii < this.config.partitionKeyLength; ++ii) {
            sb.append((char)('0' + this.rnd.nextInt(10)));
        }
        return sb.toString();
    }
}

