/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 0x100000;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final String CLIENT_ID = "clientId";
    private static final String METRIC_GROUP = "producer-metrics";
    private static final double EPS = 1.0E-4;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private TopicPartition tp = new TopicPartition("test", 0);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true, new ClusterResourceListeners());
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;

    @Before
    public void setup() {
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", CLIENT_ID);
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
        this.metrics = new Metrics(metricConfig, (Time)this.time);
        this.accumulator = new RecordAccumulator(this.batchSize, 0x100000L, CompressionType.NONE, 0L, 0L, this.metrics, (Time)this.time);
        this.sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, 0, this.metrics, (Time)this.time, CLIENT_ID, 1000);
        this.metadata.update(this.cluster, this.time.milliseconds());
    }

    @After
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testSimple() throws Exception {
        long offset = 0L;
        FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (long)0L, (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"We should have a single produce request in flight.", (long)1L, (long)this.client.inFlightRequestCount());
        this.client.respond(this.produceResponse(this.tp, offset, Errors.NONE.code(), 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"All requests completed.", (long)offset, (long)this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
        Assert.assertEquals((long)offset, (long)((RecordMetadata)future.get()).offset());
    }

    @Test
    public void testQuotaMetrics() throws Exception {
        long offset = 0L;
        for (int i = 1; i <= 3; ++i) {
            FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (long)0L, (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
            this.sender.run(this.time.milliseconds());
            this.client.respond(this.produceResponse(this.tp, 0L, Errors.NONE.code(), 100 * i));
            this.sender.run(this.time.milliseconds());
        }
        Map allMetrics = this.metrics.metrics();
        KafkaMetric avgMetric = (KafkaMetric)allMetrics.get(this.metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
        KafkaMetric maxMetric = (KafkaMetric)allMetrics.get(this.metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
        Assert.assertEquals((double)200.0, (double)avgMetric.value(), (double)1.0E-4);
        Assert.assertEquals((double)300.0, (double)maxMetric.value(), (double)1.0E-4);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetries() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics();){
            Sender sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, maxRetries, m, (Time)this.time, CLIENT_ID, 1000);
            FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (long)0L, (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String id = this.client.requests().peek().request().destination();
            Node node = new Node(Integer.valueOf(id).intValue(), "localhost", 0);
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            Assert.assertTrue((String)"Client ready status should be true", (boolean)this.client.isReady(node, 0L));
            this.client.disconnect(id);
            Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
            Assert.assertFalse((String)"Client ready status should be false", (boolean)this.client.isReady(node, 0L));
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            long offset = 0L;
            this.client.respond(this.produceResponse(this.tp, offset, Errors.NONE.code(), 0));
            sender.run(this.time.milliseconds());
            Assert.assertTrue((String)"Request should have retried and completed", (boolean)future.isDone());
            Assert.assertEquals((long)offset, (long)((RecordMetadata)future.get()).offset());
            future = this.accumulator.append((TopicPartition)this.tp, (long)0L, (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
            sender.run(this.time.milliseconds());
            for (int i = 0; i < maxRetries + 1; ++i) {
                this.client.disconnect(this.client.requests().peek().request().destination());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
            }
            sender.run(this.time.milliseconds());
            this.completedWithError((Future<RecordMetadata>)future, Errors.NETWORK_EXCEPTION);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSendInOrder() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics();){
            Sender sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, maxRetries, m, (Time)this.time, CLIENT_ID, 1000);
            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
            this.metadata.update(cluster1, this.time.milliseconds());
            TopicPartition tp2 = new TopicPartition("test", 1);
            this.accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, 1000L);
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String id = this.client.requests().peek().request().destination();
            Assert.assertEquals((long)ApiKeys.PRODUCE.id, (long)this.client.requests().peek().request().header().apiKey());
            Node node = new Node(Integer.valueOf(id).intValue(), "localhost", 0);
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            Assert.assertTrue((String)"Client ready status should be true", (boolean)this.client.isReady(node, 0L));
            this.time.sleep(900L);
            this.accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, 1000L);
            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
            this.metadata.update(cluster2, this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        }
    }

    @Test
    public void testMetadataTopicExpiry() throws Exception {
        long offset = 0L;
        this.metadata.update(Cluster.empty(), this.time.milliseconds());
        FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Topic not added to metadata", (boolean)this.metadata.containsTopic(this.tp.topic()));
        this.metadata.update(this.cluster, this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.client.respond(this.produceResponse(this.tp, offset++, Errors.NONE.code(), 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"Request completed.", (long)0L, (long)this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
        Assert.assertTrue((String)"Topic not retained in metadata list", (boolean)this.metadata.containsTopic(this.tp.topic()));
        this.time.sleep(300000L);
        this.metadata.update(Cluster.empty(), this.time.milliseconds());
        Assert.assertFalse((String)"Unused topic has not been expired", (boolean)this.metadata.containsTopic(this.tp.topic()));
        future = this.accumulator.append((TopicPartition)this.tp, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Topic not added to metadata", (boolean)this.metadata.containsTopic(this.tp.topic()));
        this.metadata.update(this.cluster, this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.client.respond(this.produceResponse(this.tp, offset++, Errors.NONE.code(), 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"Request completed.", (long)0L, (long)this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
    }

    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
        try {
            future.get();
            Assert.fail((String)"Should have thrown an exception.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals(error.exception().getClass(), e.getCause().getClass());
        }
    }

    private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short)error, offset, -1L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
        ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
        return response.toStruct();
    }
}

