/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesisanalytics.flink.connectors.producer.impl;

import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.FlinkKinesisFirehoseException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.RecordCouldNotBeSentException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.TimeoutExpiredException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.impl.FirehoseProducerConfiguration;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisanalytics.shaded.com.amazonaws.services.kinesisfirehose.model.ServiceUnavailableException;
import java.util.ArrayDeque;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class FirehoseProducer<O extends UserRecordResult, R extends Record>
implements IProducer<O, R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FirehoseProducer.class);
    private final FirehoseProducerConfiguration configuration;
    private final AmazonKinesisFirehose firehoseClient;
    private final String deliveryStream;
    private final ExecutorService flusher;
    @GuardedBy(value="this")
    private final Object producerBufferLock = new Object();
    private volatile Queue<Record> producerBuffer;
    private volatile Queue<Record> flusherBuffer;
    private volatile long lastSucceededFlushTimestamp;
    private volatile boolean isDestroyed;
    private volatile boolean syncFlush;
    private volatile boolean isFlusherFailed;

    /**
     * Creates a producer whose configuration is derived from raw properties.
     *
     * <p>The properties are converted with {@code FirehoseProducerConfiguration.builder(config).build()}
     * and everything else is delegated to the configuration-based constructor.
     *
     * @param deliveryStream name of the delivery stream the records are sent to
     * @param firehoseClient client used to talk to Kinesis Firehose
     * @param config         properties handed to the configuration builder
     */
    public FirehoseProducer(@Nonnull String deliveryStream, @Nonnull AmazonKinesisFirehose firehoseClient, @Nonnull Properties config) {
        this(deliveryStream, firehoseClient, configurationFrom(config));
    }

    /** Builds the producer configuration out of the given properties. */
    private static FirehoseProducerConfiguration configurationFrom(Properties config) {
        return FirehoseProducerConfiguration.builder(config).build();
    }

    public FirehoseProducer(@Nonnull String deliveryStream, @Nonnull AmazonKinesisFirehose firehoseClient, @Nonnull FirehoseProducerConfiguration configuration) {
        this.firehoseClient = (AmazonKinesisFirehose)Validate.notNull((Object)firehoseClient, (String)"Kinesis Firehose client cannot be null", (Object[])new Object[0]);
        this.deliveryStream = (String)Validate.notBlank((CharSequence)deliveryStream, (String)"Kinesis Firehose delivery stream cannot be null or empty.", (Object[])new Object[0]);
        this.configuration = configuration;
        this.producerBuffer = new ArrayDeque<Record>(configuration.getMaxBufferSize());
        this.flusherBuffer = new ArrayDeque<Record>(configuration.getMaxBufferSize());
        this.flusher = Executors.newSingleThreadExecutor(new FirehoseThreadFactory());
        this.flusher.submit(this::flushBuffer);
    }

    /**
     * Buffers a record using the operation timeout taken from the producer configuration.
     *
     * @param record the record to buffer
     * @return whatever the timeout-aware overload returns for this record
     * @throws Exception if the timeout-aware overload fails
     */
    @Override
    public CompletableFuture<O> addUserRecord(R record) throws Exception {
        final long defaultTimeoutInMillis = this.configuration.getMaxOperationTimeoutInMillis();
        return this.addUserRecord(record, defaultTimeoutInMillis);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends a record to the producer buffer, blocking while the buffer is full.
     *
     * <p>While the buffer holds at least the configured maximum number of records the caller waits
     * on the buffer lock in slices of the configured buffer-full wait timeout, nudging the flusher
     * whenever the flusher buffer is empty. The elapsed time is checked before each wait slice, so
     * the call may overshoot {@code timeoutInMillis} by at most one slice.
     *
     * <p>The returned future is already completed and always reports success: it only states that
     * the record was buffered. Delivery problems are reported through {@link #isFlushFailed()}.
     *
     * @param record          the record to buffer; must not be null
     * @param timeoutInMillis maximum time to wait for free buffer space; must be greater than zero
     * @return a completed future carrying a successful {@link UserRecordResult}
     * @throws TimeoutExpiredException if the buffer is still full once the timeout has elapsed
     * @throws InterruptedException    if the calling thread is interrupted while waiting
     * @throws IllegalStateException   if the producer has been destroyed
     */
    @Override
    public CompletableFuture<O> addUserRecord(R record, long timeoutInMillis) throws TimeoutExpiredException, InterruptedException {
        Validate.notNull(record, "Record cannot be null.");
        Validate.isTrue(timeoutInMillis > 0L, "Operation timeout should be > 0.");
        final long operationTimeoutInNanos = TimeUnit.MILLISECONDS.toNanos(timeoutInMillis);
        final int maxBufferSize = this.configuration.getMaxBufferSize();

        synchronized (this.producerBufferLock) {
            // destroy() nulls the producer buffer; report that state explicitly rather than with an NPE.
            Validate.validState(!this.isDestroyed, "Records cannot be added after the producer has been destroyed.");
            final long startedAtNanos = System.nanoTime();
            while (this.producerBuffer.size() >= maxBufferSize) {
                if (System.nanoTime() - startedAtNanos >= operationTimeoutInNanos) {
                    throw new TimeoutExpiredException("Timeout has expired for the given operation");
                }
                if (this.flusherBuffer.isEmpty()) {
                    // Producers and the flusher wait on the same monitor, so a single notify() could
                    // wake another blocked producer instead of the flusher.
                    this.producerBufferLock.notifyAll();
                }
                this.producerBufferLock.wait(this.configuration.getBufferFullWaitTimeoutInMillis());
                // The producer may have been destroyed while this thread was waiting.
                Validate.validState(!this.isDestroyed, "Records cannot be added after the producer has been destroyed.");
            }
            this.producerBuffer.offer(record);
            if (this.producerBuffer.size() >= maxBufferSize && this.flusherBuffer.isEmpty()) {
                this.producerBufferLock.notifyAll();
            }
        }

        // O is bounded by UserRecordResult, so after erasure this cast is a no-op at runtime.
        @SuppressWarnings("unchecked")
        final O recordResult = (O) new UserRecordResult().setSuccessful(true);
        return CompletableFuture.completedFuture(recordResult);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flush loop executed on the flusher thread until the producer is destroyed or a flush fails.
     *
     * <p>Each iteration, under the buffer lock, decides whether a flush is due: a synchronous flush
     * was requested, the producer buffer reached its maximum size, or the buffer timeout elapsed
     * since the last successful flush while records are pending. If not, the thread waits on the
     * lock and re-evaluates. Otherwise records are moved into the flusher buffer, the lock is
     * released, and the batch is submitted with retries. After a successful submission the flusher
     * buffer is replaced by a fresh empty one and any pending synchronous-flush request is cleared.
     *
     * <p>A synchronous-flush request that finds the producer buffer empty is acknowledged without
     * submitting anything. Submitting an empty batch would fail and kill the flusher permanently.
     *
     * <p>Any exception raised while submitting is logged, recorded through {@code isFlusherFailed}
     * and rethrown, which ends the loop.
     */
    private void flushBuffer() {
        this.lastSucceededFlushTimestamp = System.nanoTime();
        final long bufferTimeoutInNanos = TimeUnit.MILLISECONDS.toNanos(this.configuration.getBufferTimeoutInMillis());
        while (true) {
            final boolean timeoutFlush = System.nanoTime() - this.lastSucceededFlushTimestamp >= bufferTimeoutInNanos;
            synchronized (this.producerBufferLock) {
                // Invariant: the previous batch has been fully handed off before a new one is prepared.
                Validate.validState(this.flusherBuffer.isEmpty());
                if (this.isDestroyed) {
                    return;
                }
                if (this.syncFlush && this.producerBuffer.isEmpty()) {
                    // Nothing to send: treat the request as fulfilled instead of submitting an empty batch.
                    this.syncFlush = false;
                    this.producerBufferLock.notifyAll();
                }
                final boolean bufferFull = this.producerBuffer.size() >= this.configuration.getMaxBufferSize();
                final boolean timedOutWithData = timeoutFlush && !this.producerBuffer.isEmpty();
                if (!this.syncFlush && !bufferFull && !timedOutWithData) {
                    try {
                        this.producerBufferLock.wait(this.configuration.getBufferTimeoutBetweenFlushes());
                    }
                    catch (InterruptedException e) {
                        // NOTE(review): the interrupt is only logged. Restoring the flag here would make
                        // every following wait() throw immediately. Confirm whether an interrupt should
                        // stop the flusher instead.
                        LOGGER.info("An interrupted exception has been thrown, while trying to sleep and release the lock during a flush.", (Throwable)e);
                    }
                    continue;
                }
                this.prepareRecordsToSubmit(this.producerBuffer, this.flusherBuffer);
                // Producers and the flusher share this monitor; wake every waiter so blocked producers
                // can re-check for free space.
                this.producerBufferLock.notifyAll();
            }
            try {
                this.submitBatchWithRetry(this.flusherBuffer);
                // Allocate outside the lock to keep the critical section short.
                final Queue<Record> emptyFlushBuffer = new ArrayDeque<>(this.configuration.getMaxBufferSize());
                synchronized (this.producerBufferLock) {
                    Validate.validState(!this.flusherBuffer.isEmpty());
                    this.flusherBuffer = emptyFlushBuffer;
                    if (this.syncFlush) {
                        this.syncFlush = false;
                        this.producerBufferLock.notifyAll();
                    }
                }
            }
            catch (Exception ex) {
                final String errorMsg = "An error has occurred while trying to send data to Kinesis Firehose.";
                if (ex instanceof AmazonKinesisFirehoseException && ((AmazonKinesisFirehoseException)ex).getStatusCode() == 413) {
                    LOGGER.error(errorMsg + " Batch of records too large. Please try to reduce your batch size by passing FIREHOSE_PRODUCER_BUFFER_MAX_SIZE into your configuration.", (Throwable)ex);
                } else {
                    LOGGER.error(errorMsg, (Throwable)ex);
                }
                synchronized (this.producerBufferLock) {
                    this.isFlusherFailed = true;
                }
                // ex is never reassigned, so this is a precise rethrow of what the try block can throw.
                throw ex;
            }
        }
    }

    /**
     * Moves records from the head of {@code sourceQueue} to {@code targetQueue} until the next
     * record would push the accumulated size past the configured maximum batch size in bytes.
     *
     * <p>The size of a record is taken from the capacity of its data buffer. Records are moved in
     * queue order; the first record that does not fit stays in the source queue.
     *
     * @param sourceQueue queue the records are taken from
     * @param targetQueue queue the records are appended to
     */
    private void prepareRecordsToSubmit(@Nonnull Queue<Record> sourceQueue, @Nonnull Queue<Record> targetQueue) {
        int batchBytes = 0;
        while (!sourceQueue.isEmpty()) {
            final int nextRecordBytes = sourceQueue.peek().getData().capacity();
            if (batchBytes + nextRecordBytes > this.configuration.getMaxPutRecordBatchBytes()) {
                break;
            }
            batchBytes += nextRecordBytes;
            targetQueue.add(sourceQueue.poll());
        }
    }

    /**
     * Submits a batch to Kinesis Firehose, retrying up to the configured number of attempts.
     *
     * <p>An attempt succeeds when the service reports no failed puts. On a partial failure only the
     * records whose response entry carries no record id are resent, so records the service already
     * accepted are not duplicated. {@code records} itself is never modified. A
     * {@link ServiceUnavailableException} is treated as recoverable. Between attempts the thread
     * sleeps for a random time bounded by an exponentially growing, capped back-off.
     *
     * @param records the batch to submit
     * @throws AmazonKinesisFirehoseException if the service raises a non-recoverable error
     * @throws RecordCouldNotBeSentException  if every attempt failed
     */
    private void submitBatchWithRetry(Queue<Record> records) throws AmazonKinesisFirehoseException, RecordCouldNotBeSentException {
        final int maxAttempts = this.configuration.getNumberOfRetries();
        Queue<Record> pending = records;
        String warnMessage = null;
        for (int attempts = 0; attempts < maxAttempts; ++attempts) {
            try {
                LOGGER.debug("Trying to flush Buffer of size: {} on attempt: {}", pending.size(), attempts);
                final PutRecordBatchResult lastResult = this.submitBatch(pending);
                final Integer failedPutCount = lastResult.getFailedPutCount();
                if (failedPutCount == null || failedPutCount == 0) {
                    this.lastSucceededFlushTimestamp = System.nanoTime();
                    LOGGER.debug("Firehose Buffer has been flushed with size: {} on attempt: {}", records.size(), attempts);
                    return;
                }
                final PutRecordBatchResponseEntry failedRecord = lastResult.getRequestResponses().stream()
                        .filter(entry -> entry.getRecordId() == null)
                        .findFirst()
                        .orElse(null);
                warnMessage = String.format("Number of failed records: %s.", failedPutCount);
                if (failedRecord != null) {
                    warnMessage = String.format("Last Kinesis Firehose putRecordBatch encountered an error and failed trying to put: %s records with error: %s - %s.", failedPutCount, failedRecord.getErrorCode(), failedRecord.getErrorMessage());
                }
                LOGGER.warn(warnMessage);
                pending = selectFailedRecords(pending, lastResult.getRequestResponses());
            }
            catch (ServiceUnavailableException ex) {
                LOGGER.info("Kinesis Firehose has thrown a recoverable exception.", (Throwable)ex);
                // Keep a meaningful cause for the final exception if every attempt ends up here.
                warnMessage = "Kinesis Firehose service was unavailable: " + ex.getMessage();
            }
            // No point in sleeping when no further attempt follows.
            if (attempts + 1 < maxAttempts) {
                this.backOff(attempts);
            }
        }
        throw new RecordCouldNotBeSentException("Exceeded number of attempts! " + warnMessage);
    }

    /**
     * Returns the records of {@code submitted} whose response entry has no record id.
     *
     * <p>Responses are matched to records by position. If the two sequences differ in length, or no
     * entry is marked as failed, the whole submitted batch is returned so nothing is dropped.
     */
    private static Queue<Record> selectFailedRecords(Queue<Record> submitted, Iterable<PutRecordBatchResponseEntry> responses) {
        final Queue<Record> unmatched = new ArrayDeque<>(submitted);
        final Queue<Record> failed = new ArrayDeque<>();
        for (PutRecordBatchResponseEntry response : responses) {
            final Record record = unmatched.poll();
            if (record == null) {
                // More responses than records: positions cannot be trusted.
                return submitted;
            }
            if (response.getRecordId() == null) {
                failed.add(record);
            }
        }
        return unmatched.isEmpty() && !failed.isEmpty() ? failed : submitted;
    }

    /**
     * Sleeps for a random duration in {@code [0, min(maxBackOff, baseBackOff * 2^attempts))}.
     *
     * <p>An interrupt during the sleep is logged and ends the sleep early; the caller then proceeds
     * with its next attempt.
     */
    private void backOff(int attempts) {
        final long maxBackOffInMillis = this.configuration.getMaxBackOffInMillis();
        long upperBoundInMillis = this.configuration.getBaseBackOffInMillis();
        // Double once per previous attempt, stopping at the cap and guarding against overflow.
        for (int i = 0; i < attempts && upperBoundInMillis > 0L && upperBoundInMillis < maxBackOffInMillis; ++i) {
            upperBoundInMillis = upperBoundInMillis > Long.MAX_VALUE / 2L ? maxBackOffInMillis : upperBoundInMillis * 2L;
        }
        upperBoundInMillis = Math.min(upperBoundInMillis, maxBackOffInMillis);

        final long timeToSleep = RandomUtils.nextLong(0L, upperBoundInMillis);
        LOGGER.info("Sleeping for: {}ms on attempt: {}", timeToSleep, attempts);
        try {
            Thread.sleep(timeToSleep);
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt is only logged, so the remaining attempts still run.
            // Confirm whether an interrupt should abort the flush instead.
            LOGGER.info("An interrupted exception has been thrown between retry attempts.", (Throwable)e);
        }
    }

    /**
     * Sends the given records to the configured delivery stream in a single put-record-batch call.
     *
     * @param records the records to send
     * @return the result returned by the Firehose client
     * @throws AmazonKinesisFirehoseException if the client call fails
     */
    private PutRecordBatchResult submitBatch(Queue<Record> records) throws AmazonKinesisFirehoseException {
        LOGGER.debug("Sending {} records to Kinesis Firehose on stream: {}", (Object)records.size(), (Object)this.deliveryStream);
        final PutRecordBatchRequest batchRequest = new PutRecordBatchRequest()
                .withDeliveryStreamName(this.deliveryStream)
                .withRecords(records);
        return this.firehoseClient.putRecordBatch(batchRequest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the producer as destroyed, discards the producer buffer and shuts the flusher down.
     *
     * <p>Records still sitting in the producer buffer are dropped. The executor is shut down
     * gracefully and the call waits up to one minute for the flusher thread to finish.
     *
     * @throws FlinkKinesisFirehoseException if the calling thread is interrupted while waiting for
     *                                       the executor to terminate; the interrupt status is
     *                                       restored before the exception is thrown
     */
    @Override
    public void destroy() throws Exception {
        synchronized (this.producerBufferLock) {
            this.isDestroyed = true;
            this.producerBuffer = null;
            // Wake every waiter: with a single notify() a blocked producer could be woken instead
            // of the flusher, delaying the shutdown until the flusher's wait times out.
            this.producerBufferLock.notifyAll();
        }
        // A terminated executor is always shut down, so this single check covers both states.
        if (this.flusher.isShutdown()) {
            return;
        }
        LOGGER.info("Shutting down scheduled service.");
        this.flusher.shutdown();
        try {
            LOGGER.info("Awaiting executor service termination...");
            if (!this.flusher.awaitTermination(1L, TimeUnit.MINUTES)) {
                LOGGER.warn("Executor service did not terminate within the allotted time; the flusher thread may still be running.");
            }
        }
        catch (InterruptedException e) {
            final String errorMsg = "Error waiting executor writer termination.";
            LOGGER.error(errorMsg, (Throwable)e);
            // Preserve the interrupt for callers, since the exception type no longer conveys it.
            Thread.currentThread().interrupt();
            throw new FlinkKinesisFirehoseException(errorMsg, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether {@link #destroy()} has been called, reading the flag under the buffer lock.
     *
     * @return {@code true} once the producer has been destroyed
     */
    @Override
    public boolean isDestroyed() {
        final boolean destroyed;
        synchronized (this.producerBufferLock) {
            destroyed = this.isDestroyed;
        }
        return destroyed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Counts the records that have been accepted but not yet confirmed as flushed.
     *
     * <p>This is the size of the producer buffer plus the size of the flusher buffer, read under
     * the buffer lock. Once the producer has been destroyed the producer buffer no longer exists
     * and contributes zero.
     *
     * @return number of buffered and in-flight records
     */
    @Override
    public int getOutstandingRecordsCount() {
        synchronized (this.producerBufferLock) {
            // destroy() sets the producer buffer to null; avoid an NPE for late callers.
            final int buffered = this.producerBuffer == null ? 0 : this.producerBuffer.size();
            return buffered + this.flusherBuffer.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the flusher has recorded a failed flush, reading the flag under the buffer lock.
     *
     * @return {@code true} once a flush attempt has ended with an exception
     */
    @Override
    public boolean isFlushFailed() {
        final boolean failed;
        synchronized (this.producerBufferLock) {
            failed = this.isFlusherFailed;
        }
        return failed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks the flusher thread to send the buffered records without waiting for the buffer to fill
     * up or for the buffer timeout to elapse. The call returns immediately.
     *
     * <p>The request is skipped when the producer has been destroyed or when nothing is buffered.
     * Raising the flag with an empty producer buffer would make the flusher submit an empty batch,
     * which fails and stops the flusher for good. Records already handed to the flusher are sent
     * regardless of this request.
     */
    @Override
    public void flush() {
        synchronized (this.producerBufferLock) {
            if (this.producerBuffer == null || this.producerBuffer.isEmpty()) {
                return;
            }
            this.syncFlush = true;
            // Producers may be waiting on the same monitor; wake all waiters so the flusher is
            // guaranteed to be among them.
            this.producerBufferLock.notifyAll();
        }
    }

    /**
     * Blocks until no records are outstanding or the flusher has reported a failure.
     *
     * <p>The loop requests a flush and then sleeps for 500 ms before checking again. An interrupt
     * does not abort the wait. It is remembered and the interrupt status is restored once the loop
     * ends. Restoring it inside the loop would make every following sleep throw immediately and
     * turn the loop into a busy spin.
     */
    @Override
    public void flushSync() {
        boolean interrupted = false;
        try {
            while (this.getOutstandingRecordsCount() > 0 && !this.isFlushFailed()) {
                this.flush();
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    LOGGER.warn("An interruption has happened while trying to flush the buffer synchronously.");
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.isFlushFailed()) {
            LOGGER.warn("The flusher thread has failed trying to synchronously flush the buffer.");
        }
    }

    /**
     * Thread factory for the flusher executor.
     *
     * <p>Threads come from {@link Executors#defaultThreadFactory()}, are renamed to
     * {@code kda-writer-thread-<n>} using a counter shared by all factory instances, and are
     * explicitly marked as non-daemon.
     */
    static class FirehoseThreadFactory implements ThreadFactory {
        /** Sequence number appended to thread names; shared across all factory instances. */
        private static final AtomicLong count = new AtomicLong(0L);

        FirehoseThreadFactory() {
        }

        /**
         * Creates a named, non-daemon thread for the given task.
         *
         * @param runnable the task the new thread will run
         * @return the configured, not yet started thread
         */
        @Override
        public Thread newThread(@Nonnull Runnable runnable) {
            final Thread writer = Executors.defaultThreadFactory().newThread(runnable);
            final String writerName = "kda-writer-thread-" + count.getAndIncrement();
            writer.setName(writerName);
            writer.setDaemon(false);
            return writer;
        }
    }

    /**
     * Mutable holder for the outcome of adding a record: a success flag and an optional exception.
     * Both setters return the instance itself so calls can be chained.
     */
    public static class UserRecordResult {
        private boolean successful;
        private Throwable exception;

        /** @return the success flag; {@code false} until set otherwise */
        public boolean isSuccessful() {
            return successful;
        }

        /**
         * Sets the success flag.
         *
         * @param successful the new flag value
         * @return this instance
         */
        public UserRecordResult setSuccessful(boolean successful) {
            this.successful = successful;
            return this;
        }

        /** @return the stored exception, or {@code null} if none has been set */
        public Throwable getException() {
            return exception;
        }

        /**
         * Stores the exception associated with this result.
         *
         * @param exception the exception to store
         * @return this instance
         */
        public UserRecordResult setException(Throwable exception) {
            this.exception = exception;
            return this;
        }
    }
}

