/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Group coordinator used by a distributed Connect worker.
 *
 * <p>It joins a group with protocol type {@code "connect"}, advertising this worker's REST URL
 * and the offset of its latest config snapshot. When this member is asked to perform the
 * assignment, connectors and their tasks are handed out round-robin over the sorted member ids.
 * The resulting assignment is reported to the {@link WorkerRebalanceListener}.
 */
public final class WorkerCoordinator
extends AbstractCoordinator
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class);
    /** Name of the single sub-protocol advertised by {@link #metadata()}. */
    public static final String DEFAULT_SUBPROTOCOL = "default";
    // Error codes written into every serialized assignment.
    // NOTE(review): presumably these mirror constants declared on ConnectProtocol.Assignment - confirm.
    private static final short NO_ERROR_CODE = 0;
    private static final short CONFIG_MISMATCH_CODE = 1;
    private final String restUrl;
    private final ConfigBackingStore configStorage;
    /** Last assignment received in {@link #onJoinComplete}; {@code null} until the first join completes. */
    private ConnectProtocol.Assignment assignmentSnapshot;
    private final WorkerCoordinatorMetrics sensors;
    /** Config snapshot taken in {@link #metadata()} (possibly refreshed while performing an assignment). */
    private ClusterConfigState configSnapshot;
    private final WorkerRebalanceListener listener;
    /** Ownership lookup built when this member performs an assignment; cleared in {@link #onJoinPrepare}. */
    private LeaderState leaderState;
    private boolean rejoinRequested;

    /**
     * Creates a coordinator for one worker.
     *
     * @param restUrl       REST URL advertised to the other group members
     * @param configStorage store from which config snapshots are taken
     * @param listener      callback notified when assignments are received or revoked
     */
    public WorkerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.assignmentSnapshot = null;
        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.listener = listener;
        this.rejoinRequested = false;
    }

    /** Flags that a rejoin is wanted; {@link #needRejoin()} reports {@code true} until the next join completes. */
    public void requestRejoin() {
        this.rejoinRequested = true;
    }

    /** @return the group protocol type, always {@code "connect"} */
    public String protocolType() {
        return "connect";
    }

    /**
     * Takes a fresh config snapshot and builds the join metadata from it.
     *
     * @return a single-element list advertising the REST URL and snapshot offset under
     *         {@link #DEFAULT_SUBPROTOCOL}
     */
    public List<JoinGroupRequest.ProtocolMetadata> metadata() {
        this.configSnapshot = this.configStorage.snapshot();
        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(this.restUrl, this.configSnapshot.offset());
        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
        return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
    }

    /** Stores the received assignment, clears any pending rejoin request and notifies the listener. */
    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        this.assignmentSnapshot = ConnectProtocol.deserializeAssignment(memberAssignment);
        this.rejoinRequested = false;
        this.listener.onAssigned(this.assignmentSnapshot, generation);
    }

    /**
     * Computes the assignment for every group member.
     *
     * <p>If the local config is older than the newest offset reported by any member, every member
     * receives an empty assignment carrying the config-mismatch error code instead.
     *
     * @param leaderId          member id of the leader; must be a key of {@code allMemberMetadata}
     * @param allMemberMetadata serialized worker state keyed by member id
     * @return serialized assignment keyed by member id
     */
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
        log.debug("Performing task assignment");
        Map<String, ConnectProtocol.WorkerState> allConfigs = new HashMap<String, ConnectProtocol.WorkerState>();
        for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet()) {
            allConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue()));
        }
        long maxOffset = this.findMaxMemberConfigOffset(allConfigs);
        Long leaderOffset = this.ensureLeaderConfig(maxOffset);
        if (leaderOffset == null) {
            // Local config is behind: hand out empty assignments flagged with the mismatch code.
            return this.fillAssignmentsAndSerialize(allConfigs.keySet(), CONFIG_MISMATCH_CODE, leaderId, allConfigs.get(leaderId).url(), maxOffset, Collections.<String, List<String>>emptyMap(), Collections.<String, List<ConnectorTaskId>>emptyMap());
        }
        return this.performTaskAssignment(leaderId, leaderOffset, allConfigs);
    }

    /**
     * Finds the highest config offset reported by any member.
     *
     * @throws IllegalStateException if {@code allConfigs} is empty (previously this surfaced as an
     *                               unboxing {@link NullPointerException})
     */
    private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> allConfigs) {
        Long maxOffset = null;
        for (ConnectProtocol.WorkerState state : allConfigs.values()) {
            long memberRootOffset = state.offset();
            maxOffset = maxOffset == null ? memberRootOffset : Math.max(maxOffset, memberRootOffset);
        }
        if (maxOffset == null) {
            throw new IllegalStateException("Cannot determine max config offset: no member metadata was provided");
        }
        log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, this.configSnapshot.offset());
        return maxOffset;
    }

    /**
     * Makes sure the local config snapshot is at least as new as {@code maxOffset}, re-reading the
     * config store once if it is not.
     *
     * @return the offset to publish with the assignment, or {@code null} if the local config is
     *         still behind after the refresh
     */
    private Long ensureLeaderConfig(long maxOffset) {
        if (this.configSnapshot.offset() >= maxOffset) {
            return maxOffset;
        }
        ClusterConfigState updatedSnapshot = this.configStorage.snapshot();
        if (updatedSnapshot.offset() < maxOffset) {
            log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
            return null;
        }
        this.configSnapshot = updatedSnapshot;
        return this.configSnapshot.offset();
    }

    /**
     * Assigns connectors and tasks round-robin. Members and work items are sorted first; one
     * shared iterator cycles over the members, so each connector and then each of its tasks goes
     * to the next member in turn. Also records the result in {@link #leaderState}.
     */
    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {
        Map<String, List<String>> connectorAssignments = new HashMap<String, List<String>>();
        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<String, List<ConnectorTaskId>>();
        CircularIterator<String> memberIt = new CircularIterator<String>(WorkerCoordinator.sorted(allConfigs.keySet()));
        for (String connectorId : WorkerCoordinator.sorted(this.configSnapshot.connectors())) {
            String connectorAssignedTo = memberIt.next();
            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
            WorkerCoordinator.listFor(connectorAssignments, connectorAssignedTo).add(connectorId);
            for (ConnectorTaskId taskId : WorkerCoordinator.sorted(this.configSnapshot.tasks(connectorId))) {
                String taskAssignedTo = memberIt.next();
                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
                WorkerCoordinator.listFor(taskAssignments, taskAssignedTo).add(taskId);
            }
        }
        this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments);
        return this.fillAssignmentsAndSerialize(allConfigs.keySet(), NO_ERROR_CODE, leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
    }

    /** Returns the list stored under {@code member}, creating and registering an empty one if absent. */
    private static <V> List<V> listFor(Map<String, List<V>> assignments, String member) {
        List<V> assigned = assignments.get(member);
        if (assigned == null) {
            assigned = new ArrayList<V>();
            assignments.put(member, assigned);
        }
        return assigned;
    }

    /**
     * Builds and serializes one assignment per member. Members with no entry in the given maps
     * receive empty connector/task lists.
     */
    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, Map<String, List<String>> connectorAssignments, Map<String, List<ConnectorTaskId>> taskAssignments) {
        Map<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        for (String member : members) {
            List<String> connectors = connectorAssignments.get(member);
            if (connectors == null) {
                connectors = Collections.emptyList();
            }
            List<ConnectorTaskId> tasks = taskAssignments.get(member);
            if (tasks == null) {
                tasks = Collections.emptyList();
            }
            ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
            log.debug("Assignment: {} -> {}", member, assignment);
            groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment));
        }
        log.debug("Finished assignment");
        return groupAssignment;
    }

    /**
     * Drops the leader-side ownership state and, if a successful assignment is currently held,
     * tells the listener that it is being revoked.
     */
    protected void onJoinPrepare(int generation, String memberId) {
        this.leaderState = null;
        log.debug("Revoking previous assignment {}", this.assignmentSnapshot);
        if (this.assignmentSnapshot != null && !this.assignmentSnapshot.failed()) {
            this.listener.onRevoked(this.assignmentSnapshot.leader(), this.assignmentSnapshot.connectors(), this.assignmentSnapshot.tasks());
        }
    }

    /**
     * @return {@code true} if the superclass wants a rejoin, no assignment has been received yet,
     *         the last assignment failed, or {@link #requestRejoin()} was called since the last join
     */
    public boolean needRejoin() {
        return super.needRejoin() || this.assignmentSnapshot == null || this.assignmentSnapshot.failed() || this.rejoinRequested;
    }

    /** @return the member id currently held by this coordinator */
    public String memberId() {
        return this.memberId;
    }

    @Override
    public void close() {
        super.close();
    }

    /** @return {@code true} if the last received assignment names this member as leader */
    private boolean isLeader() {
        return this.assignmentSnapshot != null && this.memberId.equals(this.assignmentSnapshot.leader());
    }

    /**
     * Looks up the REST URL of the worker that owns a connector.
     *
     * @return the owner's URL, or {@code null} if a rejoin is needed, this member is not the
     *         leader, no leader state is available, or the connector has no owner
     */
    public String ownerUrl(String connector) {
        if (this.needRejoin() || !this.isLeader()) {
            return null;
        }
        // leaderState is cleared in onJoinPrepare and only rebuilt by performTaskAssignment;
        // guard so a stale assignmentSnapshot cannot lead to a NullPointerException here.
        LeaderState state = this.leaderState;
        return state == null ? null : state.ownerUrl(connector);
    }

    /**
     * Looks up the REST URL of the worker that owns a task.
     *
     * @return the owner's URL, or {@code null} if a rejoin is needed, this member is not the
     *         leader, no leader state is available, or the task has no owner
     */
    public String ownerUrl(ConnectorTaskId task) {
        if (this.needRejoin() || !this.isLeader()) {
            return null;
        }
        // Same guard as the connector overload: leaderState may have been cleared by onJoinPrepare.
        LeaderState state = this.leaderState;
        return state == null ? null : state.ownerUrl(task);
    }

    /** @return a new list holding {@code members} in natural order; the input is left untouched */
    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
        ArrayList<T> res = new ArrayList<T>(members);
        Collections.sort(res);
        return res;
    }

    /**
     * Turns an owner-to-items map into an item-to-owner map. If an item appears under several
     * owners, whichever is iterated last wins.
     */
    private static <K, V> Map<V, K> invertAssignment(Map<K, List<V>> assignment) {
        Map<V, K> inverted = new HashMap<V, K>();
        for (Map.Entry<K, List<V>> assignmentEntry : assignment.entrySet()) {
            K key = assignmentEntry.getKey();
            for (V value : assignmentEntry.getValue()) {
                inverted.put(value, key);
            }
        }
        return inverted;
    }

    /** Snapshot of one computed assignment, indexed so the owner of a connector or task can be looked up. */
    private static class LeaderState {
        private final Map<String, ConnectProtocol.WorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        public LeaderState(Map<String, ConnectProtocol.WorkerState> allMembers, Map<String, List<String>> connectorAssignment, Map<String, List<ConnectorTaskId>> taskAssignment) {
            this.allMembers = allMembers;
            this.connectorOwners = WorkerCoordinator.invertAssignment(connectorAssignment);
            this.taskOwners = WorkerCoordinator.invertAssignment(taskAssignment);
        }

        /** @return URL of the member owning the task, or {@code null} if the task is unassigned */
        private String ownerUrl(ConnectorTaskId id) {
            String ownerId = this.taskOwners.get(id);
            if (ownerId == null) {
                return null;
            }
            return this.allMembers.get(ownerId).url();
        }

        /** @return URL of the member owning the connector, or {@code null} if it is unassigned */
        private String ownerUrl(String connector) {
            String ownerId = this.connectorOwners.get(connector);
            if (ownerId == null) {
                return null;
            }
            return this.allMembers.get(ownerId).url();
        }
    }

    /** Registers the {@code assigned-connectors} and {@code assigned-tasks} metrics for the enclosing coordinator. */
    private class WorkerCoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;

        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
            Measurable numConnectors = new Measurable(){

                public double measure(MetricConfig config, long now) {
                    // No assignment exists until the first join completes; report 0 instead of failing.
                    // The field is read once so the null check and the use see the same value.
                    ConnectProtocol.Assignment snapshot = WorkerCoordinator.this.assignmentSnapshot;
                    return snapshot == null ? 0.0 : snapshot.connectors().size();
                }
            };
            Measurable numTasks = new Measurable(){

                public double measure(MetricConfig config, long now) {
                    // Same null handling as the connector gauge above.
                    ConnectProtocol.Assignment snapshot = WorkerCoordinator.this.assignmentSnapshot;
                    return snapshot == null ? 0.0 : snapshot.tasks().size();
                }
            };
            metrics.addMetric(metrics.metricName("assigned-connectors", this.metricGrpName, "The number of connector instances currently assigned to this consumer"), numConnectors);
            metrics.addMetric(metrics.metricName("assigned-tasks", this.metricGrpName, "The number of tasks currently assigned to this consumer"), numTasks);
        }
    }
}

