/*
 * Decompiled with CFR 0.152.
 */
package com.kdgregory.logging.aws.kinesis;

import com.kdgregory.logging.aws.facade.KinesisFacade;
import com.kdgregory.logging.aws.facade.KinesisFacadeException;
import com.kdgregory.logging.aws.internal.AbstractLogWriter;
import com.kdgregory.logging.aws.kinesis.KinesisConstants;
import com.kdgregory.logging.aws.kinesis.KinesisWriterConfig;
import com.kdgregory.logging.aws.kinesis.KinesisWriterStatistics;
import com.kdgregory.logging.common.LogMessage;
import com.kdgregory.logging.common.util.InternalLogger;
import com.kdgregory.logging.common.util.RetryManager;
import java.util.List;
import java.util.function.Consumer;

public class KinesisLogWriter
extends AbstractLogWriter<KinesisWriterConfig, KinesisWriterStatistics> {
    public static final String RANDOM_PARTITION_KEY_CONFIG = "{random}";
    private KinesisFacade facade;
    protected RetryManager describeRetry = new RetryManager(50L, 1000L, true);
    protected RetryManager createRetry = new RetryManager(200L, 5000L, true);
    protected RetryManager postCreateRetry = new RetryManager(200L, 120000L, true);
    protected RetryManager sendRetry = new RetryManager(200L, 2000L, true);

    public KinesisLogWriter(KinesisWriterConfig config, KinesisWriterStatistics stats, InternalLogger logger, KinesisFacade facade) {
        super(config, stats, logger);
        this.facade = facade;
        stats.setActualStreamName(config.getStreamName());
    }

    @Override
    public int maxMessageSize() {
        return 0x100000 - ((KinesisWriterConfig)this.config).getPartitionKeyHelper().getLength();
    }

    @Override
    protected boolean ensureDestinationAvailable() {
        List<String> configErrors = ((KinesisWriterConfig)this.config).validate();
        if (!configErrors.isEmpty()) {
            for (String error : configErrors) {
                this.reportError("configuration error: " + error, null);
            }
            return false;
        }
        try {
            KinesisConstants.StreamStatus status = this.describeRetry.invoke(() -> this.facade.retrieveStreamStatus());
            if (status == KinesisConstants.StreamStatus.ACTIVE) {
                return true;
            }
            if (status == KinesisConstants.StreamStatus.DOES_NOT_EXIST) {
                if (((KinesisWriterConfig)this.config).getAutoCreate()) {
                    return this.createStream() && this.setRetentionPeriod();
                }
                this.reportError("stream \"" + ((KinesisWriterConfig)this.config).getStreamName() + "\" does not exist and auto-create not enabled", null);
                return false;
            }
            return this.waitForStreamToBeActive();
        }
        catch (Exception ex) {
            this.reportError("unable to configure stream: " + ((KinesisWriterConfig)this.config).getStreamName(), ex);
            return false;
        }
    }

    @Override
    protected List<LogMessage> sendBatch(List<LogMessage> currentBatch) {
        try {
            List result = this.sendRetry.invoke(() -> {
                try {
                    return this.facade.putRecords(currentBatch);
                }
                catch (KinesisFacadeException ex) {
                    if (ex.getReason() == KinesisFacadeException.ReasonCode.THROTTLING) {
                        ((KinesisWriterStatistics)this.stats).incrementThrottledWrites();
                        return null;
                    }
                    if (!ex.isRetryable()) {
                        throw ex;
                    }
                    return null;
                }
            });
            if (result == null) {
                this.logger.warn("timeout while sending batch");
                return currentBatch;
            }
            return result;
        }
        catch (Exception ex) {
            this.logger.error("exception while sending batch", ex);
            return currentBatch;
        }
    }

    @Override
    protected int effectiveSize(LogMessage message) {
        return message.size() + ((KinesisWriterConfig)this.config).getPartitionKeyHelper().getLength();
    }

    @Override
    protected boolean withinServiceLimits(int batchBytes, int numMessages) {
        return batchBytes < 0x500000 && numMessages <= 500;
    }

    @Override
    protected void stopAWSClient() {
        this.facade.shutdown();
    }

    private boolean createStream() {
        this.logger.debug("creating kinesis stream: " + ((KinesisWriterConfig)this.config).getStreamName());
        this.createRetry.invoke(() -> {
            this.facade.createStream();
            return Boolean.TRUE;
        }, new DefaultExceptionHandler());
        return this.waitForStreamToBeActive();
    }

    private boolean setRetentionPeriod() {
        if (((KinesisWriterConfig)this.config).getRetentionPeriod() == null) {
            return true;
        }
        this.logger.debug("setting retention period on stream \"" + ((KinesisWriterConfig)this.config).getStreamName() + "\" to " + ((KinesisWriterConfig)this.config).getRetentionPeriod() + " hours");
        this.createRetry.invoke(() -> {
            this.facade.setRetentionPeriod();
            return Boolean.TRUE;
        }, new DefaultExceptionHandler());
        return this.waitForStreamToBeActive();
    }

    private boolean waitForStreamToBeActive() {
        KinesisConstants.StreamStatus status = this.postCreateRetry.invoke(() -> {
            KinesisConstants.StreamStatus check = this.facade.retrieveStreamStatus();
            return check == KinesisConstants.StreamStatus.ACTIVE ? check : null;
        });
        if (status != KinesisConstants.StreamStatus.ACTIVE) {
            this.logger.error("timeout waiting for stream " + ((KinesisWriterConfig)this.config).getStreamName() + " to become active", null);
            return false;
        }
        return true;
    }

    private static class DefaultExceptionHandler
    implements Consumer<RuntimeException> {
        private DefaultExceptionHandler() {
        }

        @Override
        public void accept(RuntimeException ex) {
            if (ex instanceof KinesisFacadeException && ((KinesisFacadeException)ex).isRetryable()) {
                return;
            }
            throw ex;
        }
    }
}

