/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.ConflictException;
import com.azure.cosmos.implementation.CosmosError;
import com.azure.cosmos.implementation.ForbiddenException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.LockedException;
import com.azure.cosmos.implementation.MethodNotAllowedException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.PreconditionFailedException;
import com.azure.cosmos.implementation.RequestEntityTooLargeException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RetryWithException;
import com.azure.cosmos.implementation.ServiceUnavailableException;
import com.azure.cosmos.implementation.UnauthorizedException;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdClientChannelHealthChecker;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContext;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContextException;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContextNegotiator;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContextRequest;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdHealthCheckRequest;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponse;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.guava27.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCounted;
import io.netty.util.Timeout;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ThrowableUtil;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RntbdRequestManager
implements ChannelHandler,
ChannelInboundHandler,
ChannelOutboundHandler {
    /** Failure handed to pending requests when the channel is unregistered (see {@code channelUnregistered}). */
    private static final ClosedChannelException ON_CHANNEL_UNREGISTERED = ThrowableUtil.unknownStackTrace(
        new ClosedChannelException(), RntbdRequestManager.class, "channelUnregistered");

    /** Failure handed to pending requests when the channel is closed (see {@code close}). */
    private static final ClosedChannelException ON_CLOSE = ThrowableUtil.unknownStackTrace(
        new ClosedChannelException(), RntbdRequestManager.class, "close");

    /** Failure handed to pending requests when the channel is deregistered (see {@code deregister}). */
    private static final ClosedChannelException ON_DEREGISTER = ThrowableUtil.unknownStackTrace(
        new ClosedChannelException(), RntbdRequestManager.class, "deregister");

    private static final Logger logger = LoggerFactory.getLogger(RntbdRequestManager.class);

    /** Completed from {@code userEventTriggered} with the received context, or exceptionally on failure. */
    private final CompletableFuture<RntbdContext> contextFuture = new CompletableFuture<>();

    /** Exposed via {@code rntbdContextRequestFuture()}; failed here when pending requests are failed. */
    private final CompletableFuture<RntbdContextRequest> contextRequestFuture = new CompletableFuture<>();

    /** Consulted on idle-state events to decide whether the channel must be closed. */
    private final ChannelHealthChecker healthChecker;

    /** Upper bound on {@link #pendingRequests} used by {@code isServiceable}. */
    private final int pendingRequestLimit;

    /** Requests written but not yet completed, keyed by transport request id. */
    private final ConcurrentHashMap<Long, RntbdRequestRecord> pendingRequests;

    /** Read/write/ping timestamps updated by this handler and copied by {@code snapshotTimestamps}. */
    private final RntbdClientChannelHealthChecker.Timestamps timestamps = new RntbdClientChannelHealthChecker.Timestamps();

    /** Set once pending requests have been failed, so that they are not failed a second time. */
    private boolean closingExceptionally = false;

    /** Writes queued by {@code pendWrite}; created in {@code channelRegistered}. */
    private CoalescingBufferQueue pendingWrites;

    /**
     * Creates a request manager for a single channel.
     *
     * @param healthChecker       checker consulted when the channel reports an idle-state event; must not be null.
     * @param pendingRequestLimit maximum number of pending requests used by {@code isServiceable}; must be
     *                            positive. Also used as the initial capacity of the pending request map.
     */
    public RntbdRequestManager(ChannelHealthChecker healthChecker, int pendingRequestLimit) {
        Preconditions.checkArgument(pendingRequestLimit > 0, "pendingRequestLimit: %s", pendingRequestLimit);
        Preconditions.checkNotNull(healthChecker, "healthChecker");
        // Diamond instead of a raw type so the map is type-checked against the field declaration.
        this.pendingRequests = new ConcurrentHashMap<>(pendingRequestLimit);
        this.pendingRequestLimit = pendingRequestLimit;
        this.healthChecker = healthChecker;
    }

    /**
     * Traces that this handler was added to a pipeline; no other action is taken.
     *
     * @param context the handler context this handler was added under.
     */
    public void handlerAdded(ChannelHandlerContext context) {
        traceOperation(context, "handlerAdded");
    }

    /**
     * Traces that this handler was removed from a pipeline; no other action is taken.
     *
     * @param context the handler context this handler was removed from.
     */
    public void handlerRemoved(ChannelHandlerContext context) {
        traceOperation(context, "handlerRemoved");
    }

    /**
     * Traces the channel-active event and forwards it to the next inbound handler.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelActive(ChannelHandlerContext context) {
        traceOperation(context, "channelActive");
        context.fireChannelActive();
    }

    /**
     * Traces the channel-inactive event and forwards it to the next inbound handler.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelInactive(ChannelHandlerContext context) {
        traceOperation(context, "channelInactive");
        context.fireChannelInactive();
    }

    /**
     * Handles a message read from the channel.
     * <p>
     * A message whose class is exactly {@link RntbdResponse} is passed to {@code messageReceived}; anything thrown
     * while processing it is routed to {@code exceptionCaught} (and, unless it is a
     * {@link CorruptedFrameException}, reported as an issue first). A message of any other class is reported as an
     * issue and routed to {@code exceptionCaught} as an {@link IllegalStateException}.
     * <p>
     * Whether this method returns normally or propagates an error, a {@link ReferenceCounted} message is released
     * and an issue is reported if that release did not deallocate it.
     *
     * @param context the handler context the message arrived on.
     * @param message the message that was read.
     */
    public void channelRead(ChannelHandlerContext context, Object message) {
        traceOperation(context, "channelRead");
        try {
            if (message.getClass() != RntbdResponse.class) {
                IllegalStateException unexpected = new IllegalStateException(Strings.lenientFormat(
                    "expected message of %s, not %s: %s", RntbdResponse.class, message.getClass(), message));
                reportIssue(context, "", unexpected);
                this.exceptionCaught(context, unexpected);
                return;
            }
            try {
                this.messageReceived(context, (RntbdResponse) message);
            } catch (CorruptedFrameException corrupted) {
                this.exceptionCaught(context, corrupted);
            } catch (Throwable failure) {
                reportIssue(context, "{} ", message, failure);
                this.exceptionCaught(context, failure);
            }
        } finally {
            // Runs on both the normal and the exceptional path, exactly once per message.
            if (message instanceof ReferenceCounted) {
                boolean released = ((ReferenceCounted) message).release();
                reportIssueUnless(released, context, "failed to release message: {}", message);
            }
        }
    }

    /**
     * Traces the read-complete event, records it on the channel timestamps, and forwards the event.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelReadComplete(ChannelHandlerContext context) {
        traceOperation(context, "channelReadComplete");
        timestamps.channelReadCompleted();
        context.fireChannelReadComplete();
    }

    /**
     * Creates the pending-writes queue for the newly registered channel and forwards the event.
     * <p>
     * An issue is reported if a pending-writes queue already exists; the existing queue is replaced regardless.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelRegistered(ChannelHandlerContext context) {
        traceOperation(context, "channelRegistered");
        reportIssueUnless(this.pendingWrites == null, context, "pendingWrites: {}", this.pendingWrites);
        this.pendingWrites = new CoalescingBufferQueue(context.channel());
        context.fireChannelRegistered();
    }

    /**
     * Fails all pending requests, unless that has already happened, and forwards the unregistered event.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelUnregistered(ChannelHandlerContext context) {
        traceOperation(context, "channelUnregistered");
        if (this.closingExceptionally) {
            // Pending requests were already failed by an earlier event; only note it.
            logger.debug("{} channelUnregistered exceptionally", context);
        } else {
            this.completeAllPendingRequestsExceptionally(context, ON_CHANNEL_UNREGISTERED);
        }
        context.fireChannelUnregistered();
    }

    /**
     * Traces the writability-changed event and forwards it to the next inbound handler.
     *
     * @param context the handler context the event arrived on.
     */
    public void channelWritabilityChanged(ChannelHandlerContext context) {
        traceOperation(context, "channelWritabilityChanged");
        context.fireChannelWritabilityChanged();
    }

    /**
     * Fails all pending requests with the given cause, then flushes and closes the channel.
     * <p>
     * Does nothing beyond tracing when pending requests have already been failed. The exception is not forwarded
     * to later handlers in the pipeline.
     *
     * @param context the handler context the exception was raised on.
     * @param cause   the error to fail pending requests with.
     */
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        traceOperation(context, "exceptionCaught", cause);
        if (this.closingExceptionally) {
            return;
        }
        this.completeAllPendingRequestsExceptionally(context, cause);
        logger.debug("{} closing due to:", context, cause);
        context.flush().close();
    }

    /**
     * Reacts to user events fired through the pipeline.
     * <ul>
     * <li>{@link IdleStateEvent}: runs the health check; an unhealthy result or a failed check is routed to
     * {@code exceptionCaught}.</li>
     * <li>{@link RntbdContext}: completes the context future, removes the context negotiator, and flushes any
     * pending writes.</li>
     * <li>{@link RntbdContextException}: fails the context future, then flushes and closes the pipeline.</li>
     * <li>Any other event is forwarded to the next inbound handler.</li>
     * </ul>
     * Anything thrown while handling the event is reported as an issue and routed to {@code exceptionCaught}.
     *
     * @param context the handler context the event arrived on.
     * @param event   the user event.
     */
    public void userEventTriggered(ChannelHandlerContext context, Object event) {
        traceOperation(context, "userEventTriggered", event);
        try {
            if (event instanceof IdleStateEvent) {
                this.healthChecker.isHealthy(context.channel()).addListener(future -> {
                    final Throwable cause;
                    if (!future.isSuccess()) {
                        cause = future.cause();
                    } else if (((Boolean) future.get()).booleanValue()) {
                        return;  // channel is healthy: nothing to do
                    } else {
                        cause = UnhealthyChannelException.INSTANCE;
                    }
                    this.exceptionCaught(context, cause);
                });
            } else if (event instanceof RntbdContext) {
                this.contextFuture.complete((RntbdContext) event);
                this.removeContextNegotiatorAndFlushPendingWrites(context);
            } else if (event instanceof RntbdContextException) {
                this.contextFuture.completeExceptionally((RntbdContextException) event);
                context.pipeline().flush().close();
            } else {
                context.fireUserEventTriggered(event);
            }
        } catch (Throwable failure) {
            reportIssue(context, "{}: ", event, failure);
            this.exceptionCaught(context, failure);
        }
    }

    /**
     * Traces the bind operation and passes it to the next outbound handler.
     *
     * @param context      the handler context the operation was invoked on.
     * @param localAddress the address to bind to.
     * @param promise      the promise to notify when the operation completes.
     */
    public void bind(ChannelHandlerContext context, SocketAddress localAddress, ChannelPromise promise) {
        traceOperation(context, "bind", localAddress);
        context.bind(localAddress, promise);
    }

    /**
     * Fails all pending requests, unless that has already happened, closes the outbound side of the pipeline's
     * {@link SslHandler} if one is present, and passes the close operation to the next outbound handler.
     *
     * @param context the handler context the operation was invoked on.
     * @param promise the promise to notify when the operation completes.
     */
    public void close(ChannelHandlerContext context, ChannelPromise promise) {
        traceOperation(context, "close");
        if (this.closingExceptionally) {
            // Pending requests were already failed by an earlier event; only note it.
            logger.debug("{} closed exceptionally", context);
        } else {
            this.completeAllPendingRequestsExceptionally(context, ON_CLOSE);
        }
        final SslHandler ssl = context.pipeline().get(SslHandler.class);
        if (ssl != null) {
            ssl.closeOutbound();
        }
        context.close(promise);
    }

    /**
     * Traces the connect operation and passes it to the next outbound handler.
     *
     * @param context       the handler context the operation was invoked on.
     * @param remoteAddress the address to connect to.
     * @param localAddress  the local address to use as the source of the connection.
     * @param promise       the promise to notify when the operation completes.
     */
    public void connect(ChannelHandlerContext context, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        traceOperation(context, "connect", remoteAddress, localAddress);
        context.connect(remoteAddress, localAddress, promise);
    }

    /**
     * Fails all pending requests, unless that has already happened, and passes the deregister operation to the
     * next outbound handler.
     *
     * @param context the handler context the operation was invoked on.
     * @param promise the promise to notify when the operation completes.
     */
    public void deregister(ChannelHandlerContext context, ChannelPromise promise) {
        traceOperation(context, "deregister");
        if (this.closingExceptionally) {
            // Pending requests were already failed by an earlier event; only note it.
            logger.debug("{} deregistered exceptionally", context);
        } else {
            this.completeAllPendingRequestsExceptionally(context, ON_DEREGISTER);
        }
        context.deregister(promise);
    }

    /**
     * Traces the disconnect operation and passes it to the next outbound handler.
     *
     * @param context the handler context the operation was invoked on.
     * @param promise the promise to notify when the operation completes.
     */
    public void disconnect(ChannelHandlerContext context, ChannelPromise promise) {
        traceOperation(context, "disconnect");
        context.disconnect(promise);
    }

    /**
     * Traces the flush operation and passes it to the next outbound handler.
     *
     * @param context the handler context the operation was invoked on.
     */
    public void flush(ChannelHandlerContext context) {
        traceOperation(context, "flush");
        context.flush();
    }

    /**
     * Traces the read operation and passes it to the next outbound handler.
     *
     * @param context the handler context the operation was invoked on.
     */
    public void read(ChannelHandlerContext context) {
        traceOperation(context, "read");
        context.read();
    }

    /**
     * Writes a request record or the health-check message to the next outbound handler.
     * <p>
     * A message whose class is exactly {@link RntbdRequestRecord} is registered as a pending request before it is
     * written; when the write finishes the record is moved to the {@code SENT} stage and, on success, the write
     * completion timestamp is updated. The {@link RntbdHealthCheckRequest#MESSAGE} instance is written as is and
     * updates the ping timestamp on success. Any other message is reported as an issue and routed to
     * {@code exceptionCaught} as an {@link IllegalStateException}; it is not written.
     *
     * @param context the handler context the operation was invoked on.
     * @param message the message to write.
     * @param promise the promise passed along with the write.
     */
    public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) {
        traceOperation(context, "write", message);

        if (message.getClass() == RntbdRequestRecord.class) {
            final RntbdRequestRecord record = (RntbdRequestRecord) message;
            this.timestamps.channelWriteAttempted();
            final RntbdRequestRecord pending = this.addPendingRequestRecord(context, record);
            context.write(pending, promise).addListener(completed -> {
                // The stage advances whether or not the write succeeded; only the timestamp is conditional.
                record.stage(RntbdRequestRecord.Stage.SENT);
                if (completed.isSuccess()) {
                    this.timestamps.channelWriteCompleted();
                }
            });
        } else if (message == RntbdHealthCheckRequest.MESSAGE) {
            context.write(RntbdHealthCheckRequest.MESSAGE, promise).addListener(completed -> {
                if (completed.isSuccess()) {
                    this.timestamps.channelPingCompleted();
                }
            });
        } else {
            final IllegalStateException unexpected = new IllegalStateException(
                Strings.lenientFormat("message of %s: %s", message.getClass(), message));
            reportIssue(context, "", unexpected);
            this.exceptionCaught(context, unexpected);
        }
    }

    /**
     * @return the current number of entries in the pending request map.
     */
    int pendingRequestCount() {
        return pendingRequests.size();
    }

    /**
     * Returns the context received on this channel, if one has been received yet.
     * <p>
     * The result is empty while the context future is still incomplete. Previously this case threw a
     * {@link NullPointerException} because the {@code null} placeholder was passed to {@code Optional.of}.
     *
     * @return the context, or an empty {@link Optional} when none is available yet.
     * @throws java.util.concurrent.CompletionException if the context future completed exceptionally
     *                                                  (behavior of {@link CompletableFuture#getNow}).
     */
    Optional<RntbdContext> rntbdContext() {
        return Optional.ofNullable(this.contextFuture.getNow(null));
    }

    /**
     * @return the future tracking the context request for this channel; the same instance on every call.
     */
    CompletableFuture<RntbdContextRequest> rntbdContextRequestFuture() {
        return contextRequestFuture;
    }

    /**
     * Tells whether the context request future currently holds a value.
     *
     * @return {@code true} if the context request future has completed with a non-null value.
     */
    boolean hasRequestedRntbdContext() {
        final RntbdContextRequest issued = this.contextRequestFuture.getNow(null);
        return issued != null;
    }

    /**
     * Tells whether the context future currently holds a value.
     *
     * @return {@code true} if the context future has completed with a non-null value.
     */
    boolean hasRntbdContext() {
        final RntbdContext received = this.contextFuture.getNow(null);
        return received != null;
    }

    /**
     * Tells whether this channel can accept another request.
     * <p>
     * Once a context has been received the limit is the configured pending request limit; before that the limit
     * is the smaller of the configured limit and {@code demand}. An issue is reported if no context request has
     * been recorded yet.
     *
     * @param demand the limit to apply while no context has been received.
     * @return {@code true} if the number of pending requests is below the applicable limit.
     */
    boolean isServiceable(int demand) {
        reportIssueUnless(this.hasRequestedRntbdContext(), this, "Direct TCP context request was not issued");
        final int limit;
        if (this.hasRntbdContext()) {
            limit = this.pendingRequestLimit;
        } else {
            limit = Math.min(this.pendingRequestLimit, demand);
        }
        return this.pendingRequests.size() < limit;
    }

    /**
     * Adds a buffer and its promise to the pending-writes queue.
     *
     * @param out     the buffer to queue.
     * @param promise the promise associated with the buffer.
     */
    void pendWrite(ByteBuf out, ChannelPromise promise) {
        pendingWrites.add(out, promise);
    }

    /**
     * @return a new {@code Timestamps} instance constructed from this channel's current timestamps.
     */
    RntbdClientChannelHealthChecker.Timestamps snapshotTimestamps() {
        return new RntbdClientChannelHealthChecker.Timestamps(timestamps);
    }

    /**
     * Registers a request record as pending under its transport request id.
     * <p>
     * A timeout is created for the record that expires it on the channel's executor, and a completion callback is
     * attached that removes the record from the pending map and cancels the timeout. An issue is reported if a
     * record is already registered under the same id; the new record replaces it.
     *
     * @param context the handler context whose executor runs the expiry.
     * @param record  the request record to register.
     * @return the registered record.
     */
    private RntbdRequestRecord addPendingRequestRecord(ChannelHandlerContext context, RntbdRequestRecord record) {
        return this.pendingRequests.compute(record.transportRequestId(), (id, current) -> {
            // Pass one argument per placeholder; previously only the record was supplied, so it was
            // logged under "id:" and the other two placeholders were never filled.
            RntbdRequestManager.reportIssueUnless(current == null, context, "id: {}, current: {}, request: {}", id, current, record);
            Timeout pendingRequestTimeout = record.newTimeout(timeout -> {
                // Expire on the event loop: run inline when already there, otherwise hand off.
                EventExecutor executor = context.executor();
                if (executor.inEventLoop()) {
                    record.expire();
                } else {
                    executor.next().execute(record::expire);
                }
            });
            record.whenComplete((response, error) -> {
                this.pendingRequests.remove(id);
                pendingRequestTimeout.cancel();
            });
            return record;
        });
    }

    /**
     * Marks this manager as closing exceptionally and fails everything that is still outstanding.
     * <p>
     * Queued writes are released and failed with {@code throwable}. If any requests are pending, the context
     * request future and the context future are failed with {@code throwable} unless already done, and each
     * pending request is completed with a {@link GoneException} carrying the request headers and physical address.
     * <p>
     * The cause attached to the {@link GoneException} is: for a {@link ClosedChannelException}, the recorded
     * context negotiation failure if there is one, otherwise the {@code throwable} itself; for any other
     * {@link Exception}, the {@code throwable}; for any other {@link Throwable}, a {@link ChannelException}
     * wrapping it.
     *
     * @param context   the handler context of the channel being closed.
     * @param throwable the error that triggered the closure.
     */
    private void completeAllPendingRequestsExceptionally(ChannelHandlerContext context, Throwable throwable) {
        reportIssueUnless(!this.closingExceptionally, context, "", throwable);
        this.closingExceptionally = true;

        if (this.pendingWrites != null && !this.pendingWrites.isEmpty()) {
            this.pendingWrites.releaseAndFailAll((ChannelOutboundInvoker) context, throwable);
        }

        if (this.pendingRequests.isEmpty()) {
            return;
        }

        if (!this.contextRequestFuture.isDone()) {
            this.contextRequestFuture.completeExceptionally(throwable);
        }
        if (!this.contextFuture.isDone()) {
            this.contextFuture.completeExceptionally(throwable);
        }

        final int pendingCount = this.pendingRequests.size();
        Exception negotiationFailure = null;
        String phrase = null;

        if (this.contextRequestFuture.isCompletedExceptionally()) {
            // get() is called only to retrieve the failure the future was completed with.
            try {
                this.contextRequestFuture.get();
            } catch (CancellationException cancelled) {
                phrase = "RNTBD context request write cancelled";
                negotiationFailure = cancelled;
            } catch (Exception failed) {
                phrase = "RNTBD context request write failed";
                negotiationFailure = failed;
            } catch (Throwable failed) {
                phrase = "RNTBD context request write failed";
                negotiationFailure = new ChannelException(failed);
            }
        } else if (this.contextFuture.isCompletedExceptionally()) {
            try {
                this.contextFuture.get();
            } catch (CancellationException cancelled) {
                phrase = "RNTBD context request read cancelled";
                negotiationFailure = cancelled;
            } catch (Exception failed) {
                phrase = "RNTBD context request read failed";
                negotiationFailure = failed;
            } catch (Throwable failed) {
                phrase = "RNTBD context request read failed";
                negotiationFailure = new ChannelException(failed);
            }
        } else {
            phrase = "closed exceptionally";
        }

        final String message = Strings.lenientFormat("%s %s with %s pending requests", context, phrase, pendingCount);

        final Exception cause;
        if (throwable instanceof ClosedChannelException) {
            cause = negotiationFailure != null ? negotiationFailure : (ClosedChannelException) throwable;
        } else if (throwable instanceof Exception) {
            cause = (Exception) throwable;
        } else {
            cause = new ChannelException(throwable);
        }

        for (RntbdRequestRecord pending : this.pendingRequests.values()) {
            final Map<String, String> requestHeaders = pending.args().serviceRequest().getHeaders();
            final String requestUri = pending.args().physicalAddress().toString();
            final GoneException gone = new GoneException(message, cause, (Map<String, String>) null, requestUri);
            BridgeInternal.setRequestHeaders(gone, requestHeaders);
            pending.completeExceptionally(gone);
        }
    }

    /**
     * Completes the pending request that a response belongs to.
     * <p>
     * A response without a transport request id is reported as an issue and ignored; a response with no matching
     * pending request is logged and ignored. Otherwise the record's response length and stage are updated, and
     * the record is completed with a {@link StoreResponse} for status codes from 200 (inclusive) to 300
     * (exclusive), or completed exceptionally with an exception chosen by status code (and, for 410, by
     * sub-status code) for any other status.
     *
     * @param context  the handler context the response arrived on.
     * @param response the response read from the channel.
     */
    private void messageReceived(ChannelHandlerContext context, RntbdResponse response) {
        final Long transportRequestId = response.getTransportRequestId();
        if (transportRequestId == null) {
            reportIssue(context, "response ignored because its transportRequestId is missing: {}", response);
            return;
        }

        final RntbdRequestRecord pending = this.pendingRequests.get(transportRequestId);
        if (pending == null) {
            logger.debug("response {} ignored because its requestRecord is missing: {}", transportRequestId, response);
            return;
        }

        pending.responseLength(response.getMessageLength());
        pending.stage(RntbdRequestRecord.Stage.RECEIVED);

        final HttpResponseStatus status = response.getStatus();
        final UUID activityId = response.getActivityId();
        final int statusCode = status.code();

        final boolean succeeded = HttpResponseStatus.OK.code() <= statusCode
            && statusCode < HttpResponseStatus.MULTIPLE_CHOICES.code();

        if (succeeded) {
            pending.complete(response.toStoreResponse(this.contextFuture.getNow(null)));
            return;
        }

        // Failure path: gather the details shared by every exception type, then map status to exception.
        final long lsn = (Long) response.getHeader(RntbdConstants.RntbdResponseHeader.LSN);
        final String partitionKeyRangeId = (String) response.getHeader(RntbdConstants.RntbdResponseHeader.PartitionKeyRangeId);

        final CosmosError error;
        if (response.hasPayload()) {
            error = new CosmosError(RntbdObjectMapper.readTree(response));
        } else {
            error = new CosmosError(Integer.toString(statusCode), status.reasonPhrase(), status.codeClass().name());
        }

        final Map<String, String> responseHeaders = response.getHeaders().asMap(
            this.rntbdContext().orElseThrow(IllegalStateException::new), activityId);

        CosmosException cause;
        switch (statusCode) {
            case 400:
                cause = new BadRequestException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 401:
                cause = new UnauthorizedException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 403:
                cause = new ForbiddenException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 404:
                cause = new NotFoundException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 405:
                cause = new MethodNotAllowedException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 408:
                cause = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 409:
                cause = new ConflictException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 410: {
                final int subStatusCode = Math.toIntExact((Long) response.getHeader(RntbdConstants.RntbdResponseHeader.SubStatus));
                switch (subStatusCode) {
                    case 1000:
                        cause = new InvalidPartitionException(error, lsn, partitionKeyRangeId, responseHeaders);
                        break;
                    case 1002:
                        cause = new PartitionKeyRangeGoneException(error, lsn, partitionKeyRangeId, responseHeaders);
                        break;
                    case 1007:
                        cause = new PartitionKeyRangeIsSplittingException(error, lsn, partitionKeyRangeId, responseHeaders);
                        break;
                    case 1008:
                        cause = new PartitionIsMigratingException(error, lsn, partitionKeyRangeId, responseHeaders);
                        break;
                    default:
                        cause = new GoneException(error, lsn, partitionKeyRangeId, responseHeaders);
                        break;
                }
                break;
            }
            case 412:
                cause = new PreconditionFailedException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 413:
                cause = new RequestEntityTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 423:
                cause = new LockedException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 429:
                cause = new RequestRateTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 449:
                cause = new RetryWithException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 500:
                cause = new InternalServerErrorException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            case 503:
                cause = new ServiceUnavailableException(error, lsn, partitionKeyRangeId, responseHeaders);
                break;
            default:
                cause = BridgeInternal.createCosmosException(statusCode, error, responseHeaders);
                break;
        }

        pending.completeExceptionally(cause);
    }

    /**
     * Removes the context negotiator's inbound and outbound handlers, then writes and flushes any queued writes.
     *
     * @param context the handler context whose pipeline holds the negotiator and receives the queued writes.
     */
    private void removeContextNegotiatorAndFlushPendingWrites(ChannelHandlerContext context) {
        final RntbdContextNegotiator negotiator = context.pipeline().get(RntbdContextNegotiator.class);
        negotiator.removeInboundHandler();
        negotiator.removeOutboundHandler();

        if (this.pendingWrites.isEmpty()) {
            return;
        }
        this.pendingWrites.writeAndRemoveAll(context);
        context.flush();
    }

    /**
     * Reports an issue through {@link RntbdReporter} using this class's logger.
     *
     * @param subject the object the issue concerns.
     * @param format  the message format passed to the reporter.
     * @param args    the arguments passed along with the format.
     */
    private static void reportIssue(Object subject, String format, Object ... args) {
        RntbdReporter.reportIssue(logger, subject, format, args);
    }

    /**
     * Delegates to {@link RntbdReporter#reportIssueUnless} using this class's logger.
     *
     * @param predicate the condition passed to the reporter; callers in this class pass the condition they
     *                  expect to hold.
     * @param subject   the object the issue concerns.
     * @param format    the message format passed to the reporter.
     * @param args      the arguments passed along with the format.
     */
    private static void reportIssueUnless(boolean predicate, Object subject, String format, Object ... args) {
        RntbdReporter.reportIssueUnless(logger, predicate, subject, format, args);
    }

    /**
     * Logs a channel operation at debug level: the operation name, the handler context, and the argument array,
     * each on its own line.
     *
     * @param context       the handler context the operation was invoked on.
     * @param operationName the name of the operation being traced.
     * @param args          the operation's arguments, logged as a single array value.
     */
    private void traceOperation(ChannelHandlerContext context, String operationName, Object ... args) {
        // Three arguments select the varargs overload, so 'args' is logged as one array element.
        logger.debug("{}\n{}\n{}", operationName, context, args);
    }

    /**
     * Singleton exception used when the channel health check reports the channel as unhealthy.
     * <p>
     * {@link #fillInStackTrace()} is overridden to do nothing, so the shared instance carries no stack trace.
     */
    private static final class UnhealthyChannelException extends ChannelException {

        private UnhealthyChannelException() {
            super("health check failed");
        }

        /** The only instance; the constructor is private. */
        static final UnhealthyChannelException INSTANCE = new UnhealthyChannelException();

        /**
         * Skips stack trace capture.
         *
         * @return this exception, unchanged.
         */
        public Throwable fillInStackTrace() {
            return this;
        }
    }
}

