/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.KopResponseUtils;
import org.apache.kafka.common.requests.ListOffsetRequestV0;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.apache.kafka.common.requests.ResponseHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaCommandDecoder
extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(KafkaCommandDecoder.class);
    protected ChannelHandlerContext ctx;
    protected SocketAddress remoteAddress;
    protected AtomicBoolean isActive = new AtomicBoolean(false);
    private final LinkedBlockingQueue<ResponseAndRequest> requestQueue;
    protected volatile RequestStats requestStats;
    protected final KafkaServiceConfiguration kafkaConfig;
    private final OrderedScheduler sendResponseScheduler;

    public KafkaCommandDecoder(RequestStats requestStats, KafkaServiceConfiguration kafkaConfig, OrderedScheduler sendResponseScheduler) {
        this.requestStats = requestStats;
        this.kafkaConfig = kafkaConfig;
        this.requestQueue = new LinkedBlockingQueue(kafkaConfig.getMaxQueuedRequests());
        this.sendResponseScheduler = sendResponseScheduler;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remoteAddress = ctx.channel().remoteAddress();
        this.ctx = ctx;
        this.isActive.set(true);
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (log.isDebugEnabled()) {
                log.debug("About to close the idle connection from {} due to being idle for {} millis", (Object)this.getRemoteAddress(), (Object)this.kafkaConfig.getConnectionMaxIdleMs());
            }
            this.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("[{}] Got exception: {}", new Object[]{this.remoteAddress, cause.getMessage(), cause});
        this.close();
    }

    protected void close() {
        ResponseAndRequest responseAndRequest;
        log.info("close channel {} with {} pending responses", (Object)this.ctx.channel(), (Object)this.requestQueue.size());
        while ((responseAndRequest = this.requestQueue.poll()) != null) {
            responseAndRequest.cancel();
            RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
        }
        this.ctx.close();
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", (Object)ctx.channel().isWritable());
        }
    }

    protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg, SocketAddress remoteAddress) {
        Preconditions.checkArgument((msg.readableBytes() > 0 ? 1 : 0) != 0);
        ByteBuffer nio = msg.nioBuffer();
        RequestHeader header = RequestHeader.parse((ByteBuffer)nio);
        if (KafkaCommandDecoder.isUnsupportedApiVersionsRequest(header)) {
            ApiVersionsRequest apiVersionsRequest = (ApiVersionsRequest)new ApiVersionsRequest.Builder(header.apiVersion()).build();
            return new KafkaHeaderAndRequest(header, (AbstractRequest)apiVersionsRequest, msg, remoteAddress);
        }
        ApiKeys apiKey = header.apiKey();
        short apiVersion = header.apiVersion();
        AbstractRequest body = AbstractRequest.parseRequest((ApiKeys)apiKey, (short)apiVersion, (ByteBuffer)nio).request;
        return new KafkaHeaderAndRequest(header, body, msg, remoteAddress);
    }

    protected ListOffsetRequestV0 byteBufToListOffsetRequestV0(ByteBuf buf) {
        Preconditions.checkArgument((buf.readableBytes() > 0 ? 1 : 0) != 0);
        ByteBuffer nio = buf.nioBuffer();
        RequestHeader header = RequestHeader.parse((ByteBuffer)nio);
        short apiVersion = header.apiVersion();
        return ListOffsetRequestV0.parse(nio, apiVersion);
    }

    protected static ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
        try {
            KafkaHeaderAndResponse kafkaHeaderAndResponse = KafkaHeaderAndResponse.responseForRequest(request, response);
            try {
                short apiVersion = kafkaHeaderAndResponse.getApiVersion();
                if (request.getHeader().apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(apiVersion)) {
                    apiVersion = ApiKeys.API_VERSIONS.oldestVersion();
                }
                ByteBuf byteBuf = KopResponseUtils.serializeResponse(apiVersion, kafkaHeaderAndResponse.getHeader(), kafkaHeaderAndResponse.getResponse());
                if (kafkaHeaderAndResponse != null) {
                    kafkaHeaderAndResponse.close();
                }
                return byteBuf;
            }
            catch (Throwable throwable) {
                if (kafkaHeaderAndResponse != null) {
                    try {
                        kafkaHeaderAndResponse.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
        }
        finally {
            request.close();
        }
    }

    protected boolean channelReady() {
        return this.hasAuthenticated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = (ByteBuf)msg;
        this.requestStats.getNetworkTotalBytesIn().add((long)buffer.readableBytes());
        BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> this.requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(MathUtils.elapsedNanos((long)timeBeforeParse), TimeUnit.NANOSECONDS);
        BiConsumer<ApiKeys, Long> registerRequestLatency = (apiKey, startProcessTime) -> this.requestStats.getRequestStatsLogger((ApiKeys)apiKey, "REQUEST_LATENCY").registerSuccessfulEvent(MathUtils.elapsedNanos((long)startProcessTime), TimeUnit.NANOSECONDS);
        if (this.isActive.get() && !this.channelReady()) {
            try {
                this.channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
                return;
            }
            catch (AuthenticationException e2) {
                log.error("Failed authentication with [{}] ({})", (Object)this.remoteAddress, (Object)e2.getMessage());
                this.maybeDelayCloseOnAuthenticationFailure();
                return;
            }
            finally {
                buffer.release();
            }
        }
        Channel channel = ctx.channel();
        SocketAddress remoteAddress = null;
        if (null != channel) {
            remoteAddress = channel.remoteAddress();
        }
        long timeBeforeParse2 = MathUtils.nowInNano();
        KafkaHeaderAndRequest kafkaHeaderAndRequest = this.byteBufToRequest(buffer, remoteAddress);
        registerRequestParseLatency.accept(timeBeforeParse2, null);
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received kafka cmd {}, the request content is: {}", new Object[]{ctx.channel() != null ? ctx.channel().remoteAddress() : "Null channel", kafkaHeaderAndRequest.getHeader(), kafkaHeaderAndRequest});
            }
            CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<AbstractResponse>();
            long startProcessRequestTimestamp = MathUtils.nowInNano();
            responseFuture.whenComplete((response, e) -> {
                if (e instanceof CancellationException) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Request {} is cancelled", (Object)ctx.channel(), (Object)kafkaHeaderAndRequest.getHeader());
                    }
                    return;
                }
                registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey(), startProcessRequestTimestamp);
                this.sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> this.writeAndFlushResponseToClient(channel));
            });
            this.requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
            RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.incrementAndGet();
            if (!this.isActive.get()) {
                this.handleInactive(kafkaHeaderAndRequest, responseFuture);
                return;
            }
            switch (kafkaHeaderAndRequest.getHeader().apiKey()) {
                case API_VERSIONS: {
                    this.handleApiVersionsRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case METADATA: {
                    this.handleTopicMetadataRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case PRODUCE: {
                    this.handleProduceRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case FIND_COORDINATOR: {
                    this.handleFindCoordinatorRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case LIST_OFFSETS: {
                    this.handleListOffsetRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case OFFSET_FETCH: {
                    this.handleOffsetFetchRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case OFFSET_COMMIT: {
                    this.handleOffsetCommitRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case FETCH: {
                    this.handleFetchRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case JOIN_GROUP: {
                    this.handleJoinGroupRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case SYNC_GROUP: {
                    this.handleSyncGroupRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case HEARTBEAT: {
                    this.handleHeartbeatRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case LEAVE_GROUP: {
                    this.handleLeaveGroupRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case DESCRIBE_GROUPS: {
                    this.handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case LIST_GROUPS: {
                    this.handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case DELETE_GROUPS: {
                    this.handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case SASL_HANDSHAKE: {
                    this.handleSaslHandshake(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case SASL_AUTHENTICATE: {
                    this.handleSaslAuthenticate(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case CREATE_TOPICS: {
                    this.handleCreateTopics(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case INIT_PRODUCER_ID: {
                    this.handleInitProducerId(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case ADD_PARTITIONS_TO_TXN: {
                    this.handleAddPartitionsToTxn(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case ADD_OFFSETS_TO_TXN: {
                    this.handleAddOffsetsToTxn(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case TXN_OFFSET_COMMIT: {
                    this.handleTxnOffsetCommit(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case END_TXN: {
                    this.handleEndTxn(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case WRITE_TXN_MARKERS: {
                    this.handleWriteTxnMarkers(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case DESCRIBE_CONFIGS: {
                    this.handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case ALTER_CONFIGS: {
                    this.handleAlterConfigs(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case DELETE_TOPICS: {
                    this.handleDeleteTopics(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case DELETE_RECORDS: {
                    this.handleDeleteRecords(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                case CREATE_PARTITIONS: {
                    this.handleCreatePartitions(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
                default: {
                    this.handleError(kafkaHeaderAndRequest, responseFuture);
                    return;
                }
            }
        }
        catch (Exception e3) {
            log.error("error while handle command:", (Throwable)e3);
            this.close();
            return;
        }
        finally {
            buffer.release();
        }
    }

    protected void writeAndFlushResponseToClient(Channel channel) {
        ResponseAndRequest responseAndRequest;
        while (this.isActive.get() && (responseAndRequest = this.requestQueue.peek()) != null) {
            boolean expired;
            CompletableFuture<AbstractResponse> responseFuture = responseAndRequest.getResponseFuture();
            ApiKeys apiKey = responseAndRequest.getApiKey();
            long nanoSecondsSinceCreated = responseAndRequest.nanoSecondsSinceCreated();
            boolean bl = expired = nanoSecondsSinceCreated > TimeUnit.MILLISECONDS.toNanos(this.kafkaConfig.getRequestTimeoutMs());
            if (!responseFuture.isDone() && !expired) {
                this.requestStats.getResponseBlockedTimes().inc();
                long firstBlockTimestamp = responseAndRequest.getFirstBlockedTimestamp();
                if (firstBlockTimestamp != 0L) break;
                responseAndRequest.setFirstBlockedTimestamp(MathUtils.nowInNano());
                break;
            }
            if (!this.requestQueue.remove(responseAndRequest)) continue;
            RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
            if (responseAndRequest.getFirstBlockedTimestamp() != 0L) {
                this.requestStats.getResponseBlockedLatency().registerSuccessfulEvent(MathUtils.elapsedNanos((long)responseAndRequest.getFirstBlockedTimestamp()), TimeUnit.NANOSECONDS);
            }
            KafkaHeaderAndRequest request = responseAndRequest.getRequest();
            if (responseFuture.isCompletedExceptionally()) {
                responseFuture.exceptionally(e -> {
                    log.error("[{}] request {} completed exceptionally", new Object[]{channel, request.getHeader(), e});
                    this.sendErrorResponse(request, channel, (Throwable)e);
                    this.requestStats.getRequestStatsLogger(apiKey, "REQUEST_QUEUED_LATENCY").registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
                    return null;
                });
                continue;
            }
            if (responseFuture.isDone()) {
                responseFuture.thenAccept(response -> {
                    if (response == null) {
                        log.error("[{}] Unexpected null completed future for request {}", (Object)this.ctx.channel(), (Object)request.getHeader());
                        this.sendErrorResponse(request, channel, (Throwable)new ApiException("response is null"));
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Write kafka cmd to client. request content: {} responseAndRequest content: {}", (Object)request, response);
                    }
                    ByteBuf result = KafkaCommandDecoder.responseToByteBuf(response, request);
                    int resultSize = result.readableBytes();
                    channel.writeAndFlush((Object)result).addListener(future -> {
                        if (response instanceof ResponseCallbackWrapper) {
                            ((ResponseCallbackWrapper)((Object)response)).responseComplete();
                        }
                        if (!future.isSuccess()) {
                            log.error("[{}] Failed to write {}", new Object[]{channel, request.getHeader(), future.cause()});
                        } else {
                            this.requestStats.getNetworkTotalBytesOut().add((long)resultSize);
                        }
                    });
                    this.requestStats.getRequestStatsLogger(apiKey, "REQUEST_QUEUED_LATENCY").registerSuccessfulEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
                });
                continue;
            }
            if (!expired) continue;
            log.error("[{}] request {} is not completed for {} ns (> {} ms)", new Object[]{channel, request.getHeader(), nanoSecondsSinceCreated, this.kafkaConfig.getRequestTimeoutMs()});
            responseFuture.cancel(true);
            this.sendErrorResponse(request, channel, (Throwable)new ApiException("request is expired from server side"));
            this.requestStats.getRequestStatsLogger(apiKey, "REQUEST_QUEUED_LATENCY").registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
        }
    }

    private void sendErrorResponse(KafkaHeaderAndRequest request, Channel channel, Throwable customError) {
        ByteBuf result = request.createErrorResponse(customError);
        int resultSize = result.readableBytes();
        channel.writeAndFlush((Object)result).addListener(future -> {
            if (future.isSuccess()) {
                this.requestStats.getNetworkTotalBytesOut().add((long)resultSize);
            }
        });
    }

    protected abstract boolean hasAuthenticated();

    protected abstract void channelPrepare(ChannelHandlerContext var1, ByteBuf var2, BiConsumer<Long, Throwable> var3, BiConsumer<ApiKeys, Long> var4) throws AuthenticationException;

    protected abstract void maybeDelayCloseOnAuthenticationFailure();

    protected abstract void completeCloseOnAuthenticationFailure();

    protected abstract void handleError(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleInactive(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleApiVersionsRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleTopicMetadataRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleProduceRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleFindCoordinatorRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleListOffsetRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleOffsetFetchRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleOffsetCommitRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleFetchRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleJoinGroupRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleSyncGroupRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleHeartbeatRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleLeaveGroupRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleDescribeGroupRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleSaslAuthenticate(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleSaslHandshake(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleCreateTopics(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleAlterConfigs(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleInitProducerId(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleAddPartitionsToTxn(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleAddOffsetsToTxn(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleTxnOffsetCommit(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleEndTxn(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleWriteTxnMarkers(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleDeleteTopics(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleDeleteRecords(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    protected abstract void handleCreatePartitions(KafkaHeaderAndRequest var1, CompletableFuture<AbstractResponse> var2);

    private static boolean isUnsupportedApiVersionsRequest(RequestHeader header) {
        return header.apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(header.apiVersion());
    }

    public SocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    public AtomicBoolean getIsActive() {
        return this.isActive;
    }

    public RequestStats getRequestStats() {
        return this.requestStats;
    }

    public void setRequestStats(RequestStats requestStats) {
        this.requestStats = requestStats;
    }

    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    static class ResponseAndRequest {
        private final CompletableFuture<AbstractResponse> responseFuture;
        private final KafkaHeaderAndRequest request;
        private final long createdTimestamp;
        private long firstBlockedTimestamp;

        public static ResponseAndRequest of(CompletableFuture<AbstractResponse> response, KafkaHeaderAndRequest request) {
            return new ResponseAndRequest(response, request);
        }

        public long nanoSecondsSinceCreated() {
            return MathUtils.elapsedNanos((long)this.createdTimestamp);
        }

        public ApiKeys getApiKey() {
            return this.request.getHeader().apiKey();
        }

        public boolean expired(int requestTimeoutMs) {
            return MathUtils.elapsedNanos((long)this.createdTimestamp) > TimeUnit.MILLISECONDS.toNanos(requestTimeoutMs);
        }

        public void cancel() {
            this.responseFuture.cancel(true);
            this.request.close();
        }

        ResponseAndRequest(CompletableFuture<AbstractResponse> response, KafkaHeaderAndRequest request) {
            this.responseFuture = response;
            this.request = request;
            this.createdTimestamp = MathUtils.nowInNano();
            this.firstBlockedTimestamp = 0L;
        }

        public CompletableFuture<AbstractResponse> getResponseFuture() {
            return this.responseFuture;
        }

        public KafkaHeaderAndRequest getRequest() {
            return this.request;
        }

        public long getCreatedTimestamp() {
            return this.createdTimestamp;
        }

        public long getFirstBlockedTimestamp() {
            return this.firstBlockedTimestamp;
        }

        public void setFirstBlockedTimestamp(long firstBlockedTimestamp) {
            this.firstBlockedTimestamp = firstBlockedTimestamp;
        }
    }

    public static class KafkaHeaderAndRequest {
        private static final String DEFAULT_CLIENT_HOST = "";
        private final RequestHeader header;
        private final AbstractRequest request;
        private final ByteBuf buffer;
        private final SocketAddress remoteAddress;

        public KafkaHeaderAndRequest(RequestHeader header, AbstractRequest request, ByteBuf buffer, SocketAddress remoteAddress) {
            this.header = header;
            this.request = request;
            this.buffer = buffer.retain();
            this.remoteAddress = remoteAddress;
        }

        public ByteBuf getBuffer() {
            return this.buffer;
        }

        public RequestHeader getHeader() {
            return this.header;
        }

        public AbstractRequest getRequest() {
            return this.request;
        }

        public SocketAddress getRemoteAddress() {
            return this.remoteAddress;
        }

        public String getClientHost() {
            if (this.remoteAddress == null) {
                return DEFAULT_CLIENT_HOST;
            }
            return this.remoteAddress.toString();
        }

        public ByteBuf createErrorResponse(Throwable e) {
            return KafkaCommandDecoder.responseToByteBuf(this.request.getErrorResponse(e), this);
        }

        public String toString() {
            return String.format("KafkaHeaderAndRequest(header=%s, request=%s, remoteAddress=%s)", this.header, this.request, this.remoteAddress);
        }

        public void close() {
            ReferenceCountUtil.safeRelease((Object)this.buffer);
        }
    }

    public static class KafkaHeaderAndResponse
    implements Closeable {
        private final short apiVersion;
        private final ResponseHeader header;
        private final AbstractResponse response;

        private KafkaHeaderAndResponse(short apiVersion, ResponseHeader header, AbstractResponse response) {
            this.apiVersion = apiVersion;
            this.header = header;
            this.response = response;
        }

        public short getApiVersion() {
            return this.apiVersion;
        }

        public ResponseHeader getHeader() {
            return this.header;
        }

        public AbstractResponse getResponse() {
            return this.response;
        }

        static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest request, AbstractResponse response) {
            return new KafkaHeaderAndResponse(request.getHeader().apiVersion(), request.getHeader().toResponseHeader(), response);
        }

        public String toString() {
            return String.format("KafkaHeaderAndResponse(header=%s,responseFuture=%s)", this.header, this.response);
        }

        @Override
        public void close() {
        }
    }
}

