/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AbstractCoordinatorTest {
    private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
    private static final int REBALANCE_TIMEOUT_MS = 60000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final int HEARTBEAT_INTERVAL_MS = 3000;
    private static final long RETRY_BACKOFF_MS = 100L;
    private static final long REQUEST_TIMEOUT_MS = 40000L;
    private static final String GROUP_ID = "dummy-group";
    private static final String METRIC_GROUP_PREFIX = "consumer";
    private MockClient mockClient;
    private MockTime mockTime;
    private Node node;
    private Node coordinatorNode;
    private ConsumerNetworkClient consumerClient;
    private DummyCoordinator coordinator;

    @Before
    public void setupCoordinator() {
        this.mockTime = new MockTime();
        this.mockClient = new MockClient(this.mockTime);
        Metadata metadata = new Metadata();
        this.consumerClient = new ConsumerNetworkClient((KafkaClient)this.mockClient, metadata, (Time)this.mockTime, 100L, 40000L);
        Metrics metrics = new Metrics();
        Cluster cluster = TestUtils.singletonCluster("topic", 1);
        metadata.update(cluster, this.mockTime.milliseconds());
        this.node = (Node)cluster.nodes().get(0);
        this.mockClient.setNode(this.node);
        this.coordinatorNode = new Node(Integer.MAX_VALUE - this.node.id(), this.node.host(), this.node.port());
        this.coordinator = new DummyCoordinator(this.consumerClient, metrics, this.mockTime);
    }

    @Test
    public void testCoordinatorDiscoveryBackoff() {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.blackout(this.coordinatorNode, 50L);
        long initialTime = this.mockTime.milliseconds();
        this.coordinator.ensureCoordinatorReady();
        long endTime = this.mockTime.milliseconds();
        Assert.assertTrue((endTime - initialTime >= 100L ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUncaughtExceptionInHeartbeatThread() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(this.syncGroupResponse(Errors.NONE));
        final RuntimeException e = new RuntimeException();
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                if (request.request().header().apiKey() == ApiKeys.HEARTBEAT.id) {
                    throw e;
                }
                return false;
            }
        }, this.heartbeatResponse(Errors.UNKNOWN));
        try {
            this.coordinator.ensureActiveGroup();
            this.mockTime.sleep(3000L);
            DummyCoordinator dummyCoordinator = this.coordinator;
            synchronized (dummyCoordinator) {
                ((Object)((Object)this.coordinator)).notify();
            }
            long startMs = System.currentTimeMillis();
            while (System.currentTimeMillis() - startMs < 1000L) {
                Thread.sleep(10L);
                this.coordinator.pollHeartbeat(this.mockTime.milliseconds());
            }
            Assert.fail((String)"Expected pollHeartbeat to raise an error in 1 second");
        }
        catch (RuntimeException exception) {
            Assert.assertEquals((Object)exception, (Object)e);
        }
    }

    @Test
    public void testLookupCoordinator() throws Exception {
        this.mockClient.setNode(null);
        RequestFuture noBrokersAvailableFuture = this.coordinator.lookupCoordinator();
        Assert.assertTrue((String)"Failed future expected", (boolean)noBrokersAvailableFuture.failed());
        this.mockClient.setNode(this.node);
        RequestFuture future = this.coordinator.lookupCoordinator();
        Assert.assertFalse((String)"Request not sent", (boolean)future.isDone());
        Assert.assertTrue((String)"New request sent while one is in progress", (future == this.coordinator.lookupCoordinator() ? 1 : 0) != 0);
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Assert.assertTrue((String)"New request not sent after previous completed", (future != this.coordinator.lookupCoordinator() ? 1 : 0) != 0);
    }

    @Test
    public void testWakeupAfterJoinGroupSent() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){
            private int invocations = 0;

            @Override
            public boolean matches(ClientRequest request) {
                boolean isJoinGroupRequest;
                ++this.invocations;
                boolean bl = isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
                if (isJoinGroupRequest && this.invocations == 1) {
                    throw new WakeupException();
                }
                return isJoinGroupRequest;
            }
        }, this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){
            private int invocations = 0;

            @Override
            public boolean matches(ClientRequest request) {
                boolean isJoinGroupRequest;
                ++this.invocations;
                boolean bl = isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
                if (isJoinGroupRequest && this.invocations == 1) {
                    throw new WakeupException();
                }
                return isJoinGroupRequest;
            }
        }, this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.consumerClient.poll(0L);
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterJoinGroupReceived() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                boolean isJoinGroupRequest;
                boolean bl = isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
                if (isJoinGroupRequest) {
                    AbstractCoordinatorTest.this.consumerClient.wakeup();
                }
                return isJoinGroupRequest;
            }
        }, this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                boolean isJoinGroupRequest;
                boolean bl = isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
                if (isJoinGroupRequest) {
                    AbstractCoordinatorTest.this.consumerClient.wakeup();
                }
                return isJoinGroupRequest;
            }
        }, this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.consumerClient.poll(0L);
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterSyncGroupSent() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){
            private int invocations = 0;

            @Override
            public boolean matches(ClientRequest request) {
                boolean isSyncGroupRequest;
                ++this.invocations;
                boolean bl = isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
                if (isSyncGroupRequest && this.invocations == 1) {
                    throw new WakeupException();
                }
                return isSyncGroupRequest;
            }
        }, this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){
            private int invocations = 0;

            @Override
            public boolean matches(ClientRequest request) {
                boolean isSyncGroupRequest;
                ++this.invocations;
                boolean bl = isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
                if (isSyncGroupRequest && this.invocations == 1) {
                    throw new WakeupException();
                }
                return isSyncGroupRequest;
            }
        }, this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.consumerClient.poll(0L);
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterSyncGroupReceived() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                boolean isSyncGroupRequest;
                boolean bl = isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
                if (isSyncGroupRequest) {
                    AbstractCoordinatorTest.this.consumerClient.wakeup();
                }
                return isSyncGroupRequest;
            }
        }, this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    @Test
    public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
        this.mockClient.prepareResponse(this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse(this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                boolean isSyncGroupRequest;
                boolean bl = isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
                if (isSyncGroupRequest) {
                    AbstractCoordinatorTest.this.consumerClient.wakeup();
                }
                return isSyncGroupRequest;
            }
        }, this.syncGroupResponse(Errors.NONE));
        AtomicBoolean heartbeatReceived = this.prepareFirstHeartbeat();
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Should have woken up from ensureActiveGroup()");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)0L, (long)this.coordinator.onJoinCompleteInvokes);
        Assert.assertFalse((boolean)heartbeatReceived.get());
        this.consumerClient.poll(0L);
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinPrepareInvokes);
        Assert.assertEquals((long)1L, (long)this.coordinator.onJoinCompleteInvokes);
        this.awaitFirstHeartbeat(heartbeatReceived);
    }

    private AtomicBoolean prepareFirstHeartbeat() {
        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
        this.mockClient.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                boolean isHeartbeatRequest;
                boolean bl = isHeartbeatRequest = request.request().header().apiKey() == ApiKeys.HEARTBEAT.id;
                if (isHeartbeatRequest) {
                    heartbeatReceived.set(true);
                }
                return isHeartbeatRequest;
            }
        }, this.heartbeatResponse(Errors.UNKNOWN));
        return heartbeatReceived;
    }

    private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception {
        this.mockTime.sleep(3000L);
        TestUtils.waitForCondition(new TestCondition(){

            @Override
            public boolean conditionMet() {
                return heartbeatReceived.get();
            }
        }, 3000L, "Should have received a heartbeat request after joining the group");
    }

    private Struct groupCoordinatorResponse(Node node, Errors error) {
        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node);
        return response.toStruct();
    }

    private Struct heartbeatResponse(Errors error) {
        HeartbeatResponse response = new HeartbeatResponse(error.code());
        return response.toStruct();
    }

    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
        return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId, Collections.emptyMap()).toStruct();
    }

    private Struct syncGroupResponse(Errors error) {
        return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)).toStruct();
    }

    public class DummyCoordinator
    extends AbstractCoordinator {
        private int onJoinPrepareInvokes;
        private int onJoinCompleteInvokes;

        public DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, Time time) {
            super(client, AbstractCoordinatorTest.GROUP_ID, 60000, 10000, 3000, metrics, AbstractCoordinatorTest.METRIC_GROUP_PREFIX, time, 100L);
            this.onJoinPrepareInvokes = 0;
            this.onJoinCompleteInvokes = 0;
        }

        protected String protocolType() {
            return "dummy";
        }

        protected List<JoinGroupRequest.ProtocolMetadata> metadata() {
            return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata("dummy-subprotocol", EMPTY_DATA));
        }

        protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
            HashMap<String, ByteBuffer> assignment = new HashMap<String, ByteBuffer>();
            for (Map.Entry<String, ByteBuffer> metadata : allMemberMetadata.entrySet()) {
                assignment.put(metadata.getKey(), EMPTY_DATA);
            }
            return assignment;
        }

        protected void onJoinPrepare(int generation, String memberId) {
            ++this.onJoinPrepareInvokes;
        }

        protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
            ++this.onJoinCompleteInvokes;
        }
    }
}

