/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.CorrelationIdGenerator;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.PendingRequest;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ResponseContext;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.security.sasl.SaslException;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMarkerChannelHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(TransactionMarkerChannelHandler.class);
    private final CompletableFuture<ChannelHandlerContext> cnxFuture = new CompletableFuture();
    private final CorrelationIdGenerator correlationIdGenerator = new CorrelationIdGenerator();
    private final ResponseContext responseContext = new ResponseContext();
    private final ConcurrentLongHashMap<PendingRequest> pendingRequestMap = new ConcurrentLongHashMap();
    private final TransactionMarkerChannelManager transactionMarkerChannelManager;
    private final String mechanism;

    public TransactionMarkerChannelHandler(TransactionMarkerChannelManager transactionMarkerChannelManager) {
        this.transactionMarkerChannelManager = transactionMarkerChannelManager;
        this.mechanism = transactionMarkerChannelManager.getAuthentication() instanceof AuthenticationToken ? "PLAIN" : (transactionMarkerChannelManager.getAuthentication() instanceof AuthenticationOAuth2 ? "OAUTHBEARER" : "");
    }

    private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) {
        long correlationId = pendingRequest.getCorrelationId();
        this.pendingRequestMap.put(correlationId, (Object)pendingRequest);
        channel.writeAndFlush((Object)Unpooled.wrappedBuffer((ByteBuffer)pendingRequest.serialize())).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                pendingRequest.completeExceptionally(writeFuture.cause());
                this.pendingRequestMap.remove(correlationId);
                channel.close();
            }
        });
    }

    public void enqueueWriteTxnMarkers(List<WriteTxnMarkersRequest.TxnMarkerEntry> txnMarkerEntries, Consumer<ResponseContext> responseContextConsumer) {
        this.cnxFuture.whenComplete((cnx, e) -> {
            if (e == null) {
                this.enqueueRequest((ChannelHandlerContext)cnx, new PendingRequest(ApiKeys.WRITE_TXN_MARKERS, this.correlationIdGenerator.next(), (AbstractRequest)TransactionMarkerChannelHandler.newWriteTxnMarkers(txnMarkerEntries), responseContextConsumer));
            } else {
                log.error("Failed to enqueue request because the channel failed", e);
            }
        });
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        log.info("[TransactionMarkerChannelHandler] channelActive to {}", (Object)channelHandlerContext.channel());
        this.handleAuthentication(channelHandlerContext);
        super.channelActive(channelHandlerContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        log.info("[TransactionMarkerChannelHandler] channelInactive, failing {} pending requests", (Object)this.pendingRequestMap.size());
        this.pendingRequestMap.forEach((__, pendingRequest) -> log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest));
        this.pendingRequestMap.clear();
        this.transactionMarkerChannelManager.channelFailed((InetSocketAddress)channelHandlerContext.channel().remoteAddress(), this);
        super.channelInactive(channelHandlerContext);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuffer nio = ((ByteBuf)o).nioBuffer();
        if (nio.remaining() < 4) {
            log.error("Short read from channel {}", (Object)channelHandlerContext.channel());
            channelHandlerContext.close();
            return;
        }
        int correlationId = nio.getInt(0);
        PendingRequest pendingRequest = (PendingRequest)this.pendingRequestMap.remove((long)correlationId);
        if (pendingRequest != null) {
            pendingRequest.complete(this.responseContext.set(channelHandlerContext.channel().remoteAddress(), pendingRequest.getApiVersion(), correlationId, pendingRequest.parseResponse(nio)));
        } else {
            log.error("Miss the inFlightRequest with correlationId {}.", (Object)correlationId);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
        log.error("Transaction marker channel handler caught exception.", throwable);
        this.pendingRequestMap.forEach((__, pendingRequest) -> log.warn("Pending request ({}) failed because the txn marker channel caught exception", pendingRequest, (Object)throwable));
        this.pendingRequestMap.clear();
        channelHandlerContext.close();
    }

    public void close() {
        log.info("[TransactionMarkerChannelHandler] closing");
        this.cnxFuture.whenComplete((ctx, err) -> {
            if (ctx != null) {
                ctx.close();
            }
        });
    }

    public void handleAuthentication(ChannelHandlerContext channelHandlerContext) {
        if (!this.transactionMarkerChannelManager.getKafkaConfig().isAuthenticationEnabled()) {
            this.cnxFuture.complete(channelHandlerContext);
            return;
        }
        ((CompletableFuture)((CompletableFuture)this.saslHandshake(channelHandlerContext).thenCompose(this::authenticate)).thenApply(this.cnxFuture::complete)).exceptionally(err -> {
            this.cnxFuture.completeExceptionally((Throwable)err);
            return false;
        });
    }

    private CompletableFuture<ChannelHandlerContext> saslHandshake(ChannelHandlerContext channel) {
        PendingRequest pendingRequest = new PendingRequest(ApiKeys.SASL_HANDSHAKE, this.correlationIdGenerator.next(), (AbstractRequest)TransactionMarkerChannelHandler.newSaslHandshake(this.mechanism), __ -> {});
        this.enqueueRequest(channel, pendingRequest);
        return ((CompletableFuture)pendingRequest.getSendFuture().thenApply(__ -> {
            log.debug("SASL Handshake completed with success");
            return channel;
        })).exceptionally(error -> {
            log.error("SASL handshake failed", error);
            channel.close();
            return null;
        });
    }

    private CompletableFuture<ChannelHandlerContext> authenticate(ChannelHandlerContext channel) {
        CompletableFuture<ChannelHandlerContext> internal = this.authenticateInternal(channel);
        internal.exceptionally(error -> {
            channel.close();
            return null;
        });
        return internal;
    }

    private CompletableFuture<ChannelHandlerContext> authenticateInternal(ChannelHandlerContext channel) {
        CompletableFuture<ChannelHandlerContext> result = new CompletableFuture<ChannelHandlerContext>();
        Authentication authentication = this.transactionMarkerChannelManager.getAuthentication();
        try {
            String commandData = authentication.getAuthData().getCommandData();
            PendingRequest pendingRequest = new PendingRequest(ApiKeys.SASL_AUTHENTICATE, this.correlationIdGenerator.next(), (AbstractRequest)TransactionMarkerChannelHandler.newSaslAuthenticate(switch (this.mechanism) {
                case "PLAIN" -> {
                    String prefix = "TX";
                    String authUsername = this.transactionMarkerChannelManager.getAuthenticationUsername();
                    String authPassword = "token:" + commandData;
                    String usernamePassword = prefix + "\u0000" + authUsername + "\u0000" + authPassword;
                    yield usernamePassword.getBytes(StandardCharsets.UTF_8);
                }
                case "OAUTHBEARER" -> new OAuthBearerClientInitialResponse(commandData, null, null).toBytes();
                default -> {
                    log.error("No corresponding mechanism to {}", (Object)authentication.getClass().getName());
                    yield new byte[]{};
                }
            }), __ -> {});
            this.enqueueRequest(channel, pendingRequest);
            pendingRequest.getSendFuture().whenComplete((response, e) -> {
                if (e != null) {
                    result.completeExceptionally((Throwable)e);
                    return;
                }
                SaslAuthenticateResponse saslResponse = (SaslAuthenticateResponse)response;
                if (saslResponse.error() == Errors.NONE) {
                    log.debug("Success step AUTH to KOP broker {} {} {}", new Object[]{saslResponse.error(), saslResponse.errorMessage(), saslResponse.saslAuthBytes()});
                    result.complete(channel);
                } else {
                    log.error("Failed authentication against KOP broker {}{}", (Object)saslResponse.error(), (Object)saslResponse.errorMessage());
                    result.completeExceptionally(saslResponse.error().exception());
                }
            });
        }
        catch (SaslException | PulsarClientException ex) {
            log.error("Transaction marker channel handler authentication failed.", ex);
            result.completeExceptionally(ex);
        }
        return result;
    }

    private static SaslHandshakeRequest newSaslHandshake(String mechanism) {
        return (SaslHandshakeRequest)new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism(mechanism)).build();
    }

    private static SaslAuthenticateRequest newSaslAuthenticate(byte[] saslAuthBytes) {
        return (SaslAuthenticateRequest)new SaslAuthenticateRequest.Builder(new SaslAuthenticateRequestData().setAuthBytes(saslAuthBytes)).build();
    }

    private static WriteTxnMarkersRequest newWriteTxnMarkers(List<WriteTxnMarkersRequest.TxnMarkerEntry> txnMarkerEntries) {
        return (WriteTxnMarkersRequest)new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), txnMarkerEntries).build();
    }
}

