/*
 * Decompiled with CFR 0.152.
 */
package com.kdgregory.logging.aws.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.kdgregory.logging.aws.internal.AbstractLogWriter;
import com.kdgregory.logging.aws.internal.Utils;
import com.kdgregory.logging.aws.kinesis.KinesisWriterConfig;
import com.kdgregory.logging.aws.kinesis.KinesisWriterStatistics;
import com.kdgregory.logging.common.LogMessage;
import com.kdgregory.logging.common.factories.ClientFactory;
import com.kdgregory.logging.common.util.InternalLogger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;

public class KinesisLogWriter
extends AbstractLogWriter<KinesisWriterConfig, KinesisWriterStatistics, AmazonKinesis> {
    public static final String RANDOM_PARTITION_KEY_CONFIG = "{random}";
    private static final int DESCRIBE_TRIES = 300;
    private static final int DESCRIBE_SLEEP = 100;
    private static final int STREAM_ACTIVE_TRIES = 240;
    private static final long STREAM_ACTIVE_SLEEP = 250L;
    private static final int CREATE_RETRY_LIMIT = 12;
    private static final int CREATE_RETRY_SLEEP = 5000;
    private int partitionKeyLength;
    private boolean randomPartitionKeys;
    private Random rnd = new Random();

    public KinesisLogWriter(KinesisWriterConfig config, KinesisWriterStatistics stats, InternalLogger logger, ClientFactory<AmazonKinesis> clientFactory) {
        super(config, stats, logger, clientFactory);
        this.randomPartitionKeys = RANDOM_PARTITION_KEY_CONFIG.equals(config.partitionKey) || "".equals(config.partitionKey) || null == config.partitionKey;
        this.partitionKeyLength = this.randomPartitionKeys ? 8 : config.partitionKey.getBytes(StandardCharsets.UTF_8).length;
        stats.setActualStreamName(config.streamName);
    }

    @Override
    public boolean isMessageTooLarge(LogMessage message) {
        return this.effectiveSize(message) > 0x100000;
    }

    @Override
    public int maxMessageSize() {
        return 0x100000 - this.partitionKeyLength;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    protected boolean ensureDestinationAvailable() {
        if (!Pattern.matches("[a-zA-Z0-9_.-]{1,128}", ((KinesisWriterConfig)this.config).streamName)) {
            this.reportError("invalid stream name: " + ((KinesisWriterConfig)this.config).streamName, null);
            return false;
        }
        if (((KinesisWriterConfig)this.config).partitionKey == null || ((KinesisWriterConfig)this.config).partitionKey.length() > 256) {
            this.reportError("invalid partition key: length must be 1-256", null);
            return false;
        }
        try {
            String streamStatus = this.getStreamStatus();
            if (streamStatus != null) {
                if (StreamStatus.ACTIVE.toString().equals(streamStatus)) return true;
                this.waitForStreamToBeActive();
                return true;
            }
            if (((KinesisWriterConfig)this.config).autoCreate) {
                this.createStream();
                this.waitForStreamToBeActive();
                this.setRetentionPeriodIfNeeded();
                return true;
            }
            this.reportError("stream \"" + ((KinesisWriterConfig)this.config).streamName + "\" does not exist and auto-create not enabled", null);
            return false;
        }
        catch (Exception ex) {
            this.reportError("unable to configure stream: " + ((KinesisWriterConfig)this.config).streamName, ex);
            return false;
        }
    }

    @Override
    protected List<LogMessage> sendBatch(List<LogMessage> currentBatch) {
        PutRecordsRequest request = this.convertBatchToRequest(currentBatch);
        if (request != null) {
            List<Integer> failures = this.attemptToSend(request);
            return this.extractFailures(currentBatch, failures);
        }
        return Collections.emptyList();
    }

    @Override
    protected int effectiveSize(LogMessage message) {
        return message.size() + this.partitionKeyLength;
    }

    @Override
    protected boolean withinServiceLimits(int batchBytes, int numMessages) {
        return batchBytes < 0x500000 && numMessages <= 500;
    }

    @Override
    protected void stopAWSClient() {
        ((AmazonKinesis)this.client).shutdown();
    }

    private void createStream() {
        for (int retry = 0; retry < 12; ++retry) {
            try {
                this.logger.debug("creating Kinesis stream: " + ((KinesisWriterConfig)this.config).streamName + " with " + ((KinesisWriterConfig)this.config).shardCount + " shards");
                CreateStreamRequest request = new CreateStreamRequest().withStreamName(((KinesisWriterConfig)this.config).streamName).withShardCount(Integer.valueOf(((KinesisWriterConfig)this.config).shardCount));
                ((AmazonKinesis)this.client).createStream(request);
                return;
            }
            catch (ResourceInUseException ignored) {
                return;
            }
            catch (LimitExceededException ex) {
                Utils.sleepQuietly(5000L);
                continue;
            }
        }
        throw new IllegalStateException("unable to create stream after 12 tries");
    }

    private void waitForStreamToBeActive() {
        for (int ii = 0; ii < 240; ++ii) {
            if (StreamStatus.ACTIVE.toString().equals(this.getStreamStatus())) {
                return;
            }
            Utils.sleepQuietly(250L);
        }
        throw new IllegalStateException("stream did not become active within 240 seconds");
    }

    private String getStreamStatus() {
        for (int ii = 0; ii < 300; ++ii) {
            try {
                DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(((KinesisWriterConfig)this.config).streamName);
                DescribeStreamResult response = ((AmazonKinesis)this.client).describeStream(request);
                return response.getStreamDescription().getStreamStatus();
            }
            catch (ResourceNotFoundException ex) {
                return null;
            }
            catch (LimitExceededException ex) {
                Utils.sleepQuietly(100L);
                continue;
            }
        }
        throw new IllegalStateException("unable to describe stream after 30 seconds");
    }

    private void setRetentionPeriodIfNeeded() {
        if (((KinesisWriterConfig)this.config).retentionPeriod != null) {
            try {
                ((AmazonKinesis)this.client).increaseStreamRetentionPeriod(new IncreaseStreamRetentionPeriodRequest().withStreamName(((KinesisWriterConfig)this.config).streamName).withRetentionPeriodHours(((KinesisWriterConfig)this.config).retentionPeriod));
                this.waitForStreamToBeActive();
            }
            catch (InvalidArgumentException invalidArgumentException) {
                // empty catch block
            }
        }
    }

    private PutRecordsRequest convertBatchToRequest(List<LogMessage> batch) {
        if (batch.isEmpty()) {
            return null;
        }
        ArrayList<PutRecordsRequestEntry> requestRecords = new ArrayList<PutRecordsRequestEntry>(batch.size());
        for (LogMessage message : batch) {
            requestRecords.add(new PutRecordsRequestEntry().withPartitionKey(this.partitionKey()).withData(ByteBuffer.wrap(message.getBytes())));
        }
        return new PutRecordsRequest().withStreamName(((KinesisWriterConfig)this.config).streamName).withRecords(requestRecords);
    }

    private List<Integer> attemptToSend(PutRecordsRequest request) {
        ArrayList<Integer> failures = new ArrayList<Integer>(request.getRecords().size());
        try {
            PutRecordsResult response = ((AmazonKinesis)this.client).putRecords(request);
            int ii = 0;
            for (PutRecordsResultEntry entry : response.getRecords()) {
                if (entry.getErrorCode() != null) {
                    failures.add(ii);
                }
                ++ii;
            }
            return failures;
        }
        catch (ResourceNotFoundException ex) {
            this.reportError("failed to send batch: stream " + request.getStreamName() + " no longer exists", null);
        }
        catch (Exception ex) {
            this.reportError("failed to send batch", ex);
        }
        for (int ii = 0; ii < request.getRecords().size(); ++ii) {
            failures.add(ii);
        }
        return failures;
    }

    private List<LogMessage> extractFailures(List<LogMessage> currentBatch, List<Integer> failureIndexes) {
        if (!failureIndexes.isEmpty()) {
            ArrayList<LogMessage> messages = new ArrayList<LogMessage>(failureIndexes.size());
            for (Integer idx : failureIndexes) {
                messages.add(currentBatch.get(idx));
            }
            return messages;
        }
        return Collections.emptyList();
    }

    private String partitionKey() {
        if (this.randomPartitionKeys) {
            StringBuilder sb = new StringBuilder(16);
            for (int ii = 0; ii < this.partitionKeyLength; ++ii) {
                sb.append((char)(48 + this.rnd.nextInt(10)));
            }
            return sb.toString();
        }
        return ((KinesisWriterConfig)this.config).partitionKey;
    }
}

