/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 0x100000;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final int REQUEST_TIMEOUT_MS = 10000;
    private TopicPartition tp = new TopicPartition("test", 0);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE);
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = new Metrics((Time)this.time);
    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    private RecordAccumulator accumulator = new RecordAccumulator(this.batchSize, 0x100000L, 0L, 0L, false, this.metrics, (Time)this.time, this.metricTags);
    private Sender sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, 0x100000, -1, 0, 10000, this.metrics, (Time)this.time, "clientId");

    @Before
    public void setup() {
        this.metadata.update(this.cluster, this.time.milliseconds());
    }

    @Test
    public void testSimple() throws Exception {
        int offset = 0;
        FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (byte[])"key".getBytes(), (byte[])"value".getBytes(), (CompressionType)CompressionType.NONE, null).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"We should have a single produce request in flight.", (long)1L, (long)this.client.inFlightRequestCount());
        this.client.respond(this.produceResponse(this.tp.topic(), this.tp.partition(), offset, Errors.NONE.code()));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((String)"All requests completed.", (long)offset, (long)this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
        Assert.assertEquals((long)offset, (long)((RecordMetadata)future.get()).offset());
    }

    @Test
    public void testRetries() throws Exception {
        int maxRetries = 1;
        Sender sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, 0x100000, -1, maxRetries, 10000, new Metrics(), (Time)this.time, "clientId");
        FutureRecordMetadata future = this.accumulator.append((TopicPartition)this.tp, (byte[])"key".getBytes(), (byte[])"value".getBytes(), (CompressionType)CompressionType.NONE, null).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        this.client.disconnect(this.client.requests().peek().request().destination());
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        int offset = 0;
        this.client.respond(this.produceResponse(this.tp.topic(), this.tp.partition(), offset, Errors.NONE.code()));
        sender.run(this.time.milliseconds());
        Assert.assertTrue((String)"Request should have retried and completed", (boolean)future.isDone());
        Assert.assertEquals((long)offset, (long)((RecordMetadata)future.get()).offset());
        future = this.accumulator.append((TopicPartition)this.tp, (byte[])"key".getBytes(), (byte[])"value".getBytes(), (CompressionType)CompressionType.NONE, null).future;
        sender.run(this.time.milliseconds());
        for (int i = 0; i < maxRetries + 1; ++i) {
            this.client.disconnect(this.client.requests().peek().request().destination());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
        }
        sender.run(this.time.milliseconds());
        this.completedWithError((Future<RecordMetadata>)future, Errors.NETWORK_EXCEPTION);
    }

    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
        Assert.assertTrue((String)"Request should be completed", (boolean)future.isDone());
        try {
            future.get();
            Assert.fail((String)"Should have thrown an exception.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals(error.exception().getClass(), e.getCause().getClass());
        }
    }

    private Struct produceResponse(String topic, int part, long offset, int error) {
        Struct struct = new Struct(ProtoUtils.currentResponseSchema((int)ApiKeys.PRODUCE.id));
        Struct response = struct.instance("responses");
        response.set("topic", (Object)topic);
        Struct partResp = response.instance("partition_responses");
        partResp.set("partition", (Object)part);
        partResp.set("error_code", (Object)((short)error));
        partResp.set("base_offset", (Object)offset);
        response.set("partition_responses", (Object)new Object[]{partResp});
        struct.set("responses", (Object)new Object[]{response});
        return struct;
    }
}

