/*
 * Decompiled with CFR 0.152.
 */
package com.kdgregory.logging.aws.kinesis;

import com.kdgregory.logging.aws.facade.KinesisFacade;
import com.kdgregory.logging.aws.facade.KinesisFacadeException;
import com.kdgregory.logging.aws.internal.AbstractLogWriter;
import com.kdgregory.logging.aws.kinesis.KinesisConstants;
import com.kdgregory.logging.aws.kinesis.KinesisWriterConfig;
import com.kdgregory.logging.aws.kinesis.KinesisWriterStatistics;
import com.kdgregory.logging.common.LogMessage;
import com.kdgregory.logging.common.util.InternalLogger;
import com.kdgregory.logging.common.util.RetryManager2;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

public class KinesisLogWriter
extends AbstractLogWriter<KinesisWriterConfig, KinesisWriterStatistics> {
    public static final String RANDOM_PARTITION_KEY_CONFIG = "{random}";
    private KinesisFacade facade;
    protected RetryManager2 describeRetry = new RetryManager2("describe", Duration.ofMillis(50L));
    protected RetryManager2 createRetry = new RetryManager2("create", Duration.ofMillis(200L));
    protected RetryManager2 postCreateRetry = new RetryManager2("describe", Duration.ofMillis(200L), false, true);
    protected Duration sendTimeout = Duration.ofMillis(2000L);
    protected RetryManager2 sendRetry = new RetryManager2("send", Duration.ofMillis(200L));

    public KinesisLogWriter(KinesisWriterConfig config, KinesisWriterStatistics stats, InternalLogger logger, KinesisFacade facade) {
        super(config, stats, logger);
        this.facade = facade;
        stats.setActualStreamName(config.getStreamName());
    }

    @Override
    public int maxMessageSize() {
        return 0x100000 - ((KinesisWriterConfig)this.config).getPartitionKeyHelper().getLength();
    }

    @Override
    protected boolean ensureDestinationAvailable() {
        List<String> configErrors = ((KinesisWriterConfig)this.config).validate();
        if (!configErrors.isEmpty()) {
            for (String error : configErrors) {
                this.reportError("configuration error: " + error, null);
            }
            return false;
        }
        Instant timeoutAt = Instant.now().plusMillis(((KinesisWriterConfig)this.config).getInitializationTimeout());
        try {
            this.logger.debug("checking status of stream: " + ((KinesisWriterConfig)this.config).getStreamName());
            KinesisConstants.StreamStatus status = this.describeRetry.invoke(timeoutAt, () -> this.facade.retrieveStreamStatus());
            if (status == KinesisConstants.StreamStatus.ACTIVE) {
                return true;
            }
            if (status == KinesisConstants.StreamStatus.DOES_NOT_EXIST) {
                if (((KinesisWriterConfig)this.config).getAutoCreate()) {
                    return this.createStream(timeoutAt) && this.setRetentionPeriod(timeoutAt);
                }
                this.reportError("stream \"" + ((KinesisWriterConfig)this.config).getStreamName() + "\" does not exist and auto-create not enabled", null);
                return false;
            }
            return this.waitForStreamToBeActive(timeoutAt);
        }
        catch (Exception ex) {
            this.reportError("exception during initialization", ex);
            return false;
        }
    }

    @Override
    protected List<LogMessage> sendBatch(List<LogMessage> currentBatch) {
        ((KinesisWriterStatistics)this.stats).setLastBatchSize(currentBatch.size());
        if (((KinesisWriterConfig)this.config).getEnableBatchLogging()) {
            this.logger.debug("about to write batch of " + currentBatch.size() + " message(s)");
        }
        if (currentBatch.isEmpty()) {
            return currentBatch;
        }
        try {
            List result = this.sendRetry.invoke(this.sendTimeout, () -> {
                try {
                    List<LogMessage> unsent = this.facade.putRecords(currentBatch);
                    if (((KinesisWriterConfig)this.config).getEnableBatchLogging()) {
                        this.logger.debug("wrote batch of " + currentBatch.size() + " message(s); " + unsent.size() + " rejected");
                    }
                    return unsent;
                }
                catch (KinesisFacadeException ex) {
                    if (ex.getReason() == KinesisFacadeException.ReasonCode.THROTTLING) {
                        ((KinesisWriterStatistics)this.stats).incrementThrottledWrites();
                        return null;
                    }
                    if (!ex.isRetryable()) {
                        throw ex;
                    }
                    return null;
                }
            });
            if (result == null) {
                this.logger.warn("timeout while sending batch");
                return currentBatch;
            }
            return result;
        }
        catch (Exception ex) {
            this.logger.error("exception while sending batch", ex);
            return currentBatch;
        }
    }

    @Override
    protected int effectiveSize(LogMessage message) {
        return message.size() + ((KinesisWriterConfig)this.config).getPartitionKeyHelper().getLength();
    }

    @Override
    protected boolean withinServiceLimits(int batchBytes, int numMessages) {
        return batchBytes < 0x500000 && numMessages <= 500;
    }

    @Override
    protected void stopAWSClient() {
        this.facade.shutdown();
    }

    private boolean createStream(Instant timeoutAt) {
        this.logger.debug("creating kinesis stream: " + ((KinesisWriterConfig)this.config).getStreamName());
        this.createRetry.invoke(timeoutAt, () -> {
            this.facade.createStream();
            return Boolean.TRUE;
        }, (Consumer<RuntimeException>)new DefaultExceptionHandler());
        return this.waitForStreamToBeActive(timeoutAt);
    }

    private boolean setRetentionPeriod(Instant timeoutAt) {
        if (((KinesisWriterConfig)this.config).getRetentionPeriod() == null) {
            return true;
        }
        this.logger.debug("setting retention period on stream \"" + ((KinesisWriterConfig)this.config).getStreamName() + "\" to " + ((KinesisWriterConfig)this.config).getRetentionPeriod() + " hours");
        this.createRetry.invoke(timeoutAt, () -> {
            this.facade.setRetentionPeriod();
            return Boolean.TRUE;
        }, (Consumer<RuntimeException>)new DefaultExceptionHandler());
        return this.waitForStreamToBeActive(timeoutAt);
    }

    private boolean waitForStreamToBeActive(Instant timeoutAt) {
        this.logger.debug("waiting for stream " + ((KinesisWriterConfig)this.config).getStreamName() + " to become active");
        KinesisConstants.StreamStatus status = this.postCreateRetry.invoke(timeoutAt, () -> {
            KinesisConstants.StreamStatus check = this.facade.retrieveStreamStatus();
            return check == KinesisConstants.StreamStatus.ACTIVE ? check : null;
        });
        if (status != KinesisConstants.StreamStatus.ACTIVE) {
            this.logger.error("timeout waiting for stream " + ((KinesisWriterConfig)this.config).getStreamName() + " to become active", null);
            return false;
        }
        return true;
    }

    private static class DefaultExceptionHandler
    implements Consumer<RuntimeException> {
        private DefaultExceptionHandler() {
        }

        @Override
        public void accept(RuntimeException ex) {
            if (ex instanceof KinesisFacadeException && ((KinesisFacadeException)ex).isRetryable()) {
                return;
            }
            throw ex;
        }
    }
}

