/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.proxy.BrokerConnectionGroup;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionFactory;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionToBroker;
import io.streamnative.pulsar.handlers.kop.proxy.InflightRequest;
import io.streamnative.pulsar.handlers.kop.proxy.KafkaProxyExtension;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DeleteRecordsRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DescribeClusterResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.KopResponseUtils;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProxyRequestHandler
extends ChannelInboundHandlerAdapter {
    /** SLF4J logger shared by all handler instances. */
    private static final Logger log = LoggerFactory.getLogger(KafkaProxyRequestHandler.class);
    // Requests that still owe the client a response, in arrival order. Bounded to 500 entries;
    // flush() only ever inspects and removes the head of this queue.
    private final LinkedBlockingQueue<InflightRequest> requestQueue = new LinkedBlockingQueue(500);
    // Set to true in channelActive() and flipped back to false exactly once by close().
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    // The node advertised to clients in place of the real brokers; built from the advertised endpoint.
    private final Node selfNode;
    // Singleton list holding selfNode's id; used as the replica and ISR list in rewritten metadata.
    private final List<Integer> replicaIds;
    // Broker-side connections (metadata broker, partition leaders, authentication) for this client.
    private final BrokerConnectionGroup connectionGroup;
    // Partition -> leader address. Filled from metadata responses and read when routing
    // produce, fetch and list-offsets requests. Supplied by the caller of the constructor.
    @VisibleForTesting
    final Cache<TopicPartition, InetSocketAddress> leaderCache;
    // Context of the client channel; assigned in channelActive().
    private ChannelHandlerContext ctx;

    public KafkaProxyRequestHandler(EndPoint advertisedEndPoint, ConnectionFactory connectionFactory, Cache<TopicPartition, InetSocketAddress> leaderCache) {
        this.selfNode = new Node(Murmur3_32Hash.getInstance().makeHash((advertisedEndPoint.getHostname() + advertisedEndPoint.getPort()).getBytes(StandardCharsets.UTF_8)), advertisedEndPoint.getHostname(), advertisedEndPoint.getPort());
        this.replicaIds = Collections.singletonList(this.selfNode.id());
        this.connectionGroup = new BrokerConnectionGroup(connectionFactory);
        this.leaderCache = leaderCache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Handles one request frame received from the client.
     *
     * <p>The frame is wrapped in an {@link InflightRequest}, a flush callback is registered on the
     * channel's executor, and the request is dispatched by API key. Every request except a
     * PRODUCE with {@code acks == 0} is first added to the pending-request queue.
     *
     * <p>Any exception raised while handling the request closes the client connection. The
     * incoming buffer is always released before this method returns.
     *
     * @param ctx context of the client channel
     * @param msg the incoming frame; cast to {@link ByteBuf} without a type check
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf frame = (ByteBuf) msg;
        KafkaProxyExtension.BYTES_COUNTER.inc((double) frame.readableBytes());
        try {
            Channel clientChannel = ctx.channel();
            InflightRequest inflightRequest = new InflightRequest(frame, clientChannel.remoteAddress());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received kafka cmd {}", clientChannel, inflightRequest);
            }
            ApiKeys apiKeys = inflightRequest.getHeader().apiKey();
            inflightRequest.registerCallback(() -> this.flush(clientChannel), ctx.executor());
            KafkaProxyExtension.OPS_COUNTER.inc();

            // A PRODUCE with acks=0 is the only request that is not tracked in the queue.
            boolean tracked = apiKeys != ApiKeys.PRODUCE
                    || ((ProduceRequest) inflightRequest.getRequest()).acks() != 0;
            if (tracked) {
                this.requestQueue.put(inflightRequest);
            }

            switch (apiKeys) {
                case API_VERSIONS:
                    this.handleApiVersions(inflightRequest);
                    break;
                case METADATA:
                    this.handleMetadata(inflightRequest);
                    break;
                case PRODUCE:
                    this.handleProduce(inflightRequest);
                    break;
                case FIND_COORDINATOR:
                    this.handleFindCoordinator(inflightRequest);
                    break;
                case JOIN_GROUP:
                case SYNC_GROUP:
                case LEAVE_GROUP:
                case OFFSET_FETCH:
                case OFFSET_COMMIT:
                case HEARTBEAT:
                case OFFSET_DELETE:
                case TXN_OFFSET_COMMIT:
                    this.handleGroupRequest(apiKeys, inflightRequest);
                    break;
                case LIST_OFFSETS:
                    this.handleListOffsets(inflightRequest);
                    break;
                case FETCH:
                    this.handleFetch(inflightRequest);
                    break;
                case SASL_HANDSHAKE:
                case SASL_AUTHENTICATE:
                    this.connectionGroup.authenticate(inflightRequest);
                    break;
                case CREATE_TOPICS:
                case DELETE_TOPICS:
                case DESCRIBE_CONFIGS:
                case ALTER_CONFIGS:
                case LIST_GROUPS:
                case DELETE_GROUPS:
                case DESCRIBE_GROUPS:
                    // Forwarded unchanged to the metadata broker.
                    this.connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
                    break;
                case DESCRIBE_CLUSTER:
                    this.handleDescribeCluster(inflightRequest);
                    break;
                case DELETE_RECORDS:
                    this.handleDeleteRecords(inflightRequest);
                    break;
                case INIT_PRODUCER_ID:
                case ADD_PARTITIONS_TO_TXN:
                case ADD_OFFSETS_TO_TXN:
                case END_TXN:
                    this.handleTxnRequest(apiKeys, inflightRequest);
                    break;
                case WRITE_TXN_MARKERS:
                    throw new IllegalStateException(apiKeys + " should be handled in broker");
                default:
                    inflightRequest.complete(inflightRequest.getRequest().getErrorResponse(
                            new UnsupportedVersionException("API " + apiKeys + " is not supported")));
                    break;
            }
        } catch (IOException e) {
            log.warn("{}", e.getMessage());
            this.close(ctx);
        } catch (Throwable throwable) {
            log.error("[{}] Unexpected exception when handling request", ctx.channel(), throwable);
            this.close(ctx);
        } finally {
            ReferenceCountUtil.safeRelease(frame);
        }
    }

    /**
     * Marks the handler active once the client channel is up.
     *
     * <p>Stores the channel context, hands it to the broker connection group and bumps the
     * connection counters.
     *
     * @param ctx context of the client channel that became active
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        this.ctx = ctx;
        this.connectionGroup.setClientChannel(ctx);
        this.isActive.set(true);

        KafkaProxyExtension.NEW_CONNECTIONS.inc();
        KafkaProxyExtension.ACTIVE_CONNECTIONS.inc();
    }

    /**
     * Cleans up after the client channel has gone away.
     *
     * <p>Besides updating the connection gauge and closing the broker connections, the handler
     * is marked inactive and the pending-request queue is dropped. Without that, responses that
     * arrive from brokers after the client disconnected would still pass the {@code isActive}
     * check in {@code flush} and be written to the closed channel.
     *
     * @param ctx context of the client channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("close channel {}", (Object)ctx.channel());
        KafkaProxyExtension.ACTIVE_CONNECTIONS.dec();
        // Only the first of close()/channelInactive() to run resets the local state.
        if (this.isActive.compareAndSet(true, false)) {
            if (!this.requestQueue.isEmpty()) {
                log.info("[{}] Close with {} pending requests", (Object)ctx, (Object)this.requestQueue.size());
            }
            this.requestQueue.clear();
        }
        // Kept unconditional, exactly as before, so the broker connections are always released.
        this.connectionGroup.close();
    }

    /**
     * Logs an exception raised in the pipeline and closes the client connection.
     *
     * @param ctx context of the client channel
     * @param cause the exception that reached this handler
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel failedChannel = ctx.channel();
        log.error("[{}] Unexpected exception", failedChannel, cause);
        this.close(ctx);
    }

    /**
     * Closes the client channel and releases per-connection state.
     *
     * <p>Only the first call after the handler became active has any effect; later calls
     * return immediately.
     *
     * @param ctx context of the client channel to close
     */
    private void close(ChannelHandlerContext ctx) {
        if (!this.isActive.compareAndSet(true, false)) {
            return;
        }
        ctx.close();
        int pending = this.requestQueue.size();
        if (pending > 0) {
            log.info("[{}] Close with {} pending requests", ctx, pending);
        }
        this.requestQueue.clear();
        this.connectionGroup.close();
    }

    /**
     * Writes ready responses to the client in request order.
     *
     * <p>While the handler is active, the head of the pending queue is removed and written out
     * as long as it has received its response. The loop stops at the first request that is
     * still waiting, so responses never overtake earlier requests. If the head request failed,
     * the failure is logged, the connection is closed and flushing stops.
     *
     * @param channel client channel the responses are written to
     */
    private void flush(Channel channel) {
        while (this.isActive.get()) {
            final InflightRequest head = this.requestQueue.peek();
            if (head == null || !head.hasReceivedResponse()) {
                break;
            }
            if (!this.requestQueue.remove(head)) {
                // The head was already removed elsewhere; look at the new head.
                continue;
            }

            boolean failed = head.hasFailed(error -> {
                if (error instanceof ConnectionToBroker.ConnectError) {
                    log.warn("[{}] {} failed with {}", channel, head.getHeader(), error.getMessage());
                } else {
                    log.error("[{}] request {} completed exceptionally", channel, head.getHeader(), error);
                }
                this.close(this.ctx);
            });
            if (failed) {
                return;
            }

            ByteBuf responseBuf = head.toResponseBuf();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Write kafka cmd to client ({} requests left): {}", channel, this.requestQueue.size(), head.getHeader());
            }
            channel.writeAndFlush((Object) responseBuf).addListener(writeResult -> {
                if (!writeResult.isSuccess()) {
                    log.error("[{}] Failed to write {}", channel, head.getHeader(), writeResult.cause());
                }
            });
        }
    }

    /**
     * Answers an API_VERSIONS request locally, without contacting a broker.
     *
     * @param inflightRequest the API_VERSIONS request to complete
     */
    private void handleApiVersions(InflightRequest inflightRequest) {
        short requestedVersion = inflightRequest.getHeader().apiVersion();
        boolean unsupportedVersion = !ApiKeys.API_VERSIONS.isVersionSupported(requestedVersion);
        inflightRequest.complete(KafkaRequestHandler.overloadDefaultApiVersionsResponse(unsupportedVersion));
    }

    /**
     * Forwards a METADATA request to the metadata broker and rewrites the response so that the
     * client only ever sees this proxy.
     *
     * <p>Before rewriting, the real leader address of every partition whose leader is listed
     * in the response is stored in {@code leaderCache}. Afterwards every partition reports this
     * proxy as leader, sole replica and sole ISR member, the controller id is the proxy's id and
     * the broker list contains only the proxy.
     *
     * @param inflightRequest the METADATA request
     * @throws IOException propagated from obtaining the metadata broker connection or forwarding
     */
    private void handleMetadata(InflightRequest inflightRequest) throws IOException {
        inflightRequest.setResponseMapper(brokerResponse -> {
            MetadataResponse metadataResponse = (MetadataResponse) brokerResponse;
            MetadataResponseData data = metadataResponse.data();
            MetadataResponseData.MetadataResponseBrokerCollection brokers = data.brokers();
            int selfId = this.selfNode.id();

            if (log.isDebugEnabled()) {
                // Snapshot of the real leaders (or the partition error) before they are overwritten.
                HashMap<TopicPartition, String> leaders = new HashMap<>();
                data.topics().forEach(t -> t.partitions().forEach(p -> {
                    MetadataResponseData.MetadataResponseBroker leader = brokers.find(p.leaderId());
                    String description = leader != null
                            ? leader.host() + ":" + leader.port()
                            : Errors.forCode(p.errorCode()).message();
                    leaders.put(new TopicPartition(t.name(), p.partitionIndex()), description);
                }));
                log.debug("[{}] MetadataResponse: {}", inflightRequest.getHeader(), leaders);
            }

            data.topics().forEach(t -> {
                String topicName = t.name();
                t.partitions().forEach(p -> {
                    MetadataResponseData.MetadataResponseBroker leader = brokers.find(p.leaderId());
                    if (leader != null) {
                        TopicPartition topicPartition = new TopicPartition(topicName, p.partitionIndex());
                        InetSocketAddress leaderAddress = InetSocketAddress.createUnresolved(leader.host(), leader.port());
                        this.leaderCache.put(topicPartition, leaderAddress);
                    }
                    p.setLeaderId(selfId);
                    p.setReplicaNodes(this.replicaIds);
                    p.setIsrNodes(this.replicaIds);
                });
            });

            data.setControllerId(selfId);
            brokers.clear();
            MetadataResponseData.MetadataResponseBroker self = new MetadataResponseData.MetadataResponseBroker()
                    .setNodeId(selfId)
                    .setHost(this.selfNode.host())
                    .setPort(this.selfNode.port());
            brokers.add(self);
            return metadataResponse;
        });
        this.connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
    }

    /**
     * Routes a PRODUCE request to the leader broker(s) of the partitions it contains.
     *
     * <p>Partitions are grouped by the leader address found in {@code leaderCache}; partitions
     * without a cached leader are answered with {@code NOT_LEADER_OR_FOLLOWER}. If every
     * partition maps to the same leader and none is missing, the original request is forwarded
     * as-is. Otherwise one sub-request per leader is built and the sub-responses are merged
     * with the locally generated errors into a single response.
     *
     * @param inflightRequest the PRODUCE request
     * @throws IOException declared by the signature; connection failures inside the body are
     *     caught and converted into per-partition errors
     */
    @VisibleForTesting
    void handleProduce(InflightRequest inflightRequest) throws IOException {
        boolean cacheRequest;
        ProduceRequest request = (ProduceRequest)inflightRequest.getRequest();
        // Errors generated by the proxy itself, keyed by partition.
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        // leader address -> (topic name -> produce data for that topic destined to the leader)
        HashMap<InetSocketAddress, Map> partitionDataMap = new HashMap<InetSocketAddress, Map>();
        for (ProduceRequestData.TopicProduceData topicData : request.data().topicData()) {
            String topic = topicData.name();
            for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
                TopicPartition topicPartition = new TopicPartition(topic, partitionData.index());
                InetSocketAddress leader2 = (InetSocketAddress)this.leaderCache.getIfPresent((Object)topicPartition);
                if (leader2 == null) {
                    // Unknown leader: reported to the client as NOT_LEADER_OR_FOLLOWER.
                    errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                    continue;
                }
                partitionDataMap.computeIfAbsent(leader2, __ -> new HashMap()).computeIfAbsent(topic, __ -> new ProduceRequestData.TopicProduceData().setName(topic)).partitionData().add(partitionData);
            }
        }
        if (partitionDataMap.isEmpty()) {
            // Nothing can be forwarded; answer with the collected errors only.
            log.warn("No leader found for {}", (Object)inflightRequest.getHeader());
            inflightRequest.complete(KafkaProxyRequestHandler.createProduceResponse(errorsMap));
            return;
        }
        // Resolves the connection to a leader. On a connection failure the cache entries of all
        // partitions routed to that address are invalidated, the partitions are recorded in
        // errorsMap, and an empty Optional is returned.
        Function<InetSocketAddress, Optional> getLeader = address -> {
            try {
                return Optional.of(this.connectionGroup.getLeader((InetSocketAddress)address));
            }
            catch (IOException e) {
                log.warn("[{}] Failed to connect to leader {}: {}", new Object[]{this.ctx, address, e.getMessage()});
                Optional.ofNullable((Map)partitionDataMap.get(address)).ifPresent(map -> map.forEach((topic, data) -> data.partitionData().stream().map(__ -> new TopicPartition(topic, __.index())).forEach(topicPartition -> {
                    this.leaderCache.invalidate(topicPartition);
                    errorsMap.put((TopicPartition)topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                })));
                return Optional.empty();
            }
        };
        // `bl` is a decompiler artifact and is never read; cacheRequest is true unless acks == 0.
        boolean bl = cacheRequest = request.acks() != 0;
        if (errorsMap.isEmpty() && partitionDataMap.size() == 1) {
            // Single leader, nothing missing: forward the original request unchanged.
            // errorsMap can only become non-empty here if connecting to the leader fails below.
            inflightRequest.setResponseMapper(originalResponse -> {
                if (errorsMap.isEmpty()) {
                    return originalResponse;
                }
                if (originalResponse == null) {
                    return KafkaProxyRequestHandler.createProduceResponse(errorsMap);
                }
                ProduceResponse produceResponse = (ProduceResponse)originalResponse;
                return KafkaProxyRequestHandler.createProduceResponse(errorsMap, produceResponse.data());
            });
            // On a connection failure the request is completed with null.
            // NOTE(review): presumably complete(null) runs the response mapper above, which then
            // builds the response from errorsMap - confirm against InflightRequest.
            getLeader.apply((InetSocketAddress)partitionDataMap.keySet().iterator().next()).ifPresentOrElse(leader -> {
                inflightRequest.setSkipParsingResponse(errorsMap.isEmpty());
                leader.forwardRequest(inflightRequest, cacheRequest);
            }, () -> inflightRequest.complete(null));
        } else {
            // Fan out: one sub-request per reachable leader, reusing the original header.
            ArrayList responseFutures = new ArrayList();
            partitionDataMap.forEach((address, topicDataMap) -> ((Optional)getLeader.apply((InetSocketAddress)address)).ifPresent(connection -> {
                ProduceRequest singleRequest = new ProduceRequest(new ProduceRequestData().setAcks(request.acks()).setTimeoutMs(request.timeout()).setTransactionalId(request.transactionalId()).setTopicData(new ProduceRequestData.TopicProduceDataCollection(topicDataMap.values().iterator())), request.version());
                ByteBuf buf = KopResponseUtils.serializeRequestToPooledBuffer((RequestHeader)inflightRequest.getHeader(), (AbstractRequest)singleRequest);
                InflightRequest singleInflightRequest = new InflightRequest(buf, inflightRequest.getRemoteAddress(), false);
                responseFutures.add(singleInflightRequest.getResponseFuture());
                connection.forwardRequest(singleInflightRequest);
            }));
            ((CompletableFuture)FutureUtil.waitForAll(responseFutures).thenAccept(__ -> {
                // Start from the locally generated errors, then add each broker's partition results.
                Map map = CoreUtils.mapValue((Map)errorsMap, ProduceResponse.PartitionResponse::new);
                responseFutures.stream().map(CompletableFuture::join).forEach(singleResponse -> singleResponse.data().responses().forEach(topicProduceResponse -> {
                    String topic = topicProduceResponse.name();
                    topicProduceResponse.partitionResponses().forEach(r -> {
                        TopicPartition topicPartition = new TopicPartition(topic, r.index());
                        map.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.forCode((short)r.errorCode()), r.baseOffset(), r.logAppendTimeMs(), r.logStartOffset(), r.recordErrors().stream().map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage())).toList(), r.errorMessage()));
                    });
                }));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] ProduceResponse: {}", (Object)inflightRequest.getHeader(), (Object)CoreUtils.mapValue((Map)map, r -> r.error));
                }
                inflightRequest.complete(new ProduceResponse(map));
            })).exceptionally(e -> {
                // A failed sub-request (or a failure while merging) closes the client connection.
                log.error("[{}] Failed to wait for the produce responses", (Object)this.ctx, e);
                this.close(this.ctx);
                return null;
            });
        }
    }

    /**
     * Builds a produce response that contains only the given per-partition errors.
     *
     * @param errorsMap error to report for each partition
     * @return a new response with one partition entry per map entry
     */
    private static ProduceResponse createProduceResponse(Map<TopicPartition, Errors> errorsMap) {
        ProduceResponseData emptyData = new ProduceResponseData();
        return KafkaProxyRequestHandler.createProduceResponse(errorsMap, emptyData);
    }

    /**
     * Appends the given per-partition errors to existing produce response data.
     *
     * <p>For every entry an error-only partition response is added under its topic; the topic
     * entry is created first if {@code responseData} does not contain it yet.
     * {@code responseData} is modified in place.
     *
     * @param errorsMap error to report for each partition
     * @param responseData response data to extend
     * @return a new response wrapping {@code responseData}
     */
    private static ProduceResponse createProduceResponse(Map<TopicPartition, Errors> errorsMap, ProduceResponseData responseData) {
        for (Map.Entry<TopicPartition, Errors> entry : errorsMap.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            String topicName = topicPartition.topic();

            ProduceResponseData.TopicProduceResponse topicEntry = responseData.responses().find(topicName);
            if (topicEntry == null) {
                topicEntry = new ProduceResponseData.TopicProduceResponse().setName(topicName);
                responseData.responses().add(topicEntry);
            }

            ProduceResponseData.PartitionProduceResponse partitionError = new ProduceResponseData.PartitionProduceResponse()
                    .setErrorCode(entry.getValue().code())
                    .setIndex(topicPartition.partition());
            topicEntry.partitionResponses().add(partitionError);
        }
        return new ProduceResponse(responseData);
    }

    /**
     * Forwards a FIND_COORDINATOR request to the metadata broker and replaces the response so
     * that this proxy is always reported as the coordinator.
     *
     * <p>The broker's response content is not used. For request versions below 4 the top-level
     * coordinator fields are filled in; from version 4 on, one coordinator entry is produced
     * per requested key.
     *
     * @param inflightRequest the FIND_COORDINATOR request
     * @throws IOException propagated from obtaining the metadata broker connection or forwarding
     */
    private void handleFindCoordinator(InflightRequest inflightRequest) throws IOException {
        FindCoordinatorRequest request = (FindCoordinatorRequest) inflightRequest.getRequest();
        inflightRequest.setResponseMapper(ignoredBrokerResponse -> {
            FindCoordinatorResponseData data = new FindCoordinatorResponseData();
            if (request.version() >= 4) {
                List<String> coordinatorKeys = request.data().coordinatorKeys();
                data.setCoordinators(coordinatorKeys.stream()
                        .map(key -> new FindCoordinatorResponseData.Coordinator()
                                .setKey(key)
                                .setErrorCode(Errors.NONE.code())
                                .setErrorMessage(Errors.NONE.message())
                                .setHost(this.selfNode.host())
                                .setPort(this.selfNode.port())
                                .setNodeId(this.selfNode.id()))
                        .toList());
            } else {
                data.setErrorCode(Errors.NONE.code())
                        .setErrorMessage(Errors.NONE.message())
                        .setHost(this.selfNode.host())
                        .setPort(this.selfNode.port())
                        .setNodeId(this.selfNode.id());
            }
            return new FindCoordinatorResponse(data);
        });
        this.connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
    }

    /**
     * Entry point for LIST_OFFSETS requests.
     *
     * <p>Version 0 is rejected with a {@link RuntimeException}. A request without topics is
     * answered immediately with an empty response; everything else is delegated to
     * {@code handleListOffsetsV1OrAbove}.
     *
     * @param inflightRequest the LIST_OFFSETS request
     * @throws IOException propagated from {@code handleListOffsetsV1OrAbove}
     */
    private void handleListOffsets(InflightRequest inflightRequest) throws IOException {
        short apiVersion = inflightRequest.getHeader().apiVersion();
        if (apiVersion == 0) {
            throw new RuntimeException("KoP proxy does not support ListOffset v0 yet");
        }
        ListOffsetsRequest request = (ListOffsetsRequest) inflightRequest.getRequest();
        if (request.data().topics().isEmpty()) {
            inflightRequest.complete(new ListOffsetsResponse(new ListOffsetsResponseData()));
        } else {
            this.handleListOffsetsV1OrAbove(inflightRequest, request);
        }
    }

    /**
     * Routes a LIST_OFFSETS request (version 1 or above) to the leader broker(s) of its partitions.
     *
     * <p>Partitions are grouped by the leader address found in {@code leaderCache}. If exactly
     * one leader is involved, the original request is forwarded unchanged. Otherwise one
     * sub-request per leader is sent and the sub-responses are merged. Partitions without a
     * cached leader, or whose leader cannot be connected to, are answered with
     * {@code UNKNOWN_TOPIC_OR_PARTITION}.
     *
     * <p>Changes compared with the previous revision:
     * <ul>
     *   <li>Locally generated error entries now carry their partition index. Previously the
     *       index was left at its default, so every such error was reported against partition 0.</li>
     *   <li>A failure while waiting for or merging the sub-responses now logs and closes the
     *       client connection, as {@code handleProduce} does, instead of leaving the request
     *       pending forever.</li>
     * </ul>
     *
     * @param originalRequest the in-flight request as received from the client
     * @param request the parsed LIST_OFFSETS request
     * @throws IOException if connecting to the leader fails in the single-leader case
     */
    private void handleListOffsetsV1OrAbove(InflightRequest originalRequest, ListOffsetsRequest request) throws IOException {
        // Errors generated by the proxy itself, keyed by partition.
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        // leader address -> (topic name -> partitions of that topic led by the address)
        HashMap<InetSocketAddress, Map> leaderToOffsetData = new HashMap<InetSocketAddress, Map>();
        request.data().topics().forEach(topic -> topic.partitions().forEach(partitionData -> {
            TopicPartition topicPartition = new TopicPartition(topic.name(), partitionData.partitionIndex());
            InetSocketAddress leader = (InetSocketAddress)this.leaderCache.getIfPresent((Object)topicPartition);
            if (leader == null) {
                errorsMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                return;
            }
            leaderToOffsetData.computeIfAbsent(leader, __ -> new HashMap()).computeIfAbsent(topic.name(), __ -> new ArrayList()).add(partitionData);
        }));
        if (leaderToOffsetData.size() == 1) {
            // Single leader: the original request, including any partitions without a cached
            // leader, is forwarded as-is and the broker's response is passed through unparsed.
            InetSocketAddress leader2 = (InetSocketAddress)leaderToOffsetData.keySet().iterator().next();
            originalRequest.setSkipParsingResponse(true);
            this.connectionGroup.getLeader(leader2).forwardRequest(originalRequest);
        } else {
            // Zero or several leaders: fan out one sub-request per leader and merge the results.
            ArrayList responseFutures = new ArrayList();
            leaderToOffsetData.forEach((leader, offsetData) -> {
                try {
                    ConnectionToBroker connection = this.connectionGroup.getLeader((InetSocketAddress)leader);
                    List<ListOffsetsRequestData.ListOffsetsTopic> targetTimes = offsetData.entrySet().stream().map(e -> new ListOffsetsRequestData.ListOffsetsTopic().setName((String)e.getKey()).setPartitions((List)e.getValue())).toList();
                    ListOffsetsRequest singleRequest = ListOffsetsRequest.Builder.forConsumer((boolean)true, (IsolationLevel)request.isolationLevel(), (boolean)false).setTargetTimes(targetTimes).build(request.version());
                    ByteBuf buf = KopResponseUtils.serializeRequestToPooledBuffer((RequestHeader)originalRequest.getHeader(), (AbstractRequest)singleRequest);
                    InflightRequest singleInflightRequest = new InflightRequest(buf, originalRequest.getRemoteAddress(), false);
                    responseFutures.add(singleInflightRequest.getResponseFuture());
                    connection.forwardRequest(singleInflightRequest);
                }
                catch (IOException e2) {
                    // The leader is unreachable: drop its cache entries and report an error per partition.
                    log.warn("[{}] Failed to connect to leader {}: {}", new Object[]{this.ctx, leader, e2.getMessage()});
                    offsetData.forEach((topic, partitions) -> partitions.forEach(partition -> {
                        TopicPartition topicPartition = new TopicPartition(topic, partition.partitionIndex());
                        errorsMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                        this.leaderCache.invalidate((Object)topicPartition);
                    }));
                }
            });
            ((CompletableFuture)FutureUtil.waitForAll(responseFutures).thenAccept(__ -> {
                // topic name -> merged partition responses from all brokers plus local errors
                HashMap<String, List<ListOffsetsResponseData.ListOffsetsPartitionResponse>> topicMap = new HashMap<String, List<ListOffsetsResponseData.ListOffsetsPartitionResponse>>();
                responseFutures.stream().map(CompletableFuture::join).forEach(response -> response.data().topics().forEach(topic -> {
                    List partitions = (List)topicMap.get(topic.name());
                    if (partitions == null) {
                        topicMap.put(topic.name(), topic.partitions());
                    } else {
                        partitions.addAll(topic.partitions());
                    }
                }));
                // The partition index must be set explicitly; otherwise the client cannot tell
                // which partition the error belongs to.
                errorsMap.forEach((topicPartition, errors) -> topicMap.computeIfAbsent(topicPartition.topic(), topic -> new ArrayList()).add(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(topicPartition.partition()).setErrorCode(errors.code())));
                List<ListOffsetsResponseData.ListOffsetsTopicResponse> data = topicMap.entrySet().stream().map(e -> new ListOffsetsResponseData.ListOffsetsTopicResponse().setName((String)e.getKey()).setPartitions((List)e.getValue())).toList();
                originalRequest.complete(new ListOffsetsResponse(new ListOffsetsResponseData().setTopics(data)));
            })).exceptionally(e -> {
                // Without this handler a failed sub-request would leave originalRequest
                // uncompleted and block every later response queued behind it.
                log.error("[{}] Failed to wait for the list offsets responses", (Object)this.ctx, e);
                this.close(this.ctx);
                return null;
            });
        }
    }

    /**
     * Routes a FETCH request to the leader broker(s) of the partitions it contains.
     *
     * <p>Partitions are grouped by the leader address found in {@code leaderCache}; partitions
     * without a cached leader are recorded with {@code NOT_LEADER_OR_FOLLOWER}. If no partition
     * has a known leader, the request is answered with those errors only. If exactly one
     * leader is involved, the original request is forwarded unchanged. Otherwise one
     * sub-request per leader is sent and the sub-responses are merged with the local errors.
     *
     * <p>Changes compared with the previous revision:
     * <ul>
     *   <li>The "no leader found" branch now returns after completing the request. It used to
     *       fall through into the fan-out branch and complete the same request a second time.</li>
     *   <li>The merged topic responses are collected into a mutable list. They used to come
     *       from {@code Stream.toList()}, which is unmodifiable, so {@code createFetchResponse}
     *       threw {@code UnsupportedOperationException} whenever it had to add an error entry
     *       for a topic that no broker had answered.</li>
     *   <li>A failure while waiting for or merging the sub-responses now logs and closes the
     *       client connection, as {@code handleProduce} does, instead of leaving the request
     *       pending forever.</li>
     * </ul>
     *
     * @param inflightRequest the FETCH request
     * @throws IOException if connecting to the leader fails in the single-leader case
     */
    private void handleFetch(InflightRequest inflightRequest) throws IOException {
        FetchRequest request = (FetchRequest)inflightRequest.getRequest();
        // Errors generated by the proxy itself, keyed by partition.
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        // leader address -> (topic name -> fetch data for that topic destined to the leader)
        HashMap<InetSocketAddress, Map> fetchPartitionMap = new HashMap<InetSocketAddress, Map>();
        request.data().topics().forEach(fetchTopic -> {
            String topic = fetchTopic.topic();
            fetchTopic.partitions().forEach(fetchPartition -> {
                TopicPartition topicPartition = new TopicPartition(topic, fetchPartition.partition());
                InetSocketAddress leader = (InetSocketAddress)this.leaderCache.getIfPresent((Object)topicPartition);
                if (leader == null) {
                    errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                    return;
                }
                fetchPartitionMap.computeIfAbsent(leader, __ -> new HashMap()).computeIfAbsent(topic, __ -> new FetchRequestData.FetchTopic().setTopicId(fetchTopic.topicId()).setTopic(topic)).partitions().add(fetchPartition);
            });
        });
        if (fetchPartitionMap.isEmpty()) {
            log.warn("No leader found for {}", (Object)inflightRequest.getRequest());
            inflightRequest.complete(KafkaProxyRequestHandler.createFetchResponse(errorsMap, new FetchResponseData()));
            // The request is fully answered; do not fall through and complete it again.
            return;
        }
        if (fetchPartitionMap.size() == 1) {
            // Single leader: the original request is forwarded as-is and the broker's response
            // is passed through unparsed.
            InetSocketAddress leader2 = (InetSocketAddress)fetchPartitionMap.keySet().iterator().next();
            inflightRequest.setSkipParsingResponse(true);
            this.connectionGroup.getLeader(leader2).forwardRequest(inflightRequest);
        } else {
            // Several leaders: fan out one sub-request per leader and merge the results.
            ArrayList responseFutures = new ArrayList();
            fetchPartitionMap.forEach((leader, fetchTopics) -> {
                try {
                    ConnectionToBroker connection = this.connectionGroup.getLeader((InetSocketAddress)leader);
                    FetchRequest singleRequest = new FetchRequest(new FetchRequestData().setMaxWaitMs(request.maxWait()).setMaxBytes(request.maxBytes()).setMinBytes(request.minBytes()).setIsolationLevel(request.isolationLevel().id()).setSessionEpoch(request.metadata().epoch()).setSessionId(request.metadata().sessionId()).setReplicaId(request.replicaId()).setRackId(request.rackId()).setTopics(fetchTopics.values().stream().toList()), request.version());
                    InflightRequest singleInflightRequest = new InflightRequest(KopResponseUtils.serializeRequestToPooledBuffer((RequestHeader)inflightRequest.getHeader(), (AbstractRequest)singleRequest), inflightRequest.getRemoteAddress(), false);
                    singleInflightRequest.setSkipParsingResponse(true);
                    responseFutures.add(singleInflightRequest.getResponseFuture());
                    connection.forwardRequest(singleInflightRequest);
                }
                catch (IOException e) {
                    // The leader is unreachable: drop its cache entries and report an error per partition.
                    log.warn("[{}] Failed to connect to leader {}: {}", new Object[]{this.ctx, leader, e.getMessage()});
                    fetchTopics.values().forEach(fetchTopic -> fetchTopic.partitions().forEach(partition -> {
                        TopicPartition topicPartition = new TopicPartition(fetchTopic.topic(), partition.partition());
                        errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                        this.leaderCache.invalidate((Object)topicPartition);
                    }));
                }
            });
            ((CompletableFuture)FutureUtil.waitForAll(responseFutures).thenAccept(__ -> {
                // Sub-responses arrive as raw buffers (parsing was skipped); they are handed on
                // together with the merged response via the Pair below.
                List<ByteBuf> buffersToRelease = responseFutures.stream().map(CompletableFuture::join).toList();
                // topic name -> merged topic response
                HashMap<String, FetchResponseData.FetchableTopicResponse> map = new HashMap<String, FetchResponseData.FetchableTopicResponse>();
                buffersToRelease.forEach(buf -> {
                    FetchResponse fetchResponse = (FetchResponse)FetchResponse.parseResponse((ByteBuffer)buf.nioBuffer(), (RequestHeader)inflightRequest.getHeader());
                    fetchResponse.data().responses().forEach(topic -> {
                        FetchResponseData.FetchableTopicResponse topicResponse = map.get(topic.topic());
                        if (topicResponse == null) {
                            map.put(topic.topic(), new FetchResponseData.FetchableTopicResponse().setTopicId(topic.topicId()).setTopic(topic.topic()).setPartitions(topic.partitions()));
                            return;
                        }
                        topicResponse.partitions().addAll(topic.partitions());
                    });
                });
                // Must be mutable: createFetchResponse appends topic entries for local errors.
                FetchResponseData data = new FetchResponseData().setResponses(new ArrayList<FetchResponseData.FetchableTopicResponse>(map.values())).setSessionId(request.metadata().sessionId());
                inflightRequest.complete(Pair.of((Object)KafkaProxyRequestHandler.createFetchResponse(errorsMap, data), buffersToRelease));
            })).exceptionally(e -> {
                // Without this handler a failed sub-request would leave inflightRequest
                // uncompleted and block every later response queued behind it.
                log.error("[{}] Failed to wait for the fetch responses", (Object)this.ctx, e);
                this.close(this.ctx);
                return null;
            });
        }
    }

    /**
     * Wraps {@code responseData} in a {@link FetchResponse}, first adding an error-only partition entry for
     * every partition in {@code errorsMap} that the response does not contain yet.
     *
     * <p>A topic entry is created when the response has none with the same topic name. A partition that is
     * already present in the response is left untouched, i.e. its error from {@code errorsMap} is not applied.
     *
     * @param errorsMap error to report for each partition that has no entry in {@code responseData}
     * @param responseData the payload to complete; when {@code errorsMap} is not empty its topic list is
     *     replaced by a mutable copy before entries are added
     * @return a response backed by {@code responseData}
     */
    private static FetchResponse createFetchResponse(Map<TopicPartition, Errors> errorsMap, FetchResponseData responseData) {
        if (errorsMap.isEmpty()) {
            return new FetchResponse(responseData);
        }
        // The topic list handed in may be unmodifiable (the multi-leader fetch path builds it with
        // Stream.toList()), in which case adding to it throws UnsupportedOperationException.
        // Work on a mutable copy and install it on the payload.
        List<FetchResponseData.FetchableTopicResponse> topicResponses = new ArrayList<>(responseData.responses());
        responseData.setResponses(topicResponses);
        for (Map.Entry<TopicPartition, Errors> entry : errorsMap.entrySet()) {
            String topic = entry.getKey().topic();
            int partitionIndex = entry.getKey().partition();
            FetchResponseData.FetchableTopicResponse topicResponse = topicResponses.stream()
                    .filter(candidate -> candidate.topic().equals(topic))
                    .findFirst()
                    .orElse(null);
            if (topicResponse == null) {
                topicResponse = new FetchResponseData.FetchableTopicResponse().setTopic(topic);
                topicResponses.add(topicResponse);
            }
            boolean alreadyPresent = topicResponse.partitions().stream()
                    .anyMatch(existing -> existing.partitionIndex() == partitionIndex);
            if (!alreadyPresent) {
                topicResponse.partitions().add(new FetchResponseData.PartitionData()
                        .setPartitionIndex(partitionIndex)
                        .setErrorCode(entry.getValue().code()));
            }
        }
        return new FetchResponse(responseData);
    }

    /**
     * Routes a DELETE_RECORDS request to the brokers recorded in {@code leaderCache} as partition leaders.
     *
     * <p>Partitions are grouped by cached leader address; a partition without a cached leader is answered
     * with {@code NOT_LEADER_OR_FOLLOWER}. When exactly one leader is involved, the original request is
     * forwarded to it and the response is rebuilt from the merged per-partition errors. Otherwise one
     * sub-request per leader is sent and the original request is completed after all sub-responses arrived.
     *
     * @param inflightRequest the client request whose payload is a {@link DeleteRecordsRequest}
     * @throws IOException propagated from the single-leader path (leader lookup or forwarding)
     */
    private void handleDeleteRecords(InflightRequest inflightRequest) throws IOException {
        DeleteRecordsRequest request = (DeleteRecordsRequest) inflightRequest.getRequest();
        Map<InetSocketAddress, Map<String, DeleteRecordsRequestData.DeleteRecordsTopic>> topicsByLeader = new HashMap<>();
        Map<TopicPartition, Errors> errorsMap = new HashMap<>();

        // Group the requested partitions by the leader that is currently cached for them.
        for (DeleteRecordsRequestData.DeleteRecordsTopic topic : request.data().topics()) {
            for (DeleteRecordsRequestData.DeleteRecordsPartition partition : topic.partitions()) {
                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                InetSocketAddress leader = (InetSocketAddress) this.leaderCache.getIfPresent(topicPartition);
                if (leader == null) {
                    errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                    continue;
                }
                topicsByLeader.computeIfAbsent(leader, ignored -> new HashMap<>())
                        .computeIfAbsent(topic.name(), name -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(name))
                        .partitions()
                        .add(partition);
            }
        }

        if (topicsByLeader.size() == 1) {
            // Single leader: forward the original request and merge its result with the local errors.
            inflightRequest.setResponseMapper(originalResponse -> {
                DeleteRecordsResponse response = (DeleteRecordsResponse) originalResponse;
                for (var topicResult : response.data().topics()) {
                    for (var partitionResult : topicResult.partitions()) {
                        errorsMap.put(new TopicPartition(topicResult.name(), partitionResult.partitionIndex()),
                                Errors.forCode((short) partitionResult.errorCode()));
                    }
                }
                return KafkaResponseUtils.newDeleteRecords(errorsMap);
            });
            InetSocketAddress onlyLeader = topicsByLeader.keySet().iterator().next();
            this.connectionGroup.getLeader(onlyLeader).forwardRequest(inflightRequest);
            return;
        }

        // Zero or several leaders: fan out one sub-request per leader and gather the responses.
        List<CompletableFuture<DeleteRecordsResponse>> pendingResponses = new ArrayList<>();
        for (Map.Entry<InetSocketAddress, Map<String, DeleteRecordsRequestData.DeleteRecordsTopic>> entry : topicsByLeader.entrySet()) {
            InetSocketAddress leader = entry.getKey();
            Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> topics = entry.getValue();
            try {
                ConnectionToBroker connection = this.connectionGroup.getLeader(leader);
                DeleteRecordsRequestData singleData = new DeleteRecordsRequestData()
                        .setTimeoutMs(request.data().timeoutMs())
                        .setTopics(topics.values().stream().toList());
                DeleteRecordsRequest singleRequest = new DeleteRecordsRequest.Builder(singleData).build(request.version());
                InflightRequest singleInflightRequest = new InflightRequest(
                        KopResponseUtils.serializeRequestToPooledBuffer((RequestHeader) inflightRequest.getHeader(), (AbstractRequest) singleRequest),
                        inflightRequest.getRemoteAddress());
                CompletableFuture<DeleteRecordsResponse> pending = new CompletableFuture<>();
                // NOTE(review): the future is registered before forwarding; if forwardRequest can throw,
                // this future would never complete - confirm against ConnectionToBroker.
                pendingResponses.add(pending);
                singleInflightRequest.setResponseMapper(response -> {
                    pending.complete((DeleteRecordsResponse) response);
                    return response;
                });
                connection.forwardRequest(singleInflightRequest);
            } catch (IOException e) {
                log.warn("[{}] Failed to connect to leader {}: {}", this.ctx, leader, e.getMessage());
                // Report every partition routed to this leader as failed and drop the stale cache entries.
                for (DeleteRecordsRequestData.DeleteRecordsTopic topic : topics.values()) {
                    for (DeleteRecordsRequestData.DeleteRecordsPartition partition : topic.partitions()) {
                        TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                        errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                        this.leaderCache.invalidate(topicPartition);
                    }
                }
            }
        }

        FutureUtil.waitForAll(pendingResponses).thenAccept(ignored -> {
            for (CompletableFuture<DeleteRecordsResponse> pending : pendingResponses) {
                DeleteRecordsResponse singleResponse = pending.join();
                for (var topicResult : singleResponse.data().topics()) {
                    for (var partitionResult : topicResult.partitions()) {
                        errorsMap.put(new TopicPartition(topicResult.name(), partitionResult.partitionIndex()),
                                Errors.forCode((short) partitionResult.errorCode()));
                    }
                }
            }
            inflightRequest.complete(KafkaResponseUtils.newDeleteRecords(errorsMap));
        }).exceptionally(e -> {
            log.error("[{}] Failed to wait for the delete records responses", this.ctx, e);
            this.close(this.ctx);
            return null;
        });
    }

    /**
     * Forwards a DESCRIBE_CLUSTER request to the metadata broker and rewrites the response so that
     * {@code selfNode} is reported as the controller and as the only broker of the cluster.
     *
     * @param inflightRequest the client request to forward
     * @throws IOException propagated from obtaining the metadata broker connection or forwarding the request
     */
    private void handleDescribeCluster(InflightRequest inflightRequest) throws IOException {
        inflightRequest.setResponseMapper(response -> {
            DescribeClusterResponse clusterResponse = (DescribeClusterResponse) response;
            clusterResponse.data().setControllerId(this.selfNode.id());

            // Replace the broker list with a single entry describing this node.
            DescribeClusterResponseData.DescribeClusterBroker self = new DescribeClusterResponseData.DescribeClusterBroker()
                    .setBrokerId(this.selfNode.id())
                    .setHost(this.selfNode.host())
                    .setPort(this.selfNode.port());
            DescribeClusterResponseData.DescribeClusterBrokerCollection brokers =
                    new DescribeClusterResponseData.DescribeClusterBrokerCollection(Collections.singletonList(self).iterator());
            clusterResponse.data().setBrokers(brokers);
            return response;
        });
        this.connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
    }

    /**
     * Forwards a consumer-group request to the group coordinator connection of the request's group and
     * disconnects that coordinator connection when the response reports {@code NOT_COORDINATOR}.
     *
     * @param apiKeys the API of the request; must be one of the group APIs handled by the switch below
     * @param inflightRequest the client request to forward
     * @throws IOException propagated from obtaining the coordinator connection or forwarding the request
     */
    private void handleGroupRequest(ApiKeys apiKeys, InflightRequest inflightRequest) throws IOException {
        final String groupId = inflightRequest.groupId();
        inflightRequest.setResponseMapper(response -> {
            // Extract a representative error from the response; for multi-error responses the first
            // non-NONE error found is used.
            final Errors error;
            switch (apiKeys) {
                case JOIN_GROUP:
                    error = ((JoinGroupResponse) response).error();
                    break;
                case SYNC_GROUP:
                    error = ((SyncGroupResponse) response).error();
                    break;
                case LEAVE_GROUP:
                    error = ((LeaveGroupResponse) response).error();
                    break;
                case OFFSET_FETCH:
                    error = ((OffsetFetchResponse) response).error();
                    break;
                case OFFSET_COMMIT:
                    error = ((OffsetCommitResponse) response).errorCounts().keySet().stream()
                            .filter(candidate -> !candidate.equals(Errors.NONE))
                            .findFirst()
                            .orElse(Errors.NONE);
                    break;
                case HEARTBEAT:
                    error = ((HeartbeatResponse) response).error();
                    break;
                case OFFSET_DELETE:
                    error = Errors.forCode((short) ((OffsetDeleteResponse) response).data().errorCode());
                    break;
                case TXN_OFFSET_COMMIT:
                    error = ((TxnOffsetCommitResponse) response).errors().values().stream()
                            .filter(candidate -> !candidate.equals(Errors.NONE))
                            .findFirst()
                            .orElse(Errors.NONE);
                    break;
                default:
                    throw new IllegalStateException(apiKeys + " is not group request");
            }
            if (error != Errors.NOT_COORDINATOR) {
                return response;
            }
            log.info("[{}] [group: {}] Disconnect the outdated group coordinator", this.ctx, groupId);
            try {
                this.connectionGroup.getGroupCoordinator(groupId).disconnectBroker();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return response;
        });
        this.connectionGroup.getGroupCoordinator(groupId).forwardRequest(inflightRequest);
    }

    /**
     * Forwards a transactional request to the transaction coordinator connection of the request's
     * transactional id and disconnects that coordinator connection when the response reports
     * {@code NOT_COORDINATOR}.
     *
     * @param apiKeys the API of the request; must be one of the transaction APIs handled by the switch below
     * @param inflightRequest the client request to forward
     * @throws IOException propagated from obtaining the coordinator connection or forwarding the request
     */
    private void handleTxnRequest(ApiKeys apiKeys, InflightRequest inflightRequest) throws IOException {
        String txnId = inflightRequest.txnId();
        inflightRequest.setResponseMapper(response -> {
            // Extract a representative error; for per-partition responses the first non-NONE error is used.
            Errors error = switch (apiKeys) {
                case INIT_PRODUCER_ID -> ((InitProducerIdResponse) response).error();
                case ADD_PARTITIONS_TO_TXN -> ((AddPartitionsToTxnResponse) response).errors().values().stream()
                        .filter(candidate -> !candidate.equals(Errors.NONE))
                        .findFirst()
                        .orElse(Errors.NONE);
                case ADD_OFFSETS_TO_TXN -> Errors.forCode((short) ((AddOffsetsToTxnResponse) response).data().errorCode());
                case END_TXN -> ((EndTxnResponse) response).error();
                default -> throw new IllegalStateException(apiKeys + " is not txn request");
            };
            if (error == Errors.NOT_COORDINATOR) {
                log.info("[{}] [txnId: {}] Disconnect the outdated transaction coordinator", this.ctx, txnId);
                try {
                    // Disconnect the same connection the request was forwarded on (the transaction
                    // coordinator), not a group coordinator connection keyed by the transactional id.
                    this.connectionGroup.getTransactionCoordinator(txnId).disconnectBroker();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return response;
        });
        this.connectionGroup.getTransactionCoordinator(txnId).forwardRequest(inflightRequest);
    }
}

