/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.group;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataConstants;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupState;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupMetadataManager {
    private static final Logger log = LoggerFactory.getLogger(GroupMetadataManager.class);

    // Record format magic value (2).
    private final byte magicValue = (byte) 2;
    // Compression type read from offsetConfig.offsetsTopicCompressionType() at construction.
    private final CompressionType compressionType;
    private final OffsetConfig offsetConfig;
    // Prefix handed to GroupMetadataConstants/GroupMetadata when building or resolving offset keys.
    private final String namespacePrefix;

    // Group id -> cached group metadata.
    private final ConcurrentMap<String, GroupMetadata> groupMetadataCache = new ConcurrentHashMap<>();

    // Guards loadingPartitions and ownedPartitions.
    private final ReentrantLock partitionLock = new ReentrantLock();
    private final Set<Integer> loadingPartitions = new HashSet<>();
    private final Set<Integer> ownedPartitions = new HashSet<>();

    // Set by shutdown(); checked by the metadata loading loop before each read.
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    // Pending or ready producers/readers, keyed by an Integer (presumably the offsets partition id).
    private final ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> offsetsProducers = new ConcurrentHashMap<>();
    private final ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> offsetsReaders = new ConcurrentHashMap<>();

    // Runs the periodic cleanup task and the asynchronous callbacks of this class.
    private final ScheduledExecutorService scheduler;

    // Producer id -> ids of groups registered for that producer; accessed while synchronized on the map itself.
    private final Map<Long, Set<String>> openGroupsForProducer = new HashMap<>();

    // Builders supplied by the constructor caller for the metadata topic.
    private final ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder;
    private final ReaderBuilder<ByteBuffer> metadataTopicReaderBuilder;

    // Clock used for record timestamps and load-duration measurement.
    private final Time time;

    /**
     * Creates a manager wired to the given collaborators.
     *
     * @param offsetConfig                 offsets-topic configuration; also the source of the compression type
     * @param metadataTopicProducerBuilder builder for metadata-topic producers
     * @param metadataTopicReaderBuilder   builder for metadata-topic readers
     * @param scheduler                    executor used for scheduled and asynchronous work
     * @param namespacePrefixForMetadata   namespace prefix stored for offset-key handling
     * @param time                         clock used for timestamps
     */
    public GroupMetadataManager(OffsetConfig offsetConfig,
                                ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
                                ReaderBuilder<ByteBuffer> metadataTopicReaderBuilder,
                                ScheduledExecutorService scheduler,
                                String namespacePrefixForMetadata,
                                Time time) {
        this.time = time;
        this.scheduler = scheduler;
        this.namespacePrefix = namespacePrefixForMetadata;
        this.metadataTopicReaderBuilder = metadataTopicReaderBuilder;
        this.metadataTopicProducerBuilder = metadataTopicProducerBuilder;
        this.offsetConfig = offsetConfig;
        this.compressionType = offsetConfig.offsetsTopicCompressionType();
    }

    /**
     * Maps a group id onto an offsets-topic partition using the sign-safe modulus of its hash code.
     *
     * @param groupId                   the consumer group id
     * @param offsetsTopicNumPartitions number of partitions of the offsets topic
     * @return the partition index computed by {@code MathUtils.signSafeMod}
     */
    public static int getPartitionId(String groupId, int offsetsTopicNumPartitions) {
        long groupHash = groupId.hashCode();
        return MathUtils.signSafeMod(groupHash, offsetsTopicNumPartitions);
    }

    /**
     * Optionally schedules the periodic group-metadata cleanup task.
     *
     * @param enableMetadataExpiration when {@code true}, {@code cleanupGroupMetadata} is scheduled at a fixed
     *                                 rate using the configured retention check interval as both the initial
     *                                 delay and the period; when {@code false}, nothing is scheduled
     */
    public void startup(boolean enableMetadataExpiration) {
        if (!enableMetadataExpiration) {
            return;
        }
        scheduler.scheduleAtFixedRate(
                this::cleanupGroupMetadata,
                offsetConfig.offsetsRetentionCheckIntervalMs(),
                offsetConfig.offsetsRetentionCheckIntervalMs(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Flags the manager as shutting down, asynchronously closes every tracked offsets producer and reader,
     * clears both maps, and finally shuts down the scheduler.
     */
    public void shutdown() {
        shuttingDown.set(true);

        // Chain a close onto every producer future, then forget the producers.
        List<CompletableFuture<Void>> producerCloses = new ArrayList<>();
        for (CompletableFuture<Producer<ByteBuffer>> producerFuture : offsetsProducers.values()) {
            producerCloses.add(producerFuture.thenComposeAsync(Producer::closeAsync, scheduler));
        }
        offsetsProducers.clear();

        // Same for the readers.
        List<CompletableFuture<Void>> readerCloses = new ArrayList<>();
        for (CompletableFuture<Reader<ByteBuffer>> readerFuture : offsetsReaders.values()) {
            readerCloses.add(readerFuture.thenComposeAsync(Reader::closeAsync, scheduler));
        }
        offsetsReaders.clear();

        FutureUtil.waitForAll(producerCloses).whenCompleteAsync((unused, error) -> {
            if (error != null) {
                log.error("Error when close all the {} offsetsProducers in GroupMetadataManager", producerCloses.size(), error);
            }
            if (log.isDebugEnabled()) {
                log.debug("Closed all the {} offsetsProducers in GroupMetadataManager", producerCloses.size());
            }
        }, scheduler);

        FutureUtil.waitForAll(readerCloses).whenCompleteAsync((unused, error) -> {
            if (error != null) {
                log.error("Error when close all the {} offsetsReaders in GroupMetadataManager", readerCloses.size(), error);
            }
            if (log.isDebugEnabled()) {
                log.debug("Closed all the {} offsetsReaders in GroupMetadataManager.", readerCloses.size());
            }
        }, scheduler);

        scheduler.shutdown();
    }

    /**
     * @return the live (not copied) map of offsets producer futures held by this manager
     */
    public ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> getOffsetsProducers() {
        return offsetsProducers;
    }

    /**
     * @return the live (not copied) map of offsets reader futures held by this manager
     */
    public ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> getOffsetsReaders() {
        return offsetsReaders;
    }

    /**
     * @return a view over all group metadata currently held in the cache
     */
    public Iterable<GroupMetadata> currentGroups() {
        return groupMetadataCache.values();
    }

    /**
     * @return a stream over all group metadata currently held in the cache
     */
    public Stream<GroupMetadata> currentGroupsStream() {
        return groupMetadataCache.values().stream();
    }

    /**
     * Checks, under the partition lock, whether the partition is in the owned set.
     *
     * @param partition offsets partition index
     * @return {@code true} if the partition is recorded as owned
     */
    public boolean isPartitionOwned(int partition) {
        return CoreUtils.inLock(partitionLock, () -> {
            return ownedPartitions.contains(partition);
        });
    }

    /**
     * Checks, under the partition lock, whether the partition is in the loading set.
     *
     * @param partition offsets partition index
     * @return {@code true} if the partition is recorded as loading
     */
    public boolean isPartitionLoading(int partition) {
        return CoreUtils.inLock(partitionLock, () -> {
            return loadingPartitions.contains(partition);
        });
    }

    /**
     * Resolves the offsets partition of a group using the configured partition count.
     *
     * @param groupId the consumer group id
     * @return the partition index returned by {@link #getPartitionId(String, int)}
     */
    public int partitionFor(String groupId) {
        int partitionCount = offsetConfig().offsetsTopicNumPartitions();
        return getPartitionId(groupId, partitionCount);
    }

    /**
     * @return the configured offsets topic name, without any partition suffix
     */
    public String getTopicPartitionName() {
        return offsetConfig.offsetsTopicName();
    }

    /**
     * Builds the name of one partition of the offsets topic.
     *
     * @param partitionId partition index
     * @return the configured offsets topic name followed by {@code "-partition-"} and the index
     */
    public String getTopicPartitionName(int partitionId) {
        String baseTopic = offsetConfig.offsetsTopicName();
        return baseTopic + "-partition-" + partitionId;
    }

    /**
     * @param groupId the consumer group id
     * @return {@code true} if the partition the group maps to is recorded as owned
     */
    public boolean isGroupLocal(String groupId) {
        int groupPartition = partitionFor(groupId);
        return isPartitionOwned(groupPartition);
    }

    /**
     * @param groupId the consumer group id
     * @return {@code true} if the partition the group maps to is recorded as loading
     */
    public boolean isGroupLoading(String groupId) {
        int groupPartition = partitionFor(groupId);
        return isPartitionLoading(groupPartition);
    }

    /**
     * @return {@code true} if at least one partition is in the loading set (checked under the partition lock)
     */
    public boolean isLoading() {
        return CoreUtils.inLock(partitionLock, () -> {
            boolean nothingLoading = loadingPartitions.isEmpty();
            return !nothingLoading;
        });
    }

    /**
     * @return the offset configuration this manager was constructed with
     */
    public OffsetConfig offsetConfig() {
        return offsetConfig;
    }

    /**
     * Tells whether a group whose partition is owned here is absent or dead.
     *
     * <p>Evaluated under the partition lock. The result is {@code false} whenever the group's partition is
     * not owned; otherwise it is {@code true} if the cache has no entry for the group or the cached group
     * is in the {@code Dead} state.
     *
     * @param groupId the consumer group id
     * @return see above
     */
    public boolean groupNotExists(String groupId) {
        return CoreUtils.inLock(partitionLock, () -> {
            if (!isGroupLocal(groupId)) {
                return false;
            }
            Optional<Boolean> cachedGroupIsDead = getGroup(groupId)
                    .map(cached -> cached.inLock(() -> cached.is(GroupState.Dead)));
            return cachedGroupIsDead.orElse(true);
        });
    }

    /**
     * Looks a group up in the cache.
     *
     * @param groupId the consumer group id
     * @return the cached metadata, or an empty {@code Optional} if there is none
     */
    public Optional<GroupMetadata> getGroup(String groupId) {
        GroupMetadata cached = groupMetadataCache.get(groupId);
        return Optional.ofNullable(cached);
    }

    /**
     * Registers a group in the cache unless one with the same id is already present.
     *
     * @param group the group to register
     * @return the previously cached group if there was one, otherwise {@code group}
     */
    public GroupMetadata addGroup(GroupMetadata group) {
        GroupMetadata existing = groupMetadataCache.putIfAbsent(group.groupId(), group);
        return existing != null ? existing : group;
    }

    /**
     * Writes the group's metadata as a single-record batch through the offsets-topic producer.
     *
     * <p>After a successful send, if the group's partition is not yet recorded as owned, ownership of that
     * partition is added.
     *
     * @param group           the group whose metadata is stored
     * @param groupAssignment member assignments passed to {@code GroupMetadataConstants.groupMetadataValue}
     * @return a future yielding {@code Errors.NONE} on success, or {@code Errors.COORDINATOR_NOT_AVAILABLE}
     *         if any stage fails (the failure is logged)
     */
    public CompletableFuture<Errors> storeGroup(GroupMetadata group, Map<String, byte[]> groupAssignment) {
        TimestampType timestampType = TimestampType.CREATE_TIME;
        long timestamp = this.time.milliseconds();
        byte[] key = GroupMetadataConstants.groupMetadataKey(group.groupId());
        byte[] value = GroupMetadataConstants.groupMetadataValue(group, groupAssignment, (short) 1);

        // Allocate the buffer from the size estimate of the one record that is about to be appended.
        ByteBuffer buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(
                this.magicValue, this.compressionType,
                Lists.newArrayList(new SimpleRecord(timestamp, key, value))));
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(
                buffer, this.magicValue, this.compressionType, timestampType, 0L);
        recordsBuilder.append(timestamp, key, value);
        MemoryRecords records = recordsBuilder.build();

        return this.getOffsetsTopicProducer(group.groupId())
                .thenComposeAsync(producer -> producer.newMessage()
                        .keyBytes(key)
                        .value(records.buffer())
                        .eventTime(timestamp)
                        .sendAsync(), this.scheduler)
                .thenApplyAsync(msgId -> {
                    if (!this.isGroupLocal(group.groupId())) {
                        // The guard checks the debug level, so log at that level too.
                        if (log.isDebugEnabled()) {
                            log.debug("add partition ownership for group {}", group.groupId());
                        }
                        this.addPartitionOwnership(this.partitionFor(group.groupId()));
                    }
                    return Errors.NONE;
                }, this.scheduler)
                .exceptionally(cause -> {
                    // Keep the cause visible; callers only ever see the mapped error code.
                    log.error("Failed to store metadata of group {}", group.groupId(), cause);
                    return Errors.COORDINATOR_NOT_AVAILABLE;
                });
    }

    /**
     * Sends one message through the producer returned by {@code getOffsetsTopicProducer(groupId)}.
     *
     * @param groupId   group used to resolve the producer
     * @param key       message key bytes
     * @param buffer    message payload
     * @param timestamp event time set on the message
     * @return a future yielding the id of the sent message
     */
    CompletableFuture<MessageId> storeOffsetMessage(String groupId, byte[] key, ByteBuffer buffer, long timestamp) {
        CompletableFuture<Producer<ByteBuffer>> producerFuture = getOffsetsTopicProducer(groupId);
        return producerFuture.thenComposeAsync(
                producer -> producer.newMessage()
                        .keyBytes(key)
                        .value(buffer)
                        .eventTime(timestamp)
                        .sendAsync(),
                scheduler);
    }

    /**
     * Stores offsets as a non-transactional commit by delegating to the five-argument overload with a
     * producer id of {@code -1} and a producer epoch of {@code -1}.
     *
     * @param group          the committing group
     * @param consumerId     id of the committing consumer
     * @param offsetMetadata offsets to commit, per topic partition
     * @return a future yielding the per-partition commit result
     */
    public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(GroupMetadata group, String consumerId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
        final long noProducerId = -1L;
        final short noProducerEpoch = (short) -1;
        return storeOffsets(group, consumerId, offsetMetadata, noProducerId, noProducerEpoch);
    }

    /**
     * Appends an offset commit for a group and updates the group's in-memory commit state.
     *
     * <p>Offsets whose metadata string exceeds the configured maximum are excluded from the written batch
     * and reported as {@code OFFSET_METADATA_TOO_LARGE}. A producer id other than {@code -1} marks the
     * commit as transactional.
     *
     * @param group          the committing group
     * @param consumerId     id of the committing consumer (used for logging)
     * @param offsetMetadata offsets to commit, per topic partition
     * @param producerId     transactional producer id, or {@code -1} for a plain commit
     * @param producerEpoch  producer epoch written into the batch
     * @return a future yielding one {@code Errors} value per partition of {@code offsetMetadata}
     */
    public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(GroupMetadata group, String consumerId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata, long producerId, short producerEpoch) {
        // Keep only the commits whose metadata fits the size limit.
        Map<TopicPartition, OffsetAndMetadata> acceptedOffsets = offsetMetadata.entrySet().stream()
                .filter(candidate -> validateOffsetMetadataLength(candidate.getValue().metadata()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        group.inLock(() -> {
            if (!group.hasReceivedConsistentOffsetCommits()) {
                log.warn("group: {} with leader: {} has received offset commits from consumers as well as transactional offsetsProducers. Mixing both types of offset commits will generally result in surprises and should be avoided.", group.groupId(), group.leaderOrNull());
            }
            return null;
        });

        final boolean transactional = producerId != -1L;

        if (acceptedOffsets.isEmpty()) {
            // Nothing survived the filter: every requested partition is rejected.
            Map<TopicPartition, Errors> allRejected = offsetMetadata.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, rejected -> Errors.OFFSET_METADATA_TOO_LARGE));
            return CompletableFuture.completedFuture(allRejected);
        }

        // One record per accepted offset, all sharing the same timestamp.
        final long timestamp = time.milliseconds();
        List<SimpleRecord> commitRecords = new ArrayList<>(acceptedOffsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> accepted : acceptedOffsets.entrySet()) {
            byte[] recordKey = GroupMetadataConstants.offsetCommitKey(group.groupId(), accepted.getKey(), namespacePrefix);
            byte[] recordValue = GroupMetadataConstants.offsetCommitValue(accepted.getValue());
            commitRecords.add(new SimpleRecord(timestamp, recordKey, recordValue));
        }

        ByteBuffer buffer = ByteBuffer.allocate(
                AbstractRecords.estimateSizeInBytes((byte) 2, compressionType, commitRecords));
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, (byte) 2, compressionType, TimestampType.CREATE_TIME,
                0L, timestamp, producerId, producerEpoch, 0, transactional, -1);
        for (SimpleRecord commitRecord : commitRecords) {
            builder.append(commitRecord);
        }
        MemoryRecords entries = builder.build();

        // Register the pending commit on the group before the write is issued.
        group.inLock(() -> {
            if (transactional) {
                addProducerGroup(producerId, group.groupId());
                group.prepareTxnOffsetCommit(producerId, offsetMetadata);
            } else {
                group.prepareOffsetCommit(offsetMetadata);
            }
            return null;
        });

        byte[] messageKey = GroupMetadataConstants.offsetCommitKey(group.groupId(), new TopicPartition("", -1), namespacePrefix);

        CompletableFuture<Errors> appendOutcome = storeOffsetMessage(group.groupId(), messageKey, entries.buffer(), timestamp)
                .thenApplyAsync(messageId -> {
                    if (!group.is(GroupState.Dead)) {
                        MessageIdImpl appendedId = (MessageIdImpl) messageId;
                        for (Map.Entry<TopicPartition, OffsetAndMetadata> accepted : acceptedOffsets.entrySet()) {
                            GroupMetadata.CommitRecordMetadataAndOffset appended = new GroupMetadata.CommitRecordMetadataAndOffset(
                                    Optional.of(new PositionImpl(appendedId.getLedgerId(), appendedId.getEntryId())),
                                    accepted.getValue());
                            if (transactional) {
                                group.onTxnOffsetCommitAppend(producerId, accepted.getKey(), appended);
                            } else {
                                group.onOffsetCommitAppend(accepted.getKey(), appended);
                            }
                        }
                    }
                    return Errors.NONE;
                }, scheduler)
                .exceptionally(cause -> {
                    if (!group.is(GroupState.Dead)) {
                        if (!group.hasPendingOffsetCommitsFromProducer(producerId)) {
                            removeProducerGroup(producerId, group.groupId());
                        }
                        for (Map.Entry<TopicPartition, OffsetAndMetadata> accepted : acceptedOffsets.entrySet()) {
                            if (transactional) {
                                group.failPendingTxnOffsetCommit(producerId, accepted.getKey());
                            } else {
                                group.failPendingOffsetWrite(accepted.getKey(), accepted.getValue());
                            }
                        }
                    }
                    log.error("Offset commit {} from group {}, consumer {} with generation {} failed when appending to log due to ", acceptedOffsets, group.groupId(), consumerId, group.generationId(), cause);
                    if (cause.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                        // The producer is closed: drop it and report that this instance is not the coordinator.
                        offsetsProducers.remove(partitionFor(group.groupId()));
                        return Errors.NOT_COORDINATOR;
                    }
                    return Errors.UNKNOWN_SERVER_ERROR;
                });

        // Fan the single append result out to every requested partition; oversize ones keep their own error.
        return appendOutcome.thenApplyAsync(appendError -> offsetMetadata.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        requested -> validateOffsetMetadataLength(requested.getValue().metadata())
                                ? appendError
                                : Errors.OFFSET_METADATA_TOO_LARGE)), scheduler);
    }

    /**
     * Returns the offsets known for a group.
     *
     * <p>If the group is not cached or is {@code Dead}, every requested partition maps to the value of
     * {@code KafkaResponseUtils.newOffsetFetchPartition()} (an empty map if no partitions were requested).
     * Otherwise each requested partition maps to its stored offset and metadata, or to that same default
     * when nothing is stored; when no partitions are requested, all offsets of the group are returned.
     *
     * @param groupId            the consumer group id
     * @param topicPartitionsOpt partitions to look up, or empty for all partitions of the group
     * @return fetch data per topic partition
     */
    public Map<TopicPartition, OffsetFetchResponse.PartitionData> getOffsets(String groupId, Optional<List<TopicPartition>> topicPartitionsOpt) {
        if (log.isTraceEnabled()) {
            log.trace("Getting offsets of {} for group {}.", topicPartitionsOpt.map(Object::toString).orElse("all partitions"), groupId);
        }

        GroupMetadata group = groupMetadataCache.get(groupId);
        if (group == null) {
            Map<TopicPartition, OffsetFetchResponse.PartitionData> unknownGroup = topicPartitionsOpt
                    .orElse(Collections.emptyList()).stream()
                    .collect(Collectors.toMap(Function.identity(), tp -> KafkaResponseUtils.newOffsetFetchPartition()));
            return unknownGroup;
        }

        return group.inLock(() -> {
            if (group.is(GroupState.Dead)) {
                Map<TopicPartition, OffsetFetchResponse.PartitionData> deadGroup = topicPartitionsOpt
                        .orElse(Collections.emptyList()).stream()
                        .collect(Collectors.toMap(Function.identity(), tp -> KafkaResponseUtils.newOffsetFetchPartition()));
                return deadGroup;
            }

            if (!topicPartitionsOpt.isPresent()) {
                // No explicit partitions requested: report everything the group has.
                Map<TopicPartition, OffsetFetchResponse.PartitionData> everything = group.allOffsets().entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, stored -> {
                            OffsetAndMetadata oam = stored.getValue();
                            return KafkaResponseUtils.newOffsetFetchPartition(oam.offset(), oam.metadata());
                        }));
                return everything;
            }

            Map<TopicPartition, OffsetFetchResponse.PartitionData> requested = topicPartitionsOpt.get().stream()
                    .collect(Collectors.toMap(
                            Function.identity(),
                            tp -> group.offset(tp, namespacePrefix)
                                    .map(oam -> KafkaResponseUtils.newOffsetFetchPartition(oam.offset(), oam.metadata()))
                                    .orElseGet(KafkaResponseUtils::newOffsetFetchPartition)));
            return requested;
        });
    }

    /**
     * Records that {@code groupId} has an open commit from {@code producerId}.
     *
     * <p>Runs while synchronized on {@code openGroupsForProducer}; the producer's group set is created on
     * first use.
     */
    private void addProducerGroup(long producerId, String groupId) {
        synchronized (openGroupsForProducer) {
            Set<String> groupsOfProducer = openGroupsForProducer.computeIfAbsent(producerId, pid -> new HashSet<>());
            groupsOfProducer.add(groupId);
        }
    }

    /**
     * Removes {@code groupId} from the set of groups registered for {@code producerId}, and drops the
     * producer's entry entirely once its set becomes empty.
     *
     * <p>Runs while synchronized on {@code openGroupsForProducer}. A producer with no entry is left
     * untouched: no empty placeholder set is created for it.
     *
     * @param producerId the producer whose registration is updated
     * @param groupId    the group to unregister
     */
    private void removeProducerGroup(long producerId, String groupId) {
        synchronized (this.openGroupsForProducer) {
            Set<String> groups = this.openGroupsForProducer.get(producerId);
            if (null == groups) {
                return;
            }
            groups.remove(groupId);
            if (groups.isEmpty()) {
                // Last group gone: forget the producer so the map does not accumulate empty sets.
                this.openGroupsForProducer.remove(producerId);
            }
        }
    }

    /**
     * Collects the groups registered for {@code producerId} whose offsets partition is in
     * {@code partitions}.
     *
     * <p>Runs while synchronized on {@code openGroupsForProducer}. As a side effect, an empty group set is
     * registered for a producer that had none.
     *
     * @return a new set holding the matching group ids
     */
    private Set<String> groupsBelongingToPartitions(long producerId, Set<Integer> partitions) {
        synchronized (openGroupsForProducer) {
            Set<String> groupsOfProducer = openGroupsForProducer.computeIfAbsent(producerId, pid -> new HashSet<>());
            Set<String> matching = new HashSet<>();
            for (String candidate : groupsOfProducer) {
                if (partitions.contains(partitionFor(candidate))) {
                    matching.add(candidate);
                }
            }
            return matching;
        }
    }

    /**
     * Removes {@code groupId} from the group set of every producer, while synchronized on
     * {@code openGroupsForProducer}. Producer entries themselves are kept, even if their set ends up empty.
     */
    private void removeGroupFromAllProducers(String groupId) {
        synchronized (openGroupsForProducer) {
            for (Set<String> groupsOfProducer : openGroupsForProducer.values()) {
                groupsOfProducer.remove(groupId);
            }
        }
    }

    /**
     * @param metadata commit metadata string, may be {@code null}
     * @return {@code true} if {@code metadata} is {@code null} or no longer than the configured maximum
     */
    private boolean validateOffsetMetadataLength(String metadata) {
        if (metadata == null) {
            return true;
        }
        return metadata.length() <= offsetConfig.maxMetadataSize();
    }

    /**
     * Starts loading groups and offsets from one offsets partition, unless
     * {@code addLoadingPartition} reports that the load should not start (logged as "already loading").
     *
     * <p>An empty placeholder message is first written to the partition; its message id marks the point up
     * to which the partition is then read. When the load finishes, successfully or not, the partition is
     * added to the owned set and removed from the loading set.
     *
     * @param offsetsPartition partition index to load
     * @param onGroupLoaded    callback handed on to the loading routine
     * @return a future completing with the load, or an already-completed future if no load was started
     */
    public CompletableFuture<Void> scheduleLoadGroupAndOffsets(int offsetsPartition, Consumer<GroupMetadata> onGroupLoaded) {
        String topicPartition = getTopicPartitionName(offsetsPartition);

        if (!addLoadingPartition(offsetsPartition)) {
            log.info("Already loading offsets and group metadata from {}", topicPartition);
            return CompletableFuture.completedFuture(null);
        }

        log.info("Scheduling loading of offsets and group metadata from {}", topicPartition);
        long startMs = time.milliseconds();

        CompletableFuture<MessageId> placeholderSent = getOffsetsTopicProducer(offsetsPartition)
                .thenComposeAsync(producer -> producer.newMessage()
                        .value(ByteBuffer.allocate(0))
                        .eventTime(time.milliseconds())
                        .sendAsync(), scheduler);

        CompletableFuture<Void> loaded = placeholderSent.thenComposeAsync(lastMessageId -> {
            if (log.isTraceEnabled()) {
                log.trace("Successfully write a placeholder record into {} @ {}", topicPartition, lastMessageId);
            }
            return doLoadGroupsAndOffsets(getOffsetsTopicReader(offsetsPartition), lastMessageId, onGroupLoaded);
        }, scheduler);

        return loaded.whenCompleteAsync((ignored, cause) -> {
            // Ownership bookkeeping happens regardless of the outcome.
            CoreUtils.inLock(partitionLock, () -> {
                ownedPartitions.add(offsetsPartition);
                loadingPartitions.remove(offsetsPartition);
                return null;
            });
            if (cause == null) {
                log.info("Finished loading offsets and group metadata from {} in {} milliseconds", topicPartition, time.milliseconds() - startMs);
            } else {
                log.error("Error loading offsets from {}", topicPartition, cause);
            }
        }, scheduler);
    }

    /**
     * Creates the empty accumulators for one load and kicks off reading of the first metadata message.
     *
     * @param metadataConsumer future of the reader to consume from
     * @param endMessageId     message id at which reading stops
     * @param onGroupLoaded    callback handed on to the loading routine
     * @return the future that the loading routine completes when the load ends
     */
    private CompletableFuture<Void> doLoadGroupsAndOffsets(CompletableFuture<Reader<ByteBuffer>> metadataConsumer, MessageId endMessageId, Consumer<GroupMetadata> onGroupLoaded) {
        CompletableFuture<Void> loadResult = new CompletableFuture<>();

        Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> offsetsSoFar = new HashMap<>();
        Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pendingByProducer = new HashMap<>();
        Map<String, GroupMetadata> groupsSoFar = new HashMap<>();
        Set<String> removedSoFar = new HashSet<>();

        loadNextMetadataMessage(metadataConsumer, endMessageId, loadResult, onGroupLoaded,
                offsetsSoFar, pendingByProducer, groupsSoFar, removedSoFar);
        return loadResult;
    }

    /**
     * Exception-safe wrapper around {@code unsafeLoadNextMetadataMessage}: anything thrown while issuing
     * the next read is logged and used to fail {@code resultFuture}.
     */
    private void loadNextMetadataMessage(CompletableFuture<Reader<ByteBuffer>> metadataConsumer,
                                         MessageId endMessageId,
                                         CompletableFuture<Void> resultFuture,
                                         Consumer<GroupMetadata> onGroupLoaded,
                                         Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> loadedOffsets,
                                         Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pendingOffsets,
                                         Map<String, GroupMetadata> loadedGroups,
                                         Set<String> removedGroups) {
        try {
            unsafeLoadNextMetadataMessage(metadataConsumer, endMessageId, resultFuture, onGroupLoaded,
                    loadedOffsets, pendingOffsets, loadedGroups, removedGroups);
        } catch (Throwable failure) {
            log.error("Unknown exception caught when loading group and offsets from topic", failure);
            resultFuture.completeExceptionally(failure);
        }
    }

    /**
     * Reads one message from the metadata reader, folds its records into the accumulators, and then
     * requests the next message, until the message at or after {@code endMessageId} is reached.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>manager shutting down: {@code resultFuture} fails and nothing is read;</li>
     *   <li>read failure: {@code resultFuture} fails with the read's cause;</li>
     *   <li>message id at or after {@code endMessageId}: the accumulated state is handed to
     *       {@code processLoadedAndRemovedGroups};</li>
     *   <li>message without a key: skipped;</li>
     *   <li>unexpected record key or any other exception while processing: {@code resultFuture} fails and
     *       no further message is read.</li>
     * </ul>
     *
     * @param metadataConsumer future of the reader to consume from
     * @param endMessageId     message id at which reading stops
     * @param resultFuture     future completed when loading ends
     * @param onGroupLoaded    callback passed on to {@code processLoadedAndRemovedGroups}
     * @param loadedOffsets    committed offsets gathered so far
     * @param pendingOffsets   transactional offsets per producer id awaiting a control record
     * @param loadedGroups     group metadata gathered so far
     * @param removedGroups    ids of groups whose metadata record carried no group value
     */
    private void unsafeLoadNextMetadataMessage(CompletableFuture<Reader<ByteBuffer>> metadataConsumer, MessageId endMessageId, CompletableFuture<Void> resultFuture, Consumer<GroupMetadata> onGroupLoaded, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> loadedOffsets, Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pendingOffsets, Map<String, GroupMetadata> loadedGroups, Set<String> removedGroups) {
        if (this.shuttingDown.get()) {
            resultFuture.completeExceptionally(new Exception("Group metadata manager is shutting down"));
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Reading the next metadata message from topic {}", metadataConsumer.join().getTopic());
        }
        BiConsumer<Message<ByteBuffer>, Throwable> readNextComplete = (message, cause) -> {
            // A failed read delivers no message, so handle the failure before touching the message
            // (the trace statement below dereferences it).
            if (null != cause) {
                resultFuture.completeExceptionally(cause);
                return;
            }
            if (log.isTraceEnabled()) {
                log.trace("Metadata consumer received a metadata message from {} @ {}", metadataConsumer.join().getTopic(), message.getMessageId());
            }
            if (message.getMessageId().compareTo(endMessageId) >= 0) {
                // Reached the end marker: finish the load with what has been accumulated.
                this.processLoadedAndRemovedGroups(resultFuture, onGroupLoaded, loadedOffsets, pendingOffsets, loadedGroups, removedGroups);
                return;
            }
            if (!message.hasKey()) {
                // Keyless messages carry nothing to apply; move on.
                this.loadNextMetadataMessage(metadataConsumer, endMessageId, resultFuture, onGroupLoaded, loadedOffsets, pendingOffsets, loadedGroups, removedGroups);
                return;
            }
            ByteBuffer buffer = message.getValue();
            MemoryRecords memRecords = MemoryRecords.readableRecords(buffer);
            memRecords.batches().forEach(batch -> {
                boolean isTxnOffsetCommit = batch.isTransactional();
                if (batch.isControlBatch()) {
                    Iterator<Record> recordIterator = batch.iterator();
                    if (recordIterator.hasNext()) {
                        Record record = recordIterator.next();
                        ControlRecordType controlRecord = ControlRecordType.parse(record.key());
                        if (controlRecord == ControlRecordType.COMMIT) {
                            // Promote the producer's pending offsets unless a newer one is already loaded.
                            pendingOffsets.getOrDefault(batch.producerId(), Collections.emptyMap()).forEach((groupTopicPartition, commitRecordMetadataAndOffset) -> {
                                if (!loadedOffsets.containsKey(groupTopicPartition) || loadedOffsets.get(groupTopicPartition).olderThan(commitRecordMetadataAndOffset)) {
                                    loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset);
                                }
                            });
                        }
                        // Any control record ends the producer's pending set.
                        pendingOffsets.remove(batch.producerId());
                    }
                } else {
                    Optional<PositionImpl> batchBaseOffset = Optional.empty();
                    for (Record record : batch) {
                        Preconditions.checkArgument(record.hasKey(), "Group metadata/offset entry key should not be null");
                        if (!batchBaseOffset.isPresent()) {
                            // The first record of the batch supplies the position used for the whole batch.
                            batchBaseOffset = Optional.of(new PositionImpl(0L, record.offset()));
                        }
                        BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
                        if (log.isTraceEnabled()) {
                            log.trace("Applying metadata record {} received from {}", bk, metadataConsumer.join().getTopic());
                        }
                        if (bk instanceof OffsetKey) {
                            OffsetKey offsetKey = (OffsetKey) bk;
                            if (isTxnOffsetCommit && !pendingOffsets.containsKey(batch.producerId())) {
                                pendingOffsets.put(batch.producerId(), new HashMap<>());
                            }
                            GroupTopicPartition groupTopicPartition = offsetKey.key();
                            if (!record.hasValue()) {
                                // A record without a value removes the offset.
                                if (isTxnOffsetCommit) {
                                    pendingOffsets.get(batch.producerId()).remove(groupTopicPartition);
                                } else {
                                    loadedOffsets.remove(groupTopicPartition);
                                }
                            } else {
                                OffsetAndMetadata offsetAndMetadata = GroupMetadataConstants.readOffsetMessageValue(record.value());
                                GroupMetadata.CommitRecordMetadataAndOffset commitRecordMetadataAndOffset = new GroupMetadata.CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata);
                                if (isTxnOffsetCommit) {
                                    pendingOffsets.get(batch.producerId()).put(groupTopicPartition, commitRecordMetadataAndOffset);
                                } else {
                                    loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset);
                                }
                            }
                        } else if (bk instanceof GroupMetadataKey) {
                            GroupMetadataKey groupMetadataKey = (GroupMetadataKey) bk;
                            String gid = groupMetadataKey.key();
                            GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(gid, record.value());
                            if (gm != null) {
                                removedGroups.remove(gid);
                                loadedGroups.put(gid, gm);
                            } else {
                                loadedGroups.remove(gid);
                                removedGroups.add(gid);
                            }
                        } else {
                            // Throw so the surrounding handler fails resultFuture and stops reading;
                            // a plain return here would only leave this batch and let the load go on.
                            throw new IllegalStateException("Unexpected message key " + bk + " while loading offsets and group metadata");
                        }
                    }
                }
            });
            this.loadNextMetadataMessage(metadataConsumer, endMessageId, resultFuture, onGroupLoaded, loadedOffsets, pendingOffsets, loadedGroups, removedGroups);
        };
        metadataConsumer.thenComposeAsync(Reader::readNextAsync).whenCompleteAsync((message, cause) -> {
            try {
                readNextComplete.accept(message, cause);
            }
            catch (Throwable completeCause) {
                log.error("Unknown exception caught when processing the received metadata message from topic {}", metadataConsumer.join().getTopic(), completeCause);
                resultFuture.completeExceptionally(completeCause);
            }
        }, this.scheduler);
    }

    /**
     * Final step of loading a partition: installs the groups and offsets that were collected while
     * reading the metadata topic and completes {@code resultFuture}.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>Regroup {@code loadedOffsets} by group id and split them by whether the group id is a
     *       key of {@code loadedGroups}.</li>
     *   <li>Regroup {@code pendingOffsets} as group id -> producer id -> offsets, registering each
     *       (producer, group) pair through {@code addProducerGroup}, and split them the same way.</li>
     *   <li>Call {@code loadGroup} and then {@code onGroupLoaded} for every group in
     *       {@code loadedGroups}.</li>
     *   <li>For every group id that only has offsets (no entry in {@code loadedGroups}), create a new
     *       {@link GroupMetadata} in {@link GroupState#Empty}, load it and notify
     *       {@code onGroupLoaded}.</li>
     *   <li>Fail if a removed group is still present in {@code groupMetadataCache} and has no
     *       offsets in the "offsets only" set.</li>
     * </ol>
     *
     * <p>Any {@link RuntimeException} raised along the way completes {@code resultFuture}
     * exceptionally instead of propagating; otherwise the future is completed with {@code null}.
     *
     * @param resultFuture future completed when processing finishes or fails
     * @param onGroupLoaded callback invoked once for every group handed to {@code loadGroup}
     * @param loadedOffsets committed offsets keyed by group and topic partition
     * @param pendingOffsets offsets keyed by producer id, then by group and topic partition
     * @param loadedGroups group metadata keyed by group id
     * @param removedGroups ids of groups recorded as removed during the read
     */
    private void processLoadedAndRemovedGroups(CompletableFuture<Void> resultFuture, Consumer<GroupMetadata> onGroupLoaded, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> loadedOffsets, Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pendingOffsets, Map<String, GroupMetadata> loadedGroups, Set<String> removedGroups) {
        if (log.isTraceEnabled()) {
            log.trace("Completing loading : {} loaded groups, {} removed groups, {} loaded offsets, {} pending offsets", new Object[]{loadedGroups.size(), removedGroups.size(), loadedOffsets.size(), pendingOffsets.size()});
        }
        try {
            // group id -> (topic partition -> offset) for the committed offsets.
            Map<String, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> groupLoadedOffsets = loadedOffsets.entrySet().stream().collect(Collectors.groupingBy(e -> ((GroupTopicPartition)e.getKey()).group(), Collectors.toMap(f -> ((GroupTopicPartition)f.getKey()).topicPartition(), Map.Entry::getValue)));
            // NOTE(review): CoreUtils.partition presumably files entries whose key satisfies the
            // predicate under `true` and the rest under `false` - confirm against CoreUtils.
            Map<Boolean, Map<String, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>>> partitionedLoadedOffsets = CoreUtils.partition(groupLoadedOffsets, loadedGroups::containsKey);
            Map<String, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> groupOffsets = partitionedLoadedOffsets.get(true);
            Map<String, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> emptyGroupOffsets = partitionedLoadedOffsets.get(false);
            // group id -> (producer id -> (topic partition -> offset)) for the pending offsets.
            HashMap pendingOffsetsByGroup = new HashMap();
            pendingOffsets.forEach((producerId, producerOffsets) -> {
                // Record every group this producer has pending offsets for.
                producerOffsets.keySet().stream().map(GroupTopicPartition::group).forEach(group -> this.addProducerGroup((long)producerId, (String)group));
                producerOffsets.entrySet().stream().collect(Collectors.groupingBy(e -> ((GroupTopicPartition)e.getKey()).group, Collectors.toMap(f -> ((GroupTopicPartition)f.getKey()).topicPartition(), Map.Entry::getValue))).forEach((group, offsets) -> {
                    Map groupPendingOffsets = pendingOffsetsByGroup.computeIfAbsent(group, g -> new HashMap());
                    Map groupProducerOffsets = groupPendingOffsets.computeIfAbsent(producerId, p -> new HashMap());
                    groupProducerOffsets.putAll(offsets);
                });
            });
            Map partitionedPendingOffsetsByGroup = CoreUtils.partition(pendingOffsetsByGroup, loadedGroups::containsKey);
            Map pendingGroupOffsets = partitionedPendingOffsetsByGroup.get(true);
            Map pendingEmptyGroupOffsets = partitionedPendingOffsetsByGroup.get(false);
            // Groups for which metadata was read: attach whatever offsets belong to them.
            loadedGroups.values().forEach(group -> {
                Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> offsets = groupOffsets.getOrDefault(group.groupId(), Collections.emptyMap());
                Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pOffsets = pendingGroupOffsets.getOrDefault(group.groupId(), Collections.emptyMap());
                if (log.isDebugEnabled()) {
                    log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", new Object[]{group, offsets, pOffsets});
                }
                this.loadGroup((GroupMetadata)group, offsets, pOffsets);
                onGroupLoaded.accept((GroupMetadata)group);
            });
            // Group ids that only appear in offsets: synthesize an Empty group to hold them.
            Sets.union(emptyGroupOffsets.keySet(), pendingEmptyGroupOffsets.keySet()).forEach(groupId -> {
                GroupMetadata group = new GroupMetadata((String)groupId, GroupState.Empty);
                Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> offsets = emptyGroupOffsets.getOrDefault(groupId, Collections.emptyMap());
                Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pOffsets = pendingEmptyGroupOffsets.getOrDefault(groupId, Collections.emptyMap());
                if (log.isDebugEnabled()) {
                    log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", new Object[]{group, offsets, pOffsets});
                }
                this.loadGroup(group, offsets, pOffsets);
                onGroupLoaded.accept(group);
            });
            // A removed group must not still be cached, unless it was just re-created above from
            // offsets-only data; the exception is routed to resultFuture by the catch below.
            removedGroups.forEach(groupId -> {
                if (this.groupMetadataCache.containsKey(groupId) && !emptyGroupOffsets.containsKey(groupId)) {
                    throw new IllegalStateException("Unexpected unload of active group " + groupId + " while loading partition");
                }
            });
            resultFuture.complete(null);
        }
        catch (RuntimeException re) {
            resultFuture.completeExceptionally(re);
        }
    }

    /**
     * Initializes {@code group} with the given offsets and registers it through {@code addGroup}.
     *
     * <p>Committed offsets whose expire timestamp is {@code -1} are rebuilt with an expire
     * timestamp of commit timestamp plus the configured offsets retention; all other offsets are
     * passed through unchanged. If {@code addGroup} hands back a different instance than the one
     * supplied, only a debug message is logged.
     *
     * @param group the group to initialize and register
     * @param offsets committed offsets of the group, keyed by topic partition
     * @param pendingTransactionalOffsets pending offsets of the group, keyed by producer id
     */
    private void loadGroup(GroupMetadata group, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> offsets, Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> pendingTransactionalOffsets) {
        Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> loadedOffsets = CoreUtils.mapValue(offsets, loaded -> {
            OffsetAndMetadata stored = loaded.offsetAndMetadata();
            if (stored.expireTimestamp() != -1L) {
                // An explicit expiry is already present: keep the commit as it is.
                return new GroupMetadata.CommitRecordMetadataAndOffset(loaded.appendedPosition(), stored);
            }
            long derivedExpiry = stored.commitTimestamp() + this.offsetConfig.offsetsRetentionMs();
            OffsetAndMetadata withExpiry = OffsetAndMetadata.apply(stored.offset(), stored.metadata(), stored.commitTimestamp(), derivedExpiry);
            return new GroupMetadata.CommitRecordMetadataAndOffset(loaded.appendedPosition(), withExpiry);
        });
        if (log.isTraceEnabled()) {
            log.trace("Initialized offsets {} from group {}", loadedOffsets, group.groupId());
        }
        group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets);
        GroupMetadata cached = this.addGroup(group);
        if (cached == group) {
            return;
        }
        log.debug("Attempt to load group {} from log with generation {} failed because there is already a cached group with generation {}", group.groupId(), group.generationId(), cached.generationId());
    }

    /**
     * Schedules the unloading of all groups and offsets cached for an offsets partition.
     *
     * <p>The work itself is done by {@code removeGroupsAndOffsets}, submitted to the scheduler.
     *
     * @param offsetsPartition index of the offsets partition to unload
     * @param onGroupUnloaded callback handed to {@code removeGroupsAndOffsets}
     */
    public void removeGroupsForPartition(int offsetsPartition, Consumer<GroupMetadata> onGroupUnloaded) {
        log.info("Scheduling unloading of offsets and group metadata from {}", new TopicPartition("__consumer_offsets", offsetsPartition));
        Runnable unloadTask = () -> this.removeGroupsAndOffsets(offsetsPartition, onGroupUnloaded);
        this.scheduler.submit(unloadTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops ownership of {@code partition} and evicts every cached group that maps to it.
     *
     * <p>While holding {@code partitionLock}, the partition is removed from the owned and loading
     * sets, and each cached group whose {@code partitionFor} equals {@code partition} is reported
     * to {@code onGroupUnloaded}, removed from the cache and detached from all producers. After
     * the lock is released, the partition's cached producer and reader are removed as well.
     *
     * @param partition index of the offsets partition being unloaded
     * @param onGroupUnloaded callback invoked for each group before it is removed from the cache
     */
    @VisibleForTesting
    void removeGroupsAndOffsets(int partition, Consumer<GroupMetadata> onGroupUnloaded) {
        if (log.isDebugEnabled()) {
            log.debug("Started unloading offsets and group metadata for {}", this.getTopicPartitionName(partition));
        }
        int removedOffsetCount = 0;
        int removedGroupCount = 0;
        this.partitionLock.lock();
        try {
            this.ownedPartitions.remove(partition);
            this.loadingPartitions.remove(partition);
            for (GroupMetadata cachedGroup : this.groupMetadataCache.values()) {
                String cachedGroupId = cachedGroup.groupId();
                if (this.partitionFor(cachedGroupId) == partition) {
                    onGroupUnloaded.accept(cachedGroup);
                    this.groupMetadataCache.remove(cachedGroup.groupId(), cachedGroup);
                    this.removeGroupFromAllProducers(cachedGroup.groupId());
                    removedGroupCount++;
                    removedOffsetCount += cachedGroup.numOffsets();
                }
            }
        }
        finally {
            this.partitionLock.unlock();
        }
        this.removeProducerAndReaderFromCache(partition);
        log.info("Finished unloading {}. Removed {} cached offsets and {} cached groups.", this.getTopicPartitionName(partition), removedOffsetCount, removedGroupCount);
    }

    /**
     * Removes expired offsets from every cached group and logs how many were removed and how
     * long the pass took.
     *
     * @return a future that completes after the summary has been logged on the scheduler
     */
    CompletableFuture<Void> cleanupGroupMetadata() {
        final long startMs = this.time.milliseconds();
        CompletableFuture<Integer> removedCount = this.cleanGroupMetadata(
            this.groupMetadataCache.values().stream(),
            group -> group.removeExpiredOffsets(this.time.milliseconds()));
        return removedCount.thenAcceptAsync(offsetsRemoved -> {
            long elapsedMs = this.time.milliseconds() - startMs;
            log.info("Removed {} expired offsets in {} milliseconds.", offsetsRemoved, elapsedMs);
        }, this.scheduler);
    }

    /**
     * Removes the offsets chosen by {@code selector} from each group and publishes matching
     * null-value records (called tombstones in the code) to the offsets topic.
     *
     * <p>Per group, inside {@code group.inLock}: the selector is applied, and a group that is
     * {@link GroupState#Empty} with no offsets left is transitioned to {@link GroupState#Dead}.
     * Outside the lock, one tombstone is built per removed offset, plus one for the group metadata
     * key when the group is dead, was successfully removed from {@code groupMetadataCache}, and
     * its generation is greater than zero. If there is anything to write, the tombstones are sent
     * as a single message through the offsets topic producer of the group.
     *
     * @param groups the groups to clean
     * @param selector returns, for a group, the offsets that were removed from it
     * @return a future holding the total number of removed offsets across all groups; a group whose
     *         tombstone write failed, or that produced no tombstones, contributes {@code 0}
     */
    CompletableFuture<Integer> cleanGroupMetadata(Stream<GroupMetadata> groups, Function<GroupMetadata, Map<TopicPartition, OffsetAndMetadata>> selector) {
        List cleanFutures = groups.map(group -> {
            String groupId = group.groupId();
            // Capture removed offsets, dead flag and generation together under the group lock.
            Triple result = (Triple)group.inLock(() -> {
                Map removedOffsets = Collections.synchronizedMap((Map)selector.apply((GroupMetadata)group));
                if (group.is(GroupState.Empty) && !group.hasOffsets()) {
                    log.info("Group {} transitioned to Dead in generation {}", (Object)groupId, (Object)group.generationId());
                    group.transitionTo(GroupState.Dead);
                }
                return Triple.of(removedOffsets, (Object)group.is(GroupState.Dead), (Object)group.generationId());
            });
            Map removedOffsets = (Map)result.getLeft();
            boolean groupIsDead = (Boolean)result.getMiddle();
            int generation = (Integer)result.getRight();
            TimestampType timestampType = TimestampType.CREATE_TIME;
            long timestamp = this.time.milliseconds();
            ArrayList<SimpleRecord> tombstones = new ArrayList<SimpleRecord>();
            // One null-value record per removed offset commit key.
            removedOffsets.forEach((topicPartition, offsetAndMetadata) -> {
                byte[] commitKey = GroupMetadataConstants.offsetCommitKey(groupId, topicPartition, this.namespacePrefix);
                tombstones.add(new SimpleRecord(timestamp, commitKey, null));
            });
            // The cache removal is part of the condition: the group tombstone is only added when
            // this call is the one that evicted the group, and only for generation > 0.
            if (groupIsDead && this.groupMetadataCache.remove(groupId, group) && generation > 0) {
                byte[] groupMetadataKey = GroupMetadataConstants.groupMetadataKey(group.groupId());
                tombstones.add(new SimpleRecord(timestamp, groupMetadataKey, null));
            }
            if (!tombstones.isEmpty()) {
                // NOTE(review): (byte)2 is presumably the record-batch magic value - confirm.
                MemoryRecords records = MemoryRecords.withRecords((byte)2, (long)0L, (CompressionType)this.compressionType, (TimestampType)timestampType, (SimpleRecord[])tombstones.toArray(new SimpleRecord[0]));
                byte[] groupKey = GroupMetadataConstants.groupMetadataKey(group.groupId());
                // A failed send is logged and counted as zero removed offsets rather than failing
                // the combined future.
                return ((CompletableFuture)((CompletableFuture)this.getOffsetsTopicProducer(group.groupId()).thenComposeAsync(f -> f.newMessage().keyBytes(groupKey).value((Object)records.buffer()).eventTime(timestamp).sendAsync(), (Executor)this.scheduler)).thenApplyAsync(ignored -> removedOffsets.size(), (Executor)this.scheduler)).exceptionally(cause -> {
                    log.error("Failed to append {} tombstones to topic {} for expired/deleted offsets and/or metadata for group {}", new Object[]{tombstones.size(), this.offsetConfig.offsetsTopicName() + "-" + this.partitionFor(group.groupId()), group.groupId(), cause});
                    return 0;
                });
            }
            return CompletableFuture.completedFuture(0);
        }).collect(Collectors.toList());
        // Sum the per-group counts once every group has been processed.
        return FutureUtils.collect(cleanFutures).thenApplyAsync(removedList -> removedList.stream().mapToInt(Integer::intValue).sum(), (Executor)this.scheduler);
    }

    /**
     * Submits {@code handleTxnCompletion} to the scheduler and returns the future it will complete.
     *
     * @param producerId id of the producer whose transaction finished
     * @param completedPartitions offsets partitions touched by the transaction
     * @param isCommit {@code true} for a commit, {@code false} for an abort
     * @return the future passed to {@code handleTxnCompletion}
     */
    public CompletableFuture<Void> scheduleHandleTxnCompletion(long producerId, Set<Integer> completedPartitions, boolean isCommit) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Runnable completionTask = () -> this.handleTxnCompletion(producerId, completedPartitions, isCommit, completion);
        this.scheduler.submit(completionTask);
        return completion;
    }

    /**
     * Applies the outcome of a finished transaction to every cached group that has pending
     * offsets from {@code producerId} on one of {@code completedPartitions}.
     *
     * <p>For each such group that is cached and not {@link GroupState#Dead}, the pending
     * transactional offset commit is completed under the group lock and the producer/group
     * association is removed. A group that is no longer cached is only logged.
     *
     * <p>Changes compared with the earlier implementation:
     * <ul>
     *   <li>The "group has moved away" message is logged only when the group is really missing.
     *       The earlier {@code Optional.map(...)} mapper always returned {@code null}, which
     *       yields an empty {@code Optional}, so the fallback branch ran for cached groups
     *       too.</li>
     *   <li>A {@link RuntimeException} raised while processing a group now fails that group's
     *       future, and therefore {@code completableFuture}, instead of escaping and leaving
     *       {@code completableFuture} uncompleted.</li>
     *   <li>The failure cause is included in the error log.</li>
     * </ul>
     *
     * @param producerId id of the producer whose transaction finished
     * @param completedPartitions offsets partitions touched by the transaction
     * @param isCommit {@code true} for a commit, {@code false} for an abort
     * @param completableFuture completed once all groups were processed, exceptionally on failure
     */
    protected void handleTxnCompletion(long producerId, Set<Integer> completedPartitions, boolean isCommit, CompletableFuture<Void> completableFuture) {
        List<CompletableFuture<Void>> groupFutureList = new ArrayList<>();
        Set<String> pendingGroups = this.groupsBelongingToPartitions(producerId, completedPartitions);
        pendingGroups.forEach(groupId -> {
            CompletableFuture<Void> groupFuture = new CompletableFuture<>();
            groupFutureList.add(groupFuture);
            try {
                // The mapper must return a non-null value: Optional.map turns a null result
                // into an empty Optional, which would be indistinguishable from "not cached".
                boolean groupCached = this.getGroup(groupId).map(group -> {
                    group.inLock(() -> {
                        if (!group.is(GroupState.Dead)) {
                            group.completePendingTxnOffsetCommit(producerId, isCommit);
                            this.removeProducerGroup(producerId, groupId);
                        }
                        return null;
                    });
                    return Boolean.TRUE;
                }).orElse(Boolean.FALSE);
                if (!groupCached) {
                    log.info("Group {} has moved away from this coordinator after transaction marker was written but before the cache was updated. The cache on the new group owner will be updated instead.", groupId);
                }
                groupFuture.complete(null);
            }
            catch (RuntimeException e) {
                // Surface the failure through the returned future instead of losing it.
                groupFuture.completeExceptionally(e);
            }
        });
        FutureUtil.waitForAll(groupFutureList).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                log.error("Failed to handle txn completion.", throwable);
                completableFuture.completeExceptionally(throwable);
            } else {
                completableFuture.complete(null);
            }
        });
    }

    /**
     * Adds {@code partition} to the set of owned partitions while holding {@code partitionLock}.
     *
     * @param partition index of the offsets partition to mark as owned
     */
    void addPartitionOwnership(int partition) {
        // The value produced by the lambda is discarded; only the locked add matters.
        CoreUtils.inLock(this.partitionLock, () -> this.ownedPartitions.add(partition));
    }

    /**
     * Marks {@code partition} as loading, unless it is already owned.
     *
     * <p>Both checks run while holding {@code partitionLock}.
     *
     * @param partition index of the offsets partition about to be loaded
     * @return {@code false} if the partition is already owned; otherwise the result of adding it
     *         to the loading set
     */
    boolean addLoadingPartition(int partition) {
        return CoreUtils.inLock(
            this.partitionLock,
            () -> !this.ownedPartitions.contains(partition) && this.loadingPartitions.add(partition));
    }

    /**
     * Returns the offsets topic producer for the partition that {@code groupId} maps to.
     *
     * @param groupId id of the group
     * @return the producer future for the partition given by {@code partitionFor(groupId)}
     */
    CompletableFuture<Producer<ByteBuffer>> getOffsetsTopicProducer(String groupId) {
        int groupPartition = this.partitionFor(groupId);
        return this.getOffsetsTopicProducer(groupPartition);
    }

    /**
     * Returns the cached producer future for an offsets partition, creating it on first use.
     *
     * @param partitionId index of the offsets partition
     * @return the producer future stored in {@code offsetsProducers} for this partition
     */
    CompletableFuture<Producer<ByteBuffer>> getOffsetsTopicProducer(int partitionId) {
        return this.offsetsProducers.computeIfAbsent(partitionId, id -> {
            String topicName = this.getTopicPartitionName(partitionId);
            if (log.isDebugEnabled()) {
                log.debug("Will create Partitioned producer: {}", topicName);
            }
            // Clone the shared builder so the topic set here does not affect it.
            return this.metadataTopicProducerBuilder.clone()
                .topic(topicName)
                .createAsync();
        });
    }

    /**
     * Returns the cached reader future for an offsets partition, creating it on first use.
     *
     * <p>New readers are created with {@code readCompacted(true)}.
     *
     * @param partitionId index of the offsets partition
     * @return the reader future stored in {@code offsetsReaders} for this partition
     */
    CompletableFuture<Reader<ByteBuffer>> getOffsetsTopicReader(int partitionId) {
        return this.offsetsReaders.computeIfAbsent(partitionId, id -> {
            String topicName = this.getTopicPartitionName(partitionId);
            if (log.isDebugEnabled()) {
                log.debug("Will create Partitioned reader: {}", topicName);
            }
            // Clone the shared builder so the settings applied here do not affect it.
            return this.metadataTopicReaderBuilder.clone()
                .topic(topicName)
                .readCompacted(true)
                .createAsync();
        });
    }

    /**
     * Removes the cached producer and reader of an offsets partition and closes them
     * asynchronously, logging the outcome of each close.
     *
     * <p>The close is chained with {@code thenComposeAsync} so the logging callback runs only
     * after {@code closeAsync()} itself has completed. With {@code thenApplyAsync} the callback
     * observed a nested future: it ran as soon as the close had been started, reported success
     * even if the close later failed, and never saw close failures. The failure cause is also
     * passed to the error log.
     *
     * @param partition index of the offsets partition whose producer and reader are dropped
     */
    private void removeProducerAndReaderFromCache(int partition) {
        String partitionName = this.getTopicPartitionName(partition);
        Optional.ofNullable(this.offsetsProducers.remove(partition)).ifPresent(producerFuture ->
            producerFuture.thenComposeAsync(Producer::closeAsync).whenCompleteAsync((ignored, e) -> {
                if (e != null) {
                    log.error("Failed to close producer for {}", partitionName, e);
                } else if (log.isDebugEnabled()) {
                    log.debug("Closed offset producer for {}", partitionName);
                }
            }, this.scheduler));
        Optional.ofNullable(this.offsetsReaders.remove(partition)).ifPresent(readerFuture ->
            readerFuture.thenComposeAsync(Reader::closeAsync).whenCompleteAsync((ignored, e) -> {
                if (e != null) {
                    log.error("Failed to close reader for {}", partitionName, e);
                } else if (log.isDebugEnabled()) {
                    log.debug("Closed offset reader for {}", partitionName);
                }
            }, this.scheduler));
    }

    /**
     * Returns the offset configuration held by this manager.
     *
     * @return the {@code offsetConfig} field
     */
    public OffsetConfig getOffsetConfig() {
        return offsetConfig;
    }

    /**
     * Immutable pair of a group id and a topic partition, usable as a map key.
     *
     * <p>Equality and hash code are based on both components.
     */
    static class GroupTopicPartition {
        private final String group;
        private final TopicPartition topicPartition;

        /**
         * @param group the group id
         * @param topic the topic name
         * @param partition the partition index within {@code topic}
         */
        GroupTopicPartition(String group, String topic, int partition) {
            this.group = group;
            this.topicPartition = new TopicPartition(topic, partition);
        }

        /** Returns the group id. */
        public String group() {
            return group;
        }

        /** Returns the topic partition. */
        public TopicPartition topicPartition() {
            return topicPartition;
        }

        /** Formats the key as {@code [group, topic, partition]}. */
        @Override
        public String toString() {
            String topic = topicPartition.topic();
            int partition = topicPartition.partition();
            return String.format("[%s, %s, %d]", group, topic, partition);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupTopicPartition)) {
                return false;
            }
            GroupTopicPartition that = (GroupTopicPartition)o;
            return that.canEqual(this)
                && sameOrBothNull(this.group(), that.group())
                && sameOrBothNull(this.topicPartition(), that.topicPartition());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object other) {
            return other instanceof GroupTopicPartition;
        }

        @Override
        public int hashCode() {
            int hash = 59 + hashOrDefault(this.group());
            return hash * 59 + hashOrDefault(this.topicPartition());
        }

        /** Null-safe equality: two nulls are equal, a null never equals a non-null. */
        private static boolean sameOrBothNull(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of {@code value}, with 43 standing in for {@code null}. */
        private static int hashOrDefault(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Common shape of a versioned key: a version number plus the key payload.
     */
    public static interface BaseKey {
        /** Returns the version of this key. */
        short version();

        /** Returns the key payload; implementations narrow the return type. */
        Object key();
    }

    /**
     * Versioned key whose payload is a {@link GroupTopicPartition}.
     *
     * <p>Equality and hash code are based on both the version and the payload.
     */
    public static class OffsetKey
    implements BaseKey {
        private final short version;
        private final GroupTopicPartition key;

        /**
         * @param version the version of the key
         * @param key the group and topic partition this key refers to
         */
        public OffsetKey(short version, GroupTopicPartition key) {
            this.version = version;
            this.key = key;
        }

        @Override
        public short version() {
            return version;
        }

        @Override
        public GroupTopicPartition key() {
            return key;
        }

        /** Delegates to the payload's {@code toString()}. */
        @Override
        public String toString() {
            return key.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof OffsetKey)) {
                return false;
            }
            OffsetKey that = (OffsetKey)o;
            if (!that.canEqual(this) || this.version() != that.version()) {
                return false;
            }
            GroupTopicPartition mine = this.key();
            GroupTopicPartition theirs = that.key();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object other) {
            return other instanceof OffsetKey;
        }

        @Override
        public int hashCode() {
            int hash = 59 + this.version();
            GroupTopicPartition payload = this.key();
            // 43 stands in for a null payload.
            int payloadHash = payload == null ? 43 : payload.hashCode();
            return hash * 59 + payloadHash;
        }
    }

    /**
     * Versioned key whose payload is a {@code String}.
     *
     * <p>Equality and hash code are based on both the version and the payload.
     */
    static class GroupMetadataKey
    implements BaseKey {
        private final short version;
        private final String key;

        /**
         * @param version the version of the key
         * @param key the string payload of the key
         */
        public GroupMetadataKey(short version, String key) {
            this.version = version;
            this.key = key;
        }

        @Override
        public short version() {
            return version;
        }

        @Override
        public String key() {
            return key;
        }

        /** Returns the payload string itself. */
        @Override
        public String toString() {
            return key;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupMetadataKey)) {
                return false;
            }
            GroupMetadataKey that = (GroupMetadataKey)o;
            if (!that.canEqual(this) || this.version() != that.version()) {
                return false;
            }
            String mine = this.key();
            String theirs = that.key();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object other) {
            return other instanceof GroupMetadataKey;
        }

        @Override
        public int hashCode() {
            int hash = 59 + this.version();
            String payload = this.key();
            // 43 stands in for a null payload.
            int payloadHash = payload == null ? 43 : payload.hashCode();
            return hash * 59 + payloadHash;
        }
    }
}

