/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class RecordAccumulatorTest {
    // Topic layout shared by every test: three partitions of a single topic.
    private String topic = "test";
    private int partition1 = 0;
    private int partition2 = 1;
    private int partition3 = 2;

    // Two brokers; partitions 0 and 1 are led by node1, partition 2 by node2.
    private Node node1 = new Node(0, "localhost", 1111);
    private Node node2 = new Node(1, "localhost", 1112);
    private TopicPartition tp1 = new TopicPartition(topic, partition1);
    private TopicPartition tp2 = new TopicPartition(topic, partition2);
    private TopicPartition tp3 = new TopicPartition(topic, partition3);
    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);

    // Mock clock driving the accumulator; the real clock is only used by delayedInterrupt.
    private MockTime time = new MockTime();
    private SystemTime systemTime = new SystemTime();

    // Payload appended by the tests and the size one such record occupies in a batch.
    private byte[] key = "key".getBytes();
    private byte[] value = "value".getBytes();
    // NOTE(review): the 12 extra bytes are presumably the per-entry log overhead - confirm.
    private int msgSize = 12 + Record.recordSize(key, value);

    private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet(), Collections.emptySet());
    private Metrics metrics = new Metrics(time);
    private final long maxBlockTimeMs = 1000L;

    /** Closes the metrics registry created for this test instance. */
    @After
    public void teardown() {
        metrics.close();
    }

    /**
     * Appends records to one partition until a second batch is opened, then checks that
     * the leader becomes ready and that draining returns the first batch with every
     * appended record intact.
     */
    @Test
    public void testFull() throws Exception {
        long now = time.milliseconds();
        int batchSize = 1024;
        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
        int appends = batchSize / msgSize;

        // While the records still fit, there is one writable batch and nothing is ready.
        for (int i = 0; i < appends; i++) {
            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
            Deque<RecordBatch> openBatches = accum.batches().get(tp1);
            Assert.assertEquals(1, openBatches.size());
            Assert.assertTrue(openBatches.peekFirst().records.isWritable());
            Assert.assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
        }

        // One more record: the first batch is closed and a second, writable one is opened.
        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
        Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
        Assert.assertEquals(2, partitionBatches.size());
        Iterator<RecordBatch> batchIterator = partitionBatches.iterator();
        Assert.assertFalse(batchIterator.next().records.isWritable());
        Assert.assertTrue(batchIterator.next().records.isWritable());
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);

        // Draining hands back exactly one batch holding the first `appends` records.
        List<RecordBatch> drained = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0L).get(node1.id());
        Assert.assertEquals(1, drained.size());
        Iterator<LogEntry> entries = drained.get(0).records.iterator();
        for (int i = 0; i < appends; i++) {
            LogEntry entry = entries.next();
            Assert.assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
            Assert.assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
        }
        Assert.assertFalse("No more records", entries.hasNext());
    }

    /**
     * Appends a single record whose value is twice the configured batch size and checks
     * that the partition leader is reported ready.
     */
    @Test
    public void testAppendLarge() throws Exception {
        int batchSize = 512;
        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
        byte[] oversizedValue = new byte[2 * batchSize];
        accum.append(tp1, 0L, key, oversizedValue, null, maxBlockTimeMs);
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
    }

    /**
     * Checks that a single record is not ready before the linger time has passed on the
     * mock clock, becomes ready afterwards, and is drained unchanged.
     */
    @Test
    public void testLinger() throws Exception {
        long lingerMs = 10L;
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
        Assert.assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

        // Advance the mock clock by the linger time.
        time.sleep(lingerMs);
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);

        List<RecordBatch> drained = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0L).get(node1.id());
        Assert.assertEquals(1, drained.size());
        Iterator<LogEntry> entries = drained.get(0).records.iterator();
        LogEntry only = entries.next();
        Assert.assertEquals("Keys should match", ByteBuffer.wrap(key), only.record().key());
        Assert.assertEquals("Values should match", ByteBuffer.wrap(value), only.record().value());
        Assert.assertFalse("No more records", entries.hasNext());
    }

    /**
     * Appends more than one batch's worth of records to two partitions led by the same
     * node and checks that a drain limited to 1024 bytes returns a single batch.
     */
    @Test
    public void testPartialDrain() throws Exception {
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
        // One record more than fits in a 1024-byte batch.
        int appends = 1024 / msgSize + 1;
        for (TopicPartition tp : Arrays.asList(tp1, tp2)) {
            int appended = 0;
            while (appended < appends) {
                accum.append(tp, 0L, key, value, null, maxBlockTimeMs);
                appended++;
            }
        }
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Partition's leader should be ready", Collections.singleton(node1), readyNodes);

        List<RecordBatch> drained = accum.drain(cluster, Collections.singleton(node1), 1024, 0L).get(node1.id());
        Assert.assertEquals("But due to size bound only one partition should have been retrieved", 1, drained.size());
    }

    @Test
    public void testStressfulSituation() throws Exception {
        int numThreads = 5;
        int msgs = 10000;
        int numParts = 2;
        final RecordAccumulator accum = new RecordAccumulator(1024, 10240L, CompressionType.NONE, 0L, 100L, this.metrics, (Time)this.time);
        ArrayList<1> threads = new ArrayList<1>();
        for (int i = 0; i < 5; ++i) {
            threads.add(new Thread(){

                @Override
                public void run() {
                    for (int i = 0; i < 10000; ++i) {
                        try {
                            accum.append(new TopicPartition(RecordAccumulatorTest.this.topic, i % 2), 0L, RecordAccumulatorTest.this.key, RecordAccumulatorTest.this.value, null, 1000L);
                            continue;
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        int read = 0;
        long l = this.time.milliseconds();
        while (read < 50000) {
            Set nodes = accum.ready((Cluster)this.cluster, (long)l).readyNodes;
            List list = (List)accum.drain(this.cluster, nodes, 5120, 0L).get(this.node1.id());
            if (list == null) continue;
            for (RecordBatch batch : list) {
                for (LogEntry entry : batch.records) {
                    ++read;
                }
                accum.deallocate(batch);
            }
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    /**
     * Checks the {@code nextReadyCheckDelayMs} reported by {@code ready()} as partitions
     * on different nodes accumulate data at different points of the linger window.
     */
    @Test
    public void testNextReadyCheckDelay() throws Exception {
        long lingerMs = 10L;
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
        // Number of records that stay within a single 1024-byte batch.
        int appends = 1024 / msgSize;

        // Partition on node1 only: nothing ready, next check after the full linger time.
        for (int n = 0; n < appends; n++) {
            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
        Assert.assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);

        // Half the linger time later, add data for the partition led by node2.
        time.sleep(lingerMs / 2);
        for (int n = 0; n < appends; n++) {
            accum.append(tp3, 0L, key, value, null, maxBlockTimeMs);
        }
        result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
        Assert.assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

        // Append one record more than fits in a batch to the second partition on node1.
        for (int n = 0; n < appends + 1; n++) {
            accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
        }
        result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        Assert.assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
    }

    /**
     * Checks that a re-enqueued batch is held back until the retry backoff has elapsed,
     * while a fresh batch for another partition on the same node can still be drained
     * once its linger time has elapsed.
     */
    @Test
    public void testRetryBackoff() throws Exception {
        long lingerMs = Long.MAX_VALUE / 4;
        long retryBackoffMs = Long.MAX_VALUE / 2;
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);

        // First attempt: a single record for tp1, drained just after the linger time.
        long now = time.milliseconds();
        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
        long afterLinger = now + lingerMs + 1;
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, afterLinger);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, afterLinger);
        Assert.assertEquals("Node1 should be the only ready node.", 1, batches.size());
        Assert.assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size());

        // Put the tp1 batch back as a retry and add a fresh record for tp2.
        now = time.milliseconds();
        accum.reenqueue(batches.get(0).get(0), now);
        accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
        afterLinger = now + lingerMs + 1;
        result = accum.ready(cluster, afterLinger);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);

        // Only the tp2 batch comes out at this point.
        batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, afterLinger);
        Assert.assertEquals("Node1 should be the only ready node.", 1, batches.size());
        Assert.assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
        Assert.assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition);

        // Past the retry backoff the tp1 batch is drained.
        long afterBackoff = now + retryBackoffMs + 1;
        result = accum.ready(cluster, afterBackoff);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, afterBackoff);
        Assert.assertEquals("Node1 should be the only ready node.", 1, batches.size());
        Assert.assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
        Assert.assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
    }

    /**
     * Checks that records which would otherwise linger indefinitely are drained once a
     * flush has begun, and that nothing is left unsent after every drained batch has been
     * deallocated and the flush has completed.
     */
    @Test
    public void testFlush() throws Exception {
        long lingerMs = Long.MAX_VALUE;
        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
        for (int i = 0; i < 100; i++) {
            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0, result.readyNodes.size());

        accum.beginFlush();
        result = accum.ready(cluster, time.milliseconds());

        // Drain everything and release each batch so the flush can complete.
        // Typed collections are required here: iterating the raw Map/List with typed
        // loop variables does not compile.
        Map<Integer, List<RecordBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        for (List<RecordBatch> batches : results.values()) {
            for (RecordBatch batch : batches) {
                accum.deallocate(batch);
            }
        }

        accum.awaitFlushCompletion();
        Assert.assertFalse(accum.hasUnsent());
    }

    /**
     * Starts a background thread that sleeps on the real (system) clock and then
     * interrupts the given thread.
     *
     * @param thread  the thread to interrupt
     * @param delayMs how long to sleep before interrupting, in milliseconds
     */
    private void delayedInterrupt(final Thread thread, final long delayMs) {
        Runnable interrupter = new Runnable() {
            @Override
            public void run() {
                systemTime.sleep(delayMs);
                thread.interrupt();
            }
        };
        new Thread(interrupter).start();
    }

    /**
     * Checks that interrupting a thread blocked in {@code awaitFlushCompletion()} raises
     * {@link InterruptedException} and leaves no flush marked as in progress.
     */
    @Test
    public void testAwaitFlushComplete() throws Exception {
        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time);
        TopicPartition partition = new TopicPartition(topic, 0);
        accum.append(partition, 0L, key, value, null, maxBlockTimeMs);

        accum.beginFlush();
        Assert.assertTrue(accum.flushInProgress());

        // Schedule an interrupt of this thread one second from now (real time).
        delayedInterrupt(Thread.currentThread(), 1000L);
        try {
            accum.awaitFlushCompletion();
            Assert.fail("awaitFlushCompletion should throw InterruptException");
        } catch (InterruptedException e) {
            boolean stillFlushing = accum.flushInProgress();
            Assert.assertFalse("flushInProgress count should be decremented even if thread is interrupted", stillFlushing);
        }
    }

    /**
     * Checks that {@code abortIncompleteBatches()} completes the callback of every
     * appended record with the "closed forcefully" error and leaves nothing unsent.
     */
    @Test
    public void testAbortIncompleteBatches() throws Exception {
        long lingerMs = Long.MAX_VALUE;
        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);

        // Callback that verifies the abort error and counts how often it was invoked.
        class TestCallback implements Callback {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Check for null explicitly so a missing exception fails with a clear
                // message rather than a NullPointerException.
                Assert.assertNotNull("Aborted record should complete with an exception", exception);
                Assert.assertEquals("Producer is closed forcefully.", exception.getMessage());
                numExceptionReceivedInCallback.incrementAndGet();
            }
        }

        for (int i = 0; i < 100; i++) {
            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs);
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0, result.readyNodes.size());

        accum.abortIncompleteBatches();
        // Expected value goes first so a failure message reads correctly.
        Assert.assertEquals(100, numExceptionReceivedInCallback.get());
        Assert.assertFalse(accum.hasUnsent());
    }

    /**
     * Checks {@code abortExpiredBatches()} for three kinds of batch: one that is full, one
     * that became ready through linger, and one that was re-enqueued for retry. In each
     * case the batch must not be expired while its partition is muted and must be expired
     * once the partition is unmuted.
     */
    @Test
    public void testExpiredBatches() throws InterruptedException {
        long retryBackoffMs = 100L;
        long lingerMs = 3000L;
        int requestTimeout = 60;
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
        int appends = 1024 / msgSize;

        // --- Batch that has never been sent ---
        for (int i = 0; i < appends; i++) {
            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
            Assert.assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
        }
        // One more record opens a second batch, which makes the partition ready.
        accum.append(tp1, 0L, key, value, null, 0L);
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);

        // Advance the clock past the request timeout.
        time.sleep(requestTimeout + 1);
        accum.mutePartition(tp1);
        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());

        accum.unmutePartition(tp1);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should be expired", 1, expiredBatches.size());
        Assert.assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

        // --- Remaining batch becomes ready through linger ---
        time.sleep(lingerMs);
        // Ask the accumulator again: re-checking the previously captured set would
        // verify nothing about the state after the clock advanced.
        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);

        time.sleep(requestTimeout + 1);
        accum.mutePartition(tp1);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());

        accum.unmutePartition(tp1);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
        Assert.assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

        // --- Batch that has been drained and re-enqueued for retry ---
        accum.append(tp1, 0L, key, value, null, 0L);
        time.sleep(lingerMs);
        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
        // Expected value goes first so a failure message reads correctly.
        Assert.assertEquals("There should be only one batch.", 1, drained.get(node1.id()).size());
        time.sleep(1000L);
        accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());

        // Exactly requestTimeout + retryBackoffMs after the re-enqueue: not yet expired.
        time.sleep(requestTimeout + retryBackoffMs);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should not be expired.", 0, expiredBatches.size());

        // One millisecond later the batch is past the deadline.
        time.sleep(1L);
        accum.mutePartition(tp1);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());

        accum.unmutePartition(tp1);
        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
        Assert.assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
    }

    /**
     * Checks that a muted partition is reported neither as ready nor drained, and that
     * both happen again once the partition is unmuted.
     */
    @Test
    public void testMutedPartitions() throws InterruptedException {
        long now = time.milliseconds();
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
        int appends = 1024 / msgSize;
        for (int n = 0; n < appends; n++) {
            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
            Assert.assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
        }

        // Move the mock clock well past the linger time.
        time.sleep(2000L);

        // Muted: the node must not be reported as ready.
        accum.mutePartition(tp1);
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No node should be ready", 0, result.readyNodes.size());

        // Unmuted: the batch is ready.
        accum.unmutePartition(tp1);
        result = accum.ready(cluster, time.milliseconds());
        boolean anyReady = result.readyNodes.size() > 0;
        Assert.assertTrue("The batch should be ready", anyReady);

        // Muted again: draining the ready node yields no batch.
        accum.mutePartition(tp1);
        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        Assert.assertEquals("No batch should have been drained", 0, drained.get(node1.id()).size());

        // Unmuted: the batch is drained.
        accum.unmutePartition(tp1);
        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        boolean anyDrained = drained.get(node1.id()).size() > 0;
        Assert.assertTrue("The batch should have been drained.", anyDrained);
    }
}

