/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.proxy.BrokerConnectionGroup;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionFactory;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionToBroker;
import io.streamnative.pulsar.handlers.kop.proxy.InflightRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProxyRequestHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(KafkaProxyRequestHandler.class);
    private static final LoadingCache<TopicPartition, InetSocketAddress> LEADER_CACHE = Caffeine.newBuilder().maximumSize(10000L).expireAfterWrite(Duration.ofMinutes(5L)).refreshAfterWrite(Duration.ofMinutes(1L)).build((CacheLoader)new CacheLoader<TopicPartition, InetSocketAddress>(){

        public @Nullable InetSocketAddress load(@NonNull TopicPartition topicPartition) {
            return null;
        }
    });
    private final LinkedBlockingQueue<InflightRequest> requestQueue = new LinkedBlockingQueue(500);
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final Node selfNode;
    private final List<Integer> replicaIds;
    private final BrokerConnectionGroup connectionGroup;
    private ChannelHandlerContext ctx;

    public KafkaProxyRequestHandler(EndPoint advertisedEndPoint, ConnectionFactory connectionFactory) {
        this.selfNode = new Node(Murmur3_32Hash.getInstance().makeHash((advertisedEndPoint.getHostname() + advertisedEndPoint.getPort()).getBytes(StandardCharsets.UTF_8)), advertisedEndPoint.getHostname(), advertisedEndPoint.getPort());
        this.replicaIds = Collections.singletonList(this.selfNode.id());
        this.connectionGroup = new BrokerConnectionGroup(connectionFactory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        try {
            Channel channel = ctx.channel();
            InflightRequest inflightRequest = new InflightRequest(buf, channel.remoteAddress());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received kafka cmd {}", (Object)channel, (Object)inflightRequest);
            }
            ApiKeys apiKeys = inflightRequest.getHeader().apiKey();
            inflightRequest.registerCallback(() -> this.flush(channel), (Executor)ctx.executor());
            if (!ApiKeys.PRODUCE.equals((Object)apiKeys) || ((ProduceRequest)inflightRequest.getRequest()).acks() != 0) {
                this.requestQueue.put(inflightRequest);
            }
            switch (apiKeys) {
                case API_VERSIONS: {
                    this.handleApiVersions(inflightRequest);
                    return;
                }
                case METADATA: {
                    this.handleMetadata(inflightRequest);
                    return;
                }
                case PRODUCE: {
                    this.handleProduce(inflightRequest);
                    return;
                }
                case FIND_COORDINATOR: {
                    this.handleFindCoordinator(inflightRequest);
                    return;
                }
                case JOIN_GROUP: 
                case SYNC_GROUP: 
                case LEAVE_GROUP: 
                case OFFSET_FETCH: 
                case OFFSET_COMMIT: 
                case HEARTBEAT: {
                    this.connectionGroup.getGroupCoordinator().forwardRequest(inflightRequest);
                    return;
                }
                case LIST_OFFSETS: {
                    this.handleListOffsets(inflightRequest);
                    return;
                }
                case FETCH: {
                    this.handleFetch(inflightRequest);
                    return;
                }
                case SASL_HANDSHAKE: 
                case SASL_AUTHENTICATE: {
                    this.connectionGroup.authenticate(inflightRequest);
                    return;
                }
                default: {
                    inflightRequest.complete(inflightRequest.getRequest().getErrorResponse((Throwable)new UnsupportedVersionException("API " + apiKeys + " is not supported")));
                    return;
                }
            }
        }
        catch (IOException e) {
            log.warn("{}", (Object)e.getMessage());
            this.close(ctx);
            return;
        }
        catch (Throwable throwable) {
            log.error("[{}] Unexpected exception when handling request", (Object)ctx.channel(), (Object)throwable);
            this.close(ctx);
            return;
        }
        finally {
            ReferenceCountUtil.safeRelease((Object)buf);
        }
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
        this.connectionGroup.setClientChannel(ctx);
        this.isActive.set(true);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("close channel {}", (Object)ctx.channel());
        this.connectionGroup.close();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[{}] Unexpected exception", (Object)ctx.channel(), (Object)cause);
        this.close(ctx);
    }

    private void close(ChannelHandlerContext ctx) {
        if (this.isActive.compareAndSet(true, false)) {
            ctx.close();
            if (!this.requestQueue.isEmpty()) {
                log.info("[{}] Close with {} pending requests", (Object)ctx, (Object)this.requestQueue.size());
            }
            this.requestQueue.clear();
            this.connectionGroup.close();
        }
    }

    private void flush(Channel channel) {
        InflightRequest inflightRequest;
        while (this.isActive.get() && (inflightRequest = this.requestQueue.peek()) != null && inflightRequest.hasReceivedResponse()) {
            if (!this.requestQueue.remove(inflightRequest)) continue;
            if (inflightRequest.hasFailed(e -> {
                if (e instanceof ConnectionToBroker.ConnectError) {
                    log.warn("[{}] {} failed with {}", new Object[]{channel, inflightRequest.getHeader(), e.getMessage()});
                } else {
                    log.error("[{}] request {} completed exceptionally", new Object[]{channel, inflightRequest.getHeader(), e});
                }
                this.close(this.ctx);
            })) {
                return;
            }
            ByteBuf buf = inflightRequest.toResponseBuf();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Write kafka cmd to client ({} requests left): {}", new Object[]{channel, this.requestQueue.size(), inflightRequest.getHeader().toResponseHeader()});
            }
            channel.writeAndFlush((Object)buf).addListener(future -> {
                if (!future.isSuccess()) {
                    log.error("[{}] Failed to write {}", new Object[]{channel, inflightRequest.getHeader(), future.cause()});
                }
            });
        }
    }

    private void handleApiVersions(InflightRequest inflightRequest) {
        short version = inflightRequest.getHeader().apiVersion();
        inflightRequest.complete(KafkaRequestHandler.overloadDefaultApiVersionsResponse((!ApiKeys.API_VERSIONS.isVersionSupported(version) ? 1 : 0) != 0));
    }

    private void handleMetadata(InflightRequest inflightRequest) throws IOException {
        inflightRequest.setResponseMapper(originalResponse -> {
            MetadataResponse metadataResponse = (MetadataResponse)originalResponse;
            MetadataResponseData data = metadataResponse.data();
            MetadataResponseData.MetadataResponseBrokerCollection brokers = data.brokers();
            data.topics().forEach(topic -> {
                String topicName = topic.name();
                topic.partitions().forEach(partition -> {
                    MetadataResponseData.MetadataResponseBroker broker = brokers.find(partition.leaderId());
                    if (broker != null) {
                        LEADER_CACHE.put((Object)new TopicPartition(topicName, partition.partitionIndex()), (Object)InetSocketAddress.createUnresolved(broker.host(), broker.port()));
                    }
                    partition.setLeaderId(this.selfNode.id());
                    partition.setReplicaNodes(this.replicaIds);
                    partition.setIsrNodes(this.replicaIds);
                });
            });
            data.setControllerId(this.selfNode.id());
            brokers.clear();
            brokers.add((ImplicitLinkedHashCollection.Element)new MetadataResponseData.MetadataResponseBroker().setNodeId(this.selfNode.id()).setHost(this.selfNode.host()).setPort(this.selfNode.port()));
            return metadataResponse;
        });
        this.connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
    }

    private void handleProduce(InflightRequest inflightRequest) throws IOException {
        boolean cacheRequest;
        ProduceRequest request = (ProduceRequest)inflightRequest.getRequest();
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        HashMap<InetSocketAddress, ProduceRequestData.TopicProduceData> partitionDataMap = new HashMap<InetSocketAddress, ProduceRequestData.TopicProduceData>();
        for (ProduceRequestData.TopicProduceData topicData : request.data().topicData()) {
            String topic = topicData.name();
            for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
                TopicPartition topicPartition = new TopicPartition(topic, partitionData.index());
                InetSocketAddress leader = (InetSocketAddress)LEADER_CACHE.get((Object)topicPartition);
                if (leader == null) {
                    errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                    continue;
                }
                ProduceRequestData.TopicProduceData topicProduceData = partitionDataMap.computeIfAbsent(leader, __ -> new ProduceRequestData.TopicProduceData().setName(topicData.name()));
                topicProduceData.partitionData().add(partitionData);
            }
        }
        if (partitionDataMap.isEmpty()) {
            log.warn("No leader found for {}", (Object)inflightRequest.getHeader());
            inflightRequest.complete(KafkaProxyRequestHandler.createProduceResponse(errorsMap, new ProduceResponseData()));
            return;
        }
        boolean bl = cacheRequest = request.acks() != 0;
        if (partitionDataMap.size() == 1) {
            inflightRequest.setResponseMapper(originalResponse -> {
                if (errorsMap.isEmpty()) {
                    return originalResponse;
                }
                ProduceResponse produceResponse = (ProduceResponse)originalResponse;
                return KafkaProxyRequestHandler.createProduceResponse(errorsMap, produceResponse.data());
            });
            InetSocketAddress firstLeader = (InetSocketAddress)partitionDataMap.keySet().iterator().next();
            this.connectionGroup.getLeader(firstLeader).forwardRequest(inflightRequest, cacheRequest);
        }
    }

    private static ProduceResponse createProduceResponse(Map<TopicPartition, Errors> errorsMap, ProduceResponseData responseData) {
        errorsMap.forEach((topicPartition, errors) -> {
            String topic = topicPartition.topic();
            ProduceResponseData.TopicProduceResponse topicProduceResponse = responseData.responses().find(topic);
            if (topicProduceResponse == null) {
                topicProduceResponse = new ProduceResponseData.TopicProduceResponse().setName(topic);
                responseData.responses().add((ImplicitLinkedHashCollection.Element)topicProduceResponse);
            }
            topicProduceResponse.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse().setErrorCode(errors.code()));
        });
        return new ProduceResponse(responseData);
    }

    private void handleFindCoordinator(InflightRequest inflightRequest) {
        FindCoordinatorRequest request = (FindCoordinatorRequest)inflightRequest.getRequest();
        FindCoordinatorResponseData data = new FindCoordinatorResponseData();
        if (request.version() < 4) {
            data.setErrorCode(Errors.NONE.code()).setErrorMessage(Errors.NONE.message()).setHost(this.selfNode.host()).setPort(this.selfNode.port()).setNodeId(this.selfNode.id());
        } else {
            List coordinatorKeys = request.data().coordinatorKeys();
            data.setCoordinators(coordinatorKeys.stream().map(key -> new FindCoordinatorResponseData.Coordinator().setKey(key).setErrorCode(Errors.NONE.code()).setErrorMessage(Errors.NONE.message()).setHost(this.selfNode.host()).setPort(this.selfNode.port()).setNodeId(this.selfNode.id())).toList());
        }
        inflightRequest.complete(new FindCoordinatorResponse(data));
    }

    private void handleListOffsets(InflightRequest inflightRequest) throws IOException {
        if (inflightRequest.getHeader().apiVersion() == 0) {
            throw new RuntimeException("KoP proxy does not support ListOffset v0 yet");
        }
        ListOffsetsRequest request = (ListOffsetsRequest)inflightRequest.getRequest();
        if (request.data().topics().size() == 0) {
            inflightRequest.complete(new ListOffsetsResponse(new ListOffsetsResponseData()));
            return;
        }
        this.handleListOffsetsV1OrAbove(inflightRequest, request);
    }

    private void handleListOffsetsV1OrAbove(InflightRequest originalRequest, ListOffsetsRequest request) throws IOException {
        HashMap errorsMap = new HashMap();
        HashMap leaderToOffsetData = new HashMap();
        request.data().topics().forEach(topic -> topic.partitions().forEach(partitionData -> {
            TopicPartition topicPartition = new TopicPartition(topic.name(), partitionData.partitionIndex());
            InetSocketAddress leader = (InetSocketAddress)LEADER_CACHE.get((Object)topicPartition);
            if (leader == null) {
                errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                return;
            }
            leaderToOffsetData.computeIfAbsent(leader, __ -> new HashMap()).computeIfAbsent(topic.name(), __ -> new ArrayList()).add(partitionData);
        }));
        if (leaderToOffsetData.size() == 1) {
            InetSocketAddress leader = (InetSocketAddress)leaderToOffsetData.keySet().iterator().next();
            this.connectionGroup.getLeader(leader).forwardRequest(originalRequest);
        }
    }

    private void handleFetch(InflightRequest inflightRequest) throws IOException {
        FetchRequest request = (FetchRequest)inflightRequest.getRequest();
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        HashMap fetchPartitionMap = new HashMap();
        request.data().topics().forEach(fetchTopic -> {
            String topic = fetchTopic.topic();
            fetchTopic.partitions().forEach(fetchPartition -> {
                TopicPartition topicPartition = new TopicPartition(topic, fetchPartition.partition());
                InetSocketAddress leader = (InetSocketAddress)LEADER_CACHE.get((Object)topicPartition);
                if (leader == null) {
                    errorsMap.put(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER);
                    return;
                }
                fetchPartitionMap.computeIfAbsent(leader, __ -> new FetchRequestData.FetchTopic().setTopic(topic).setTopicId(fetchTopic.topicId())).partitions().add(fetchPartition);
            });
        });
        if (fetchPartitionMap.isEmpty()) {
            log.warn("No leader found for {}", (Object)inflightRequest.getRequest());
            inflightRequest.complete(KafkaProxyRequestHandler.createFetchResponse(errorsMap, new FetchResponseData()));
        }
        if (fetchPartitionMap.size() == 1) {
            InetSocketAddress leader = (InetSocketAddress)fetchPartitionMap.keySet().iterator().next();
            this.connectionGroup.getLeader(leader).forwardRequest(inflightRequest);
        }
    }

    private static FetchResponse createFetchResponse(Map<TopicPartition, Errors> errorsMap, FetchResponseData responseData) {
        errorsMap.forEach((topicPartition, errors) -> {
            FetchResponseData.PartitionData partitionResponse;
            String topic = topicPartition.topic();
            FetchResponseData.FetchableTopicResponse topicResponse = responseData.responses().stream().filter(__ -> __.topic().equals(topic)).findFirst().orElse(null);
            if (topicResponse == null) {
                topicResponse = new FetchResponseData.FetchableTopicResponse().setTopic(topic);
                responseData.responses().add(topicResponse);
            }
            if ((partitionResponse = (FetchResponseData.PartitionData)topicResponse.partitions().stream().filter(__ -> __.partitionIndex() == topicPartition.partition()).findFirst().orElse(null)) == null) {
                partitionResponse = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition.partition()).setErrorCode(errors.code());
                topicResponse.partitions().add(partitionResponse);
            }
        });
        return new FetchResponse(responseData);
    }
}

