/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.group;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat;
import io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedJoin;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupState;
import io.streamnative.pulsar.handlers.kop.coordinator.group.InitialDelayedJoin;
import io.streamnative.pulsar.handlers.kop.coordinator.group.JoinGroupResult;
import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.Timer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupCoordinator {
    private static final Logger log = LoggerFactory.getLogger(GroupCoordinator.class);
    static final String NoState = "";
    static final String NoProtocolType = "";
    static final String NoProtocol = "";
    static final String NoLeader = "";
    static final int NoGeneration = -1;
    static final String NoMemberId = "";
    static final GroupMetadata.GroupSummary EmptyGroup = new GroupMetadata.GroupSummary("", "", "", Collections.emptyList());
    static final GroupMetadata.GroupSummary DeadGroup = new GroupMetadata.GroupSummary(GroupState.Dead.toString(), "", "", Collections.emptyList());
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final GroupConfig groupConfig;
    private final GroupMetadataManager groupManager;
    private final DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory;
    private final DelayedOperationPurgatory<DelayedJoin> joinPurgatory;
    private final Time time;

    public static GroupCoordinator of(String tenant, SystemTopicClient client, GroupConfig groupConfig, OffsetConfig offsetConfig, String namespacePrefixForMetadata, Timer timer, Time time) {
        ScheduledExecutorService coordinatorExecutor = (ScheduledExecutorService)OrderedScheduler.newSchedulerBuilder().name("group-coordinator-executor").numThreads(1).build();
        GroupMetadataManager metadataManager = new GroupMetadataManager(offsetConfig, client.newProducerBuilder(), client.newReaderBuilder(), coordinatorExecutor, namespacePrefixForMetadata, time);
        DelayedOperationPurgatory<DelayedJoin> joinPurgatory = DelayedOperationPurgatory.builder().purgatoryName("group-coordinator-delayed-join").timeoutTimer(timer).build();
        DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory = DelayedOperationPurgatory.builder().purgatoryName("group-coordinator-delayed-heartbeat").timeoutTimer(timer).build();
        return new GroupCoordinator(groupConfig, metadataManager, heartbeatPurgatory, joinPurgatory, time);
    }

    public GroupCoordinator(GroupConfig groupConfig, GroupMetadataManager groupManager, DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory, DelayedOperationPurgatory<DelayedJoin> joinPurgatory, Time time) {
        this.groupConfig = groupConfig;
        this.groupManager = groupManager;
        this.heartbeatPurgatory = heartbeatPurgatory;
        this.joinPurgatory = joinPurgatory;
        this.time = time;
    }

    public void startup(boolean enableMetadataExpiration) {
        log.info("Starting up group coordinator.");
        this.groupManager.startup(enableMetadataExpiration);
        this.isActive.set(true);
        log.info("Group coordinator started.");
    }

    public void shutdown() {
        log.info("Shutting down group coordinator ...");
        this.isActive.set(false);
        this.groupManager.shutdown();
        this.heartbeatPurgatory.shutdown();
        this.joinPurgatory.shutdown();
        log.info("Shutdown group coordinator completely.");
    }

    public int partitionFor(String coordinatorKey) {
        return this.groupManager.partitionFor(coordinatorKey);
    }

    public String getTopicPartitionName(int partition) {
        return this.groupManager.getTopicPartitionName(partition);
    }

    public ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> getOffsetsProducers() {
        return this.groupManager.getOffsetsProducers();
    }

    public ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> getOffsetsReaders() {
        return this.groupManager.getOffsetsReaders();
    }

    public GroupMetadataManager getGroupManager() {
        return this.groupManager;
    }

    public GroupConfig groupConfig() {
        return this.groupConfig;
    }

    public OffsetConfig offsetConfig() {
        return this.groupManager.offsetConfig();
    }

    public CompletableFuture<JoinGroupResult> handleJoinGroup(String groupId, String memberId, String clientId, String clientHost, int rebalanceTimeoutMs, int sessionTimeoutMs, String protocolType, Map<String, byte[]> protocols) {
        Optional<Errors> errors = this.validateGroupStatus(groupId, ApiKeys.JOIN_GROUP);
        if (errors.isPresent()) {
            return CompletableFuture.completedFuture(this.joinError(memberId, errors.get()));
        }
        if (sessionTimeoutMs < this.groupConfig.groupMinSessionTimeoutMs() || sessionTimeoutMs > this.groupConfig.groupMaxSessionTimeoutMs()) {
            return CompletableFuture.completedFuture(this.joinError(memberId, Errors.INVALID_SESSION_TIMEOUT));
        }
        return this.groupManager.getGroup(groupId).map(group -> this.doJoinGroup((GroupMetadata)group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols)).orElseGet(() -> {
            if (!"".equals(memberId)) {
                return CompletableFuture.completedFuture(this.joinError(memberId, Errors.UNKNOWN_MEMBER_ID));
            }
            GroupMetadata group = this.groupManager.addGroup(new GroupMetadata(groupId, GroupState.Empty));
            return this.doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols);
        });
    }

    private CompletableFuture<JoinGroupResult> doJoinGroup(GroupMetadata group, String memberId, String clientId, String clientHost, int rebalanceTimeoutMs, int sessionTimeoutMs, String protocolType, Map<String, byte[]> protocols) {
        return (CompletableFuture)group.inLock(() -> this.unsafeJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols));
    }

    private CompletableFuture<JoinGroupResult> unsafeJoinGroup(GroupMetadata group, String memberId, String clientId, String clientHost, int rebalanceTimeoutMs, int sessionTimeoutMs, String protocolType, Map<String, byte[]> protocols) {
        if (!(group.is(GroupState.Empty) || group.protocolType().isPresent() && Objects.equals(group.protocolType().get(), protocolType) && group.supportsProtocols(protocols.keySet()))) {
            return CompletableFuture.completedFuture(this.joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL));
        }
        if (group.is(GroupState.Empty) && (protocols.isEmpty() || protocolType.isEmpty())) {
            return CompletableFuture.completedFuture(this.joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL));
        }
        if (!"".equals(memberId) && !group.has(memberId)) {
            return CompletableFuture.completedFuture(this.joinError(memberId, Errors.UNKNOWN_MEMBER_ID));
        }
        CompletableFuture<JoinGroupResult> resultFuture = switch (group.currentState()) {
            case GroupState.Dead -> CompletableFuture.completedFuture(this.joinError(memberId, Errors.UNKNOWN_MEMBER_ID));
            case GroupState.PreparingRebalance -> {
                if ("".equals(memberId)) {
                    yield this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group);
                }
                MemberMetadata member = group.get(memberId);
                yield this.updateMemberAndRebalance(group, member, protocols);
            }
            case GroupState.CompletingRebalance -> {
                if ("".equals(memberId)) {
                    yield this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group);
                }
                MemberMetadata member = group.get(memberId);
                if (member.matches(protocols)) {
                    Map<Object, Object> members = group.isLeader(memberId) ? group.currentMemberMetadata() : Collections.emptyMap();
                    yield CompletableFuture.completedFuture(new JoinGroupResult(members, memberId, group.generationId(), group.protocolOrNull(), group.protocolTypeOrNull(), group.leaderOrNull(), Errors.NONE));
                }
                yield this.updateMemberAndRebalance(group, member, protocols);
            }
            case GroupState.Empty, GroupState.Stable -> {
                if ("".equals(memberId)) {
                    yield this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group);
                }
                MemberMetadata member = group.get(memberId);
                if (group.isLeader(memberId) || !member.matches(protocols)) {
                    yield this.updateMemberAndRebalance(group, member, protocols);
                }
                yield CompletableFuture.completedFuture(new JoinGroupResult(Collections.emptyMap(), memberId, group.generationId(), group.protocolOrNull(), group.protocolTypeOrNull(), group.leaderOrNull(), Errors.NONE));
            }
            default -> FutureUtil.failedFuture((Throwable)new IllegalStateException("Unknown state " + group.currentState()));
        };
        if (group.is(GroupState.PreparingRebalance)) {
            this.joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(group.groupId()));
        }
        return resultFuture;
    }

    public CompletableFuture<KeyValue<Errors, byte[]>> handleSyncGroup(String groupId, int generation, String memberId, Map<String, byte[]> groupAssignment) {
        CompletableFuture<KeyValue<Errors, byte[]>> resultFuture = new CompletableFuture<KeyValue<Errors, byte[]>>();
        this.handleSyncGroup(groupId, generation, memberId, groupAssignment, (assignment, errors) -> resultFuture.complete(new KeyValue(errors, assignment)));
        return resultFuture;
    }

    public void handleSyncGroup(String groupId, int generation, String memberId, Map<String, byte[]> groupAssignment, BiConsumer<byte[], Errors> responseCallback) {
        Optional<Errors> errorsOpt = this.validateGroupStatus(groupId, ApiKeys.SYNC_GROUP);
        if (errorsOpt.isPresent()) {
            Errors error = errorsOpt.get();
            if (Errors.COORDINATOR_LOAD_IN_PROGRESS == error) {
                responseCallback.accept(new byte[0], Errors.REBALANCE_IN_PROGRESS);
            } else {
                responseCallback.accept(new byte[0], error);
            }
        } else {
            Optional<GroupMetadata> groupOpt = this.groupManager.getGroup(groupId);
            if (groupOpt.isPresent()) {
                this.doSyncGroup(groupOpt.get(), generation, memberId, groupAssignment, responseCallback);
            } else {
                responseCallback.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
            }
        }
    }

    private void doSyncGroup(GroupMetadata group, int generationId, String memberId, Map<String, byte[]> groupAssignment, BiConsumer<byte[], Errors> responseCallback) {
        group.inLock(() -> {
            if (!group.has(memberId)) {
                responseCallback.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
            } else if (generationId != group.generationId()) {
                responseCallback.accept(new byte[0], Errors.ILLEGAL_GENERATION);
            } else {
                switch (group.currentState()) {
                    case Dead: 
                    case Empty: {
                        responseCallback.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
                        break;
                    }
                    case PreparingRebalance: {
                        responseCallback.accept(new byte[0], Errors.REBALANCE_IN_PROGRESS);
                        break;
                    }
                    case CompletingRebalance: {
                        group.get(memberId).awaitingSyncCallback(responseCallback);
                        if (!group.isLeader(memberId)) break;
                        log.info("Assignment received from leader for group {} for generation {}", (Object)group.groupId(), (Object)group.generationId());
                        Sets.SetView missing = Sets.difference(group.allMembers(), groupAssignment.keySet());
                        HashMap<String, byte[]> assignment = new HashMap<String, byte[]>();
                        assignment.putAll(groupAssignment);
                        assignment.putAll(missing.stream().collect(Collectors.toMap(k -> k, k -> new byte[0])));
                        this.groupManager.storeGroup(group, assignment).thenApply(error -> group.inLock(() -> {
                            if (group.is(GroupState.CompletingRebalance) && generationId == group.generationId()) {
                                if (error != Errors.NONE) {
                                    this.resetAndPropagateAssignmentError(group, (Errors)error);
                                    this.maybePrepareRebalance(group);
                                } else {
                                    this.setAndPropagateAssignment(group, assignment);
                                    group.transitionTo(GroupState.Stable);
                                }
                            }
                            return null;
                        }));
                        break;
                    }
                    case Stable: {
                        MemberMetadata memberMetadata = group.get(memberId);
                        responseCallback.accept(memberMetadata.assignment(), Errors.NONE);
                        this.completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId));
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Should not reach here");
                    }
                }
            }
            return null;
        });
    }

    public CompletableFuture<Errors> handleLeaveGroup(String groupId, Set<String> members) {
        return this.validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(CompletableFuture::completedFuture).orElseGet(() -> this.groupManager.getGroup(groupId).map(group -> (CompletableFuture)group.inLock(() -> {
            if (group.is(GroupState.Dead)) {
                return CompletableFuture.completedFuture(Errors.COORDINATOR_NOT_AVAILABLE);
            }
            if (!members.stream().allMatch(group::has)) {
                return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
            }
            for (String memberId : members) {
                MemberMetadata member = group.get(memberId);
                this.removeHeartbeatForLeavingMember(member);
                if (log.isDebugEnabled()) {
                    log.debug("Member {} in group {} has left, removing it from the group", (Object)member.memberId(), (Object)group.groupId());
                }
                this.removeMemberAndUpdateGroup((GroupMetadata)group, member);
            }
            return CompletableFuture.completedFuture(Errors.NONE);
        })).orElseGet(() -> CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID)));
    }

    public Map<String, Errors> handleDeleteGroups(Collection<String> groupIds) {
        Map<String, Errors> groupErrors = Collections.synchronizedMap(new HashMap());
        ArrayList groupsEligibleForDeletion = new ArrayList();
        groupIds.forEach(groupId -> {
            Optional<Errors> validateErrorsOpt = this.validateGroupStatus((String)groupId, ApiKeys.DELETE_GROUPS);
            validateErrorsOpt.map(error -> {
                groupErrors.put((String)groupId, (Errors)error);
                return error;
            }).orElseGet(() -> this.groupManager.getGroup((String)groupId).map(group -> (Errors)group.inLock(() -> {
                switch (group.currentState()) {
                    case Dead: {
                        if (this.groupManager.groupNotExists((String)groupId)) {
                            groupErrors.put((String)groupId, Errors.GROUP_ID_NOT_FOUND);
                            break;
                        }
                        groupErrors.put((String)groupId, Errors.NOT_COORDINATOR);
                        break;
                    }
                    case Empty: {
                        group.transitionTo(GroupState.Dead);
                        groupsEligibleForDeletion.add(group);
                        break;
                    }
                    default: {
                        groupErrors.put((String)groupId, Errors.NON_EMPTY_GROUP);
                    }
                }
                return Errors.NONE;
            })).orElseGet(() -> {
                Errors error = this.groupManager.groupNotExists((String)groupId) ? Errors.GROUP_ID_NOT_FOUND : Errors.NOT_COORDINATOR;
                groupErrors.put((String)groupId, error);
                return Errors.NONE;
            }));
        });
        if (!groupsEligibleForDeletion.isEmpty()) {
            this.groupManager.cleanGroupMetadata(groupsEligibleForDeletion.stream(), GroupMetadata::removeAllOffsets).thenAccept(offsetsRemoved -> log.info("The following groups were deleted {}. A total of {} offsets were removed.", (Object)groupsEligibleForDeletion.stream().map(GroupMetadata::groupId).collect(Collectors.joining(",")), offsetsRemoved));
            groupErrors.putAll(groupsEligibleForDeletion.stream().collect(Collectors.toMap(GroupMetadata::groupId, ignored -> Errors.NONE)));
        }
        return groupErrors;
    }

    public CompletableFuture<Errors> handleHeartbeat(String groupId, String memberId, int generationId) {
        return this.validateGroupStatus(groupId, ApiKeys.HEARTBEAT).map(error -> {
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                return CompletableFuture.completedFuture(Errors.NONE);
            }
            return CompletableFuture.completedFuture(error);
        }).orElseGet(() -> this.groupManager.getGroup(groupId).map(group -> (CompletableFuture)group.inLock(() -> {
            switch (group.currentState()) {
                case Dead: {
                    return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                }
                case Empty: {
                    return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                }
                case CompletingRebalance: {
                    if (!group.has(memberId)) {
                        return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                    }
                    return CompletableFuture.completedFuture(Errors.REBALANCE_IN_PROGRESS);
                }
                case PreparingRebalance: {
                    if (!group.has(memberId)) {
                        return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                    }
                    if (generationId != group.generationId()) {
                        return CompletableFuture.completedFuture(Errors.ILLEGAL_GENERATION);
                    }
                    MemberMetadata member = group.get(memberId);
                    this.completeAndScheduleNextHeartbeatExpiration((GroupMetadata)group, member);
                    return CompletableFuture.completedFuture(Errors.REBALANCE_IN_PROGRESS);
                }
                case Stable: {
                    if (!group.has(memberId)) {
                        return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                    }
                    if (generationId != group.generationId()) {
                        return CompletableFuture.completedFuture(Errors.ILLEGAL_GENERATION);
                    }
                    MemberMetadata member = group.get(memberId);
                    this.completeAndScheduleNextHeartbeatExpiration((GroupMetadata)group, member);
                    return CompletableFuture.completedFuture(Errors.NONE);
                }
            }
            return CompletableFuture.completedFuture(Errors.NONE);
        })).orElseGet(() -> CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID)));
    }

    public CompletableFuture<Map<TopicPartition, Errors>> handleTxnCommitOffsets(String groupId, long producerId, short producerEpoch, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
        return this.validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT).map(error -> CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> error))).orElseGet(() -> {
            GroupMetadata group = this.groupManager.getGroup(groupId).orElseGet(() -> this.groupManager.addGroup(new GroupMetadata(groupId, GroupState.Empty)));
            return this.doCommitOffsets(group, "", -1, producerId, producerEpoch, offsetMetadata);
        });
    }

    public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(String groupId, String memberId, int generationId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
        return this.validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT).map(error -> CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> error))).orElseGet(() -> this.groupManager.getGroup(groupId).map(group -> this.doCommitOffsets((GroupMetadata)group, memberId, generationId, -1L, (short)-1, offsetMetadata)).orElseGet(() -> {
            if (generationId < 0) {
                GroupMetadata group = this.groupManager.addGroup(new GroupMetadata(groupId, GroupState.Empty));
                return this.doCommitOffsets(group, memberId, generationId, -1L, (short)-1, offsetMetadata);
            }
            return CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> Errors.ILLEGAL_GENERATION));
        }));
    }

    public CompletableFuture<Void> scheduleHandleTxnCompletion(long producerId, Set<Integer> offsetsPartitions, TransactionResult transactionResult) {
        boolean isCommit = TransactionResult.COMMIT == transactionResult;
        return this.groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions, isCommit);
    }

    private CompletableFuture<Map<TopicPartition, Errors>> doCommitOffsets(GroupMetadata group, String memberId, int generationId, long producerId, short producerEpoch, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
        return (CompletableFuture)group.inLock(() -> {
            if (group.is(GroupState.Dead)) {
                return CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> Errors.UNKNOWN_MEMBER_ID));
            }
            if (generationId < 0 && group.is(GroupState.Empty) || producerId != -1L) {
                return this.groupManager.storeOffsets(group, memberId, offsetMetadata, producerId, producerEpoch);
            }
            if (group.is(GroupState.CompletingRebalance)) {
                return CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> Errors.REBALANCE_IN_PROGRESS));
            }
            if (!group.has(memberId)) {
                return CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> Errors.UNKNOWN_MEMBER_ID));
            }
            if (generationId != group.generationId()) {
                return CompletableFuture.completedFuture(CoreUtils.mapValue(offsetMetadata, ignored -> Errors.ILLEGAL_GENERATION));
            }
            MemberMetadata member = group.get(memberId);
            this.completeAndScheduleNextHeartbeatExpiration(group, member);
            return this.groupManager.storeOffsets(group, memberId, offsetMetadata);
        });
    }

    public KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> handleFetchOffsets(String groupId, Optional<List<TopicPartition>> partitions) {
        return this.validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH).map(errors -> new KeyValue(errors, new HashMap())).orElseGet(() -> new KeyValue((Object)Errors.NONE, this.groupManager.getOffsets(groupId, partitions)));
    }

    public KeyValue<Errors, List<GroupMetadata.GroupOverview>> handleListGroups() {
        if (!this.isActive.get()) {
            return new KeyValue((Object)Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList());
        }
        Errors errors = this.groupManager.isLoading() ? Errors.COORDINATOR_LOAD_IN_PROGRESS : Errors.NONE;
        ArrayList overviews = new ArrayList();
        this.groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
        return new KeyValue((Object)errors, overviews);
    }

    public KeyValue<Errors, GroupMetadata.GroupSummary> handleDescribeGroup(String groupId) {
        return this.validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS).map(error -> new KeyValue(error, (Object)EmptyGroup)).orElseGet(() -> this.groupManager.getGroup(groupId).map(group -> (KeyValue)group.inLock(() -> new KeyValue((Object)Errors.NONE, (Object)group.summary()))).orElseGet(() -> new KeyValue((Object)Errors.NONE, (Object)DeadGroup)));
    }

    public CompletableFuture<Integer> handleDeletedPartitions(Set<TopicPartition> topicPartitions) {
        return this.groupManager.cleanGroupMetadata(this.groupManager.currentGroupsStream(), group -> group.removeOffsets(topicPartitions.stream())).thenApply(offsetsRemoved -> {
            log.info("Removed {} offsets associated with deleted partitions: {}", offsetsRemoved, (Object)topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
            return offsetsRemoved;
        });
    }

    private boolean isValidGroupId(String groupId, ApiKeys api) {
        switch (api) {
            case OFFSET_COMMIT: 
            case OFFSET_FETCH: 
            case DESCRIBE_GROUPS: 
            case DELETE_GROUPS: {
                return groupId != null;
            }
        }
        return groupId != null && !groupId.isEmpty();
    }

    private Optional<Errors> validateGroupStatus(String groupId, ApiKeys api) {
        if (!this.isValidGroupId(groupId, api)) {
            return Optional.of(Errors.INVALID_GROUP_ID);
        }
        if (!this.isActive.get()) {
            return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
        }
        if (this.groupManager.isGroupLoading(groupId)) {
            return Optional.of(Errors.COORDINATOR_LOAD_IN_PROGRESS);
        }
        if (!this.groupManager.isGroupLocal(groupId)) {
            return Optional.of(Errors.NOT_COORDINATOR);
        }
        return Optional.empty();
    }

    private void onGroupUnloaded(GroupMetadata group) {
        group.inLock(() -> {
            log.info("Unloading group metadata for {} with generation {}", (Object)group.groupId(), (Object)group.generationId());
            GroupState previousState = group.currentState();
            group.transitionTo(GroupState.Dead);
            switch (previousState) {
                case Dead: 
                case PreparingRebalance: 
                case Empty: {
                    for (MemberMetadata member : group.allMemberMetadata()) {
                        if (member.awaitingJoinCallback() == null) continue;
                        member.awaitingJoinCallback().complete(this.joinError(member.memberId(), Errors.NOT_COORDINATOR));
                        member.awaitingJoinCallback(null);
                    }
                    this.joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(group.groupId()));
                    break;
                }
                case CompletingRebalance: 
                case Stable: {
                    for (MemberMetadata member : group.allMemberMetadata()) {
                        if (member.awaitingSyncCallback() != null) {
                            member.awaitingSyncCallback().accept(new byte[0], Errors.NOT_COORDINATOR);
                            member.awaitingSyncCallback(null);
                        }
                        this.heartbeatPurgatory.checkAndComplete(new DelayedOperationKey.MemberKey(member.groupId(), member.memberId()));
                    }
                    break;
                }
            }
            return null;
        });
    }

    private void onGroupLoaded(GroupMetadata group) {
        group.inLock(() -> {
            log.info("Loading group metadata for {} with generation {}", (Object)group.groupId(), (Object)group.generationId());
            Preconditions.checkArgument((group.is(GroupState.Stable) || group.is(GroupState.Empty) ? 1 : 0) != 0);
            group.allMemberMetadata().forEach(member -> this.completeAndScheduleNextHeartbeatExpiration(group, (MemberMetadata)member));
            return null;
        });
    }

    public CompletableFuture<Void> handleGroupImmigration(int offsetTopicPartitionId) {
        return this.groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, this::onGroupLoaded);
    }

    public void handleGroupEmigration(int offsetTopicPartition) {
        this.groupManager.removeGroupsForPartition(offsetTopicPartition, this::onGroupUnloaded);
    }

    private void setAndPropagateAssignment(GroupMetadata group, Map<String, byte[]> assignment) {
        Preconditions.checkState((boolean)group.is(GroupState.CompletingRebalance));
        group.allMemberMetadata().forEach(member -> member.assignment((byte[])assignment.get(member.memberId())));
        this.propagateAssignment(group, Errors.NONE);
    }

    private void resetAndPropagateAssignmentError(GroupMetadata group, Errors error) {
        Preconditions.checkState((boolean)group.is(GroupState.CompletingRebalance));
        group.allMemberMetadata().forEach(m -> m.assignment(new byte[0]));
        this.propagateAssignment(group, error);
    }

    private void propagateAssignment(GroupMetadata group, Errors error) {
        for (MemberMetadata member : group.allMemberMetadata()) {
            if (member.awaitingSyncCallback() == null) continue;
            member.awaitingSyncCallback().accept(member.assignment(), error);
            member.awaitingSyncCallback(null);
            this.completeAndScheduleNextHeartbeatExpiration(group, member);
        }
    }

    private JoinGroupResult joinError(String memberId, Errors error) {
        return new JoinGroupResult(Collections.emptyMap(), memberId, 0, "", "", "", error);
    }

    private void completeAndScheduleNextHeartbeatExpiration(GroupMetadata group, MemberMetadata member) {
        member.latestHeartbeat(this.time.milliseconds());
        DelayedOperationKey.MemberKey memberKey = new DelayedOperationKey.MemberKey(member.groupId(), member.memberId());
        this.heartbeatPurgatory.checkAndComplete(memberKey);
        long newHeartbeatDeadline = member.latestHeartbeat() + (long)member.sessionTimeoutMs();
        DelayedHeartbeat delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs());
        this.heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Lists.newArrayList((Object[])new Object[]{memberKey}));
    }

    private void removeHeartbeatForLeavingMember(MemberMetadata member) {
        member.isLeaving(true);
        DelayedOperationKey.MemberKey memberKey = new DelayedOperationKey.MemberKey(member.groupId(), member.memberId());
        this.heartbeatPurgatory.checkAndComplete(memberKey);
    }

    private CompletableFuture<JoinGroupResult> addMemberAndRebalance(int rebalanceTimeoutMs, int sessionTimeoutMs, String clientId, String clientHost, String protocolType, Map<String, byte[]> protocols, GroupMetadata group) {
        String memberId = clientId + "-" + group.generateMemberIdSuffix();
        MemberMetadata member = new MemberMetadata(memberId, group.groupId(), clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols);
        CompletableFuture<JoinGroupResult> joinFuture = new CompletableFuture<JoinGroupResult>();
        member.awaitingJoinCallback(joinFuture);
        if (group.is(GroupState.PreparingRebalance) && group.generationId() == 0) {
            group.newMemberAdded(true);
        }
        group.add(member);
        this.maybePrepareRebalance(group);
        return joinFuture;
    }

    private CompletableFuture<JoinGroupResult> updateMemberAndRebalance(GroupMetadata group, MemberMetadata member, Map<String, byte[]> protocols) {
        CompletableFuture<JoinGroupResult> resultFuture = new CompletableFuture<JoinGroupResult>();
        member.supportedProtocols(protocols);
        member.awaitingJoinCallback(resultFuture);
        this.maybePrepareRebalance(group);
        return resultFuture;
    }

    private void maybePrepareRebalance(GroupMetadata group) {
        group.inLock(() -> {
            if (group.canRebalance()) {
                this.prepareRebalance(group);
            }
            return null;
        });
    }

    private void prepareRebalance(GroupMetadata group) {
        if (group.is(GroupState.CompletingRebalance)) {
            this.resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS);
        }
        DelayedJoin delayedRebalance = group.is(GroupState.Empty) ? new InitialDelayedJoin(this, this.joinPurgatory, group, this.groupConfig.groupInitialRebalanceDelayMs(), this.groupConfig.groupInitialRebalanceDelayMs(), Math.max(group.rebalanceTimeoutMs() - this.groupConfig.groupInitialRebalanceDelayMs(), 0)) : new DelayedJoin(this, group, group.rebalanceTimeoutMs());
        log.info("Preparing to rebalance group {} ({}) with old generation {} ({}-{})", new Object[]{group.groupId(), group.currentState().name(), group.generationId(), "__consumer_offsets", this.groupManager.partitionFor(group.groupId())});
        group.transitionTo(GroupState.PreparingRebalance);
        DelayedOperationKey.GroupKey groupKey = new DelayedOperationKey.GroupKey(group.groupId());
        this.joinPurgatory.tryCompleteElseWatch(delayedRebalance, Lists.newArrayList((Object[])new Object[]{groupKey}));
    }

    private void removeMemberAndUpdateGroup(GroupMetadata group, MemberMetadata member) {
        group.remove(member.memberId());
        switch (group.currentState()) {
            case Dead: 
            case Empty: {
                break;
            }
            case CompletingRebalance: 
            case Stable: {
                this.maybePrepareRebalance(group);
                break;
            }
            case PreparingRebalance: {
                this.joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(group.groupId()));
                break;
            }
        }
    }

    boolean tryCompleteJoin(GroupMetadata group, Supplier<Boolean> forceComplete) {
        return (Boolean)group.inLock(() -> {
            if (group.notYetRejoinedMembers().isEmpty()) {
                return (Boolean)forceComplete.get();
            }
            return false;
        });
    }

    void onExpireJoin() {
    }

    void onCompleteJoin(GroupMetadata group) {
        group.inLock(() -> {
            group.notYetRejoinedMembers().forEach(failedMember -> {
                this.removeHeartbeatForLeavingMember((MemberMetadata)failedMember);
                group.remove(failedMember.memberId());
            });
            if (!group.is(GroupState.Dead)) {
                group.initNextGeneration();
                if (group.is(GroupState.Empty)) {
                    log.info("Group {} with generation {} is now empty {}-{}", new Object[]{group.groupId(), group.generationId(), "__consumer_offsets", this.groupManager.partitionFor(group.groupId())});
                    this.groupManager.storeGroup(group, Collections.emptyMap()).thenAccept(error -> {
                        if (error != Errors.NONE) {
                            log.warn("Failed to write empty metadata for group {}: {}", (Object)group.groupId(), (Object)error.message());
                        }
                        if (log.isDebugEnabled()) {
                            log.warn("add partition ownership for group {}", (Object)group.groupId());
                        }
                        this.groupManager.addPartitionOwnership(this.groupManager.partitionFor(group.groupId()));
                    });
                } else {
                    log.info("Stabilized group {} generation {} ({}-{})", new Object[]{group.groupId(), group.generationId(), "__consumer_offsets", this.groupManager.partitionFor(group.groupId())});
                    for (MemberMetadata member : group.allMemberMetadata()) {
                        Objects.requireNonNull(member.awaitingJoinCallback());
                        Map<Object, Object> members = group.isLeader(member.memberId()) ? group.currentMemberMetadata() : Collections.emptyMap();
                        JoinGroupResult joinResult = new JoinGroupResult(members, member.memberId(), group.generationId(), group.protocolOrNull(), group.protocolTypeOrNull(), group.leaderOrNull(), Errors.NONE);
                        member.awaitingJoinCallback().complete(joinResult);
                        member.awaitingJoinCallback(null);
                        this.completeAndScheduleNextHeartbeatExpiration(group, member);
                    }
                }
            }
            return null;
        });
    }

    boolean tryCompleteHeartbeat(GroupMetadata group, MemberMetadata member, long heartbeatDeadline, Supplier<Boolean> forceComplete) {
        return (Boolean)group.inLock(() -> {
            if (this.shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving()) {
                return (Boolean)forceComplete.get();
            }
            return false;
        });
    }

    void onExpireHeartbeat(GroupMetadata group, MemberMetadata member, long heartbeatDeadline) {
        group.inLock(() -> {
            if (!this.shouldKeepMemberAlive(member, heartbeatDeadline)) {
                log.info("Member {} in group {} has failed, removing it from the group", (Object)member.memberId(), (Object)group.groupId());
                this.removeMemberAndUpdateGroup(group, member);
            }
            return null;
        });
    }

    void onCompleteHeartbeat() {
    }

    private boolean shouldKeepMemberAlive(MemberMetadata member, long heartbeatDeadline) {
        return member.awaitingJoinCallback() != null || member.awaitingSyncCallback() != null || member.latestHeartbeat() + (long)member.sessionTimeoutMs() > heartbeatDeadline;
    }

    public boolean isActive() {
        return this.isActive.get();
    }
}

