/*
 * Decompiled with CFR 0.152.
 */
package org.radarbase.producer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.SchemaValidationException;
import org.radarbase.data.AvroRecordData;
import org.radarbase.data.RecordData;
import org.radarbase.producer.AuthenticationException;
import org.radarbase.producer.KafkaSender;
import org.radarbase.producer.KafkaTopicSender;
import org.radarbase.topic.AvroTopic;

/**
 * A {@link KafkaSender} decorator that buffers records per topic and forwards them to the
 * wrapped sender in batches.
 *
 * <p>A pending batch is forwarded when, at the moment a record is added, it holds at least
 * {@code maxBatchSize} records or its first record was added at least {@code ageMillis}
 * milliseconds ago. It is also forwarded when a record with a different key arrives, and when
 * the topic sender is flushed or closed. The age limit is only evaluated while adding a record;
 * no timer is involved, so an idle batch stays buffered until the next send, flush or close.
 */
public class BatchedKafkaSender implements KafkaSender {
    private final KafkaSender wrappedSender;
    private final long ageNanos;
    private final int maxBatchSize;

    /**
     * Creates a batching decorator around an existing sender.
     *
     * @param sender sender that receives the batches; must not be {@code null}
     * @param ageMillis age in milliseconds after which a pending batch is forwarded on the
     *                  next added record
     * @param maxBatchSize number of buffered records at which a batch is forwarded
     * @throws NullPointerException if {@code sender} is {@code null}
     */
    public BatchedKafkaSender(KafkaSender sender, int ageMillis, int maxBatchSize) {
        // Fail fast instead of deferring the NullPointerException to the first delegated call.
        this.wrappedSender = Objects.requireNonNull(sender, "sender");
        this.ageNanos = TimeUnit.MILLISECONDS.toNanos(ageMillis);
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Creates a batching topic sender backed by a topic sender of the wrapped sender.
     *
     * @param topic topic to send records to
     * @return a topic sender that buffers records before forwarding them
     * @throws IOException if the wrapped sender fails to create its topic sender
     * @throws SchemaValidationException if the wrapped sender rejects the topic schemas
     */
    @Override
    public <K, V> KafkaTopicSender<K, V> sender(AvroTopic<K, V> topic)
            throws IOException, SchemaValidationException {
        return new BatchedKafkaTopicSender<K, V>(topic);
    }

    /** Delegates to the wrapped sender. */
    @Override
    public boolean isConnected() throws AuthenticationException {
        return wrappedSender.isConnected();
    }

    /** Delegates to the wrapped sender. */
    @Override
    public boolean resetConnection() throws AuthenticationException {
        return wrappedSender.resetConnection();
    }

    /**
     * Closes the wrapped sender. Records still buffered in topic senders are not flushed here;
     * flush or close those topic senders first.
     */
    @Override
    public synchronized void close() throws IOException {
        wrappedSender.close();
    }

    /**
     * Topic sender that collects records sharing one key and forwards them as a single batch.
     */
    private class BatchedKafkaTopicSender<K, V> implements KafkaTopicSender<K, V> {
        /** {@link System#nanoTime()} reading taken when the current batch got its first record. */
        private long nanoAdded;
        /** Key shared by all records in {@link #cache}; only meaningful while it is non-empty. */
        private K cachedKey;
        private final List<V> cache = new ArrayList<>();
        private final KafkaTopicSender<K, V> topicSender;
        private final AvroTopic<K, V> topic;

        private BatchedKafkaTopicSender(AvroTopic<K, V> topic)
                throws IOException, SchemaValidationException {
            this.topic = topic;
            this.topicSender = wrappedSender.sender(topic);
        }

        /**
         * Buffers a single record, forwarding the pending batch when a limit is reached.
         *
         * @throws IOException if the sender is not connected or forwarding a batch fails
         * @throws SchemaValidationException if the wrapped topic sender rejects the records
         */
        @Override
        public void send(K key, V value) throws IOException, SchemaValidationException {
            if (!isConnected()) {
                throw new IOException("Cannot send records to unconnected producer.");
            }
            trySend(key, value);
        }

        /**
         * Buffers every value of {@code records} under the key of that record set. Empty input
         * is ignored.
         *
         * <p>NOTE(review): unlike {@link #send(Object, Object)}, this overload does not check
         * connectivity before buffering; kept as-is to preserve existing behavior.
         *
         * @throws IOException if forwarding a batch fails
         * @throws SchemaValidationException if the wrapped topic sender rejects the records
         */
        @Override
        public void send(RecordData<K, V> records) throws IOException, SchemaValidationException {
            if (records.isEmpty()) {
                return;
            }
            K key = records.getKey();
            for (V value : records) {
                trySend(key, value);
            }
        }

        /**
         * Adds one record to the pending batch, forwarding the batch before the addition when
         * the key changes and after the addition when a size or age limit is reached.
         */
        private void trySend(K key, V record) throws IOException, SchemaValidationException {
            // A batch holds records of a single key, so a key change ends the pending batch.
            if (!cache.isEmpty() && !Objects.equals(key, cachedKey)) {
                doSend();
            }
            if (cache.isEmpty()) {
                // First record of a new batch determines its key and its age.
                cachedKey = key;
                nanoAdded = System.nanoTime();
            }
            cache.add(record);
            if (exceedsBuffer(cache)) {
                doSend();
            }
        }

        /**
         * Forwards the pending batch to the wrapped topic sender and resets the buffer. When
         * forwarding throws, the buffer is left untouched.
         */
        private void doSend() throws IOException, SchemaValidationException {
            // Hand over a snapshot so that clearing the buffer below cannot affect record data
            // that the wrapped sender may still reference.
            List<V> batch = new ArrayList<>(cache);
            topicSender.send(new AvroRecordData<K, V>(topic, cachedKey, batch));
            cache.clear();
            cachedKey = null;
        }

        /** Discards all buffered records and clears the wrapped topic sender. */
        @Override
        public void clear() {
            cache.clear();
            // Mirror doSend(): do not hold on to the key of a discarded batch.
            cachedKey = null;
            topicSender.clear();
        }

        /**
         * Forwards any buffered records and then flushes the wrapped topic sender.
         *
         * @throws IOException if forwarding or flushing fails; a
         *                     {@link SchemaValidationException} is wrapped as the cause
         */
        @Override
        public void flush() throws IOException {
            if (!cache.isEmpty()) {
                try {
                    doSend();
                } catch (SchemaValidationException ex) {
                    throw new IOException("Schemas do not match", ex);
                }
            }
            topicSender.flush();
        }

        /**
         * Flushes buffered records and closes the wrapped sender, even when flushing fails.
         *
         * <p>NOTE(review): this closes the enclosing sender's wrapped {@link KafkaSender}, not
         * just this topic's wrapped topic sender, so it affects every topic sender sharing it.
         * Confirm that this is intended before relying on it.
         */
        @Override
        public void close() throws IOException {
            try {
                flush();
            } finally {
                wrappedSender.close();
            }
        }

        /**
         * Tells whether the given batch reached the size limit or the age limit.
         */
        private boolean exceedsBuffer(List<?> records) {
            if (records.size() >= maxBatchSize) {
                return true;
            }
            // Subtracting nanoTime readings is the overflow-safe way to measure elapsed time.
            return System.nanoTime() - nanoAdded >= ageNanos;
        }
    }
}

