/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClusterConnectionStates;
import org.apache.kafka.clients.InFlightRequests;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetworkClient
implements KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
    private final Selectable selector;
    private final Metadata metadata;
    private final ClusterConnectionStates connectionStates;
    private final InFlightRequests inFlightRequests;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    private final int nodeIndexOffset;
    private int correlation;
    private boolean metadataFetchInProgress;
    private long lastNoNodeAvailableMs;

    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, int socketSendBuffer, int socketReceiveBuffer) {
        this.selector = selector;
        this.metadata = metadata;
        this.clientId = clientId;
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
        this.metadataFetchInProgress = false;
        this.lastNoNodeAvailableMs = 0L;
    }

    @Override
    public boolean ready(Node node, long now) {
        if (this.isReady(node, now)) {
            return true;
        }
        if (this.connectionStates.canConnect(node.id(), now)) {
            this.initiateConnect(node, now);
        }
        return false;
    }

    @Override
    public long connectionDelay(Node node, long now) {
        return this.connectionStates.connectionDelay(node.id(), now);
    }

    @Override
    public boolean isReady(Node node, long now) {
        int nodeId = node.id();
        if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0L) {
            return false;
        }
        return this.isSendable(nodeId);
    }

    private boolean isSendable(int node) {
        return this.connectionStates.isConnected(node) && this.inFlightRequests.canSendMore(node);
    }

    @Override
    public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
        ArrayList<NetworkSend> sends = new ArrayList<NetworkSend>();
        for (int i = 0; i < requests.size(); ++i) {
            ClientRequest request = requests.get(i);
            int nodeId = request.request().destination();
            if (!this.isSendable(nodeId)) {
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
            }
            this.inFlightRequests.add(request);
            sends.add(request.request());
        }
        long timeToNextMetadataUpdate = this.metadata.timeToNextUpdate(now);
        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + this.metadata.refreshBackoff() - now, 0L);
        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
        if (!this.metadataFetchInProgress && metadataTimeout == 0L) {
            this.maybeUpdateMetadata(sends, now);
        }
        try {
            this.selector.poll(Math.min(timeout, metadataTimeout), sends);
        }
        catch (IOException e) {
            log.error("Unexpected error during I/O in producer network thread", (Throwable)e);
        }
        ArrayList<ClientResponse> responses = new ArrayList<ClientResponse>();
        this.handleCompletedSends(responses, now);
        this.handleCompletedReceives(responses, now);
        this.handleDisconnections(responses, now);
        this.handleConnections();
        return responses;
    }

    @Override
    public int inFlightRequestCount() {
        return this.inFlightRequests.inFlightRequestCount();
    }

    @Override
    public RequestHeader nextRequestHeader(ApiKeys key) {
        return new RequestHeader(key.id, this.clientId, this.correlation++);
    }

    @Override
    public void wakeup() {
        this.selector.wakeup();
    }

    @Override
    public void close() {
        this.selector.close();
    }

    @Override
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); ++i) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                return node;
            }
            if (this.connectionStates.isBlackedOut(node.id(), now) || currInflight >= inflight) continue;
            inflight = currInflight;
            found = node;
        }
        return found;
    }

    private void handleCompletedSends(List<ClientResponse> responses, long now) {
        for (NetworkSend send : this.selector.completedSends()) {
            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
            if (request.expectResponse()) continue;
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            int source = receive.source();
            ClientRequest req = this.inFlightRequests.completeNext(source);
            ResponseHeader header = ResponseHeader.parse(receive.payload());
            short apiKey = req.request().header().apiKey();
            Struct body = (Struct)ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
            this.correlate(req.request().header(), header);
            if (apiKey == ApiKeys.METADATA.id) {
                this.handleMetadataResponse(req.request().header(), body, now);
                continue;
            }
            responses.add(new ClientResponse(req, now, false, body));
        }
    }

    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
        this.metadataFetchInProgress = false;
        MetadataResponse response = new MetadataResponse(body);
        Cluster cluster = response.cluster();
        if (cluster.nodes().size() > 0) {
            this.metadata.update(cluster, now);
        } else {
            log.trace("Ignoring empty metadata response with correlation id {}.", (Object)header.correlationId());
        }
    }

    private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (int node : this.selector.disconnected()) {
            this.connectionStates.disconnected(node);
            log.debug("Node {} disconnected.", (Object)node);
            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
                log.trace("Cancelled request {} due to node {} being disconnected", (Object)request, (Object)node);
                ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
                if (requestKey == ApiKeys.METADATA) {
                    this.metadataFetchInProgress = false;
                    continue;
                }
                responses.add(new ClientResponse(request, now, true, null));
            }
        }
        if (this.selector.disconnected().size() > 0) {
            this.metadata.requestUpdate();
        }
    }

    private void handleConnections() {
        for (Integer id : this.selector.connected()) {
            log.debug("Completed connection to node {}", (Object)id);
            this.connectionStates.connected(id);
        }
    }

    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + ")");
        }
    }

    private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
        RequestSend send = new RequestSend(node, this.nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
        return new ClientRequest(now, true, send, null);
    }

    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            this.lastNoNodeAvailableMs = now;
            return;
        }
        log.debug("Trying to send metadata request to node {}", (Object)node.id());
        if (this.connectionStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = this.metadata.topics();
            this.metadataFetchInProgress = true;
            ClientRequest metadataRequest = this.metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", (Object)metadataRequest, (Object)node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (this.connectionStates.canConnect(node.id(), now)) {
            log.debug("Init connection to node {} for sending metadata request in the next iteration", (Object)node.id());
            this.initiateConnect(node, now);
        } else {
            this.lastNoNodeAvailableMs = now;
        }
    }

    private void initiateConnect(Node node, long now) {
        try {
            log.debug("Initiating connection to node {} at {}:{}.", new Object[]{node.id(), node.host(), node.port()});
            this.connectionStates.connecting(node.id(), now);
            this.selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
        }
        catch (IOException e) {
            this.connectionStates.disconnected(node.id());
            this.metadata.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", new Object[]{node.id(), node.host(), node.port(), e});
        }
    }
}

