/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.mssql;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.mssql.Binding;
import io.r2dbc.mssql.PreparedStatementCache;
import io.r2dbc.mssql.QueryLogger;
import io.r2dbc.mssql.client.Client;
import io.r2dbc.mssql.codec.Codecs;
import io.r2dbc.mssql.codec.RpcDirection;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TransactionDescriptor;
import io.r2dbc.mssql.message.token.AbstractDoneToken;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.ColumnMetadataToken;
import io.r2dbc.mssql.message.token.DoneInProcToken;
import io.r2dbc.mssql.message.token.DoneProcToken;
import io.r2dbc.mssql.message.token.ErrorToken;
import io.r2dbc.mssql.message.token.ReturnValue;
import io.r2dbc.mssql.message.token.RowToken;
import io.r2dbc.mssql.message.token.RpcRequest;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.mssql.util.Operators;
import io.r2dbc.mssql.util.PredicateUtils;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.processing.Completion;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * Static message flows that execute a query through RPC requests, either directly
 * ({@link #spExecuteSql}) or through the cursor procedures ({@link #spCursorOpen},
 * {@link #spCursorPrepExec}, {@link #spCursorExec}, {@link #spCursorFetch},
 * {@link #spCursorClose}).
 *
 * <p>Each {@code exchange(...)} overload sends an initial request through {@link Client#exchange},
 * tracks progress in a {@link CursorState} and, for the cursored variants, emits follow-up
 * fetch/close requests in reaction to the {@link DoneProcToken}s that arrive.
 */
final class RpcQueryMessageFlow {

    /** Message types that are allowed to reach the subscriber of an {@code exchange(...)} flow. */
    private static final Predicate<Message> FILTER_PREDICATE = createFilterPredicate();

    private static final Logger logger = Loggers.getLogger(RpcQueryMessageFlow.class);

    /** Option flags with metadata disabled; applied to fetch requests (see {@link #spCursorFetch}). */
    static final RpcRequest.OptionFlags NO_METADATA = RpcRequest.OptionFlags.empty().disableMetadata();

    // Fetch type flags (only FETCH_NEXT is used by this class).
    static final int FETCH_FIRST = 1;
    static final int FETCH_NEXT = 2;
    static final int FETCH_PREV = 4;
    static final int FETCH_LAST = 8;
    static final int FETCH_ABSOLUTE = 16;
    static final int FETCH_RELATIVE = 32;
    static final int FETCH_REFRESH = 128;
    static final int FETCH_INFO = 256;
    static final int FETCH_PREV_NOADJUST = 512;

    // Cursor scroll options (this class uses FORWARD_ONLY and PARAMETERIZED_STMT).
    static final int SCROLLOPT_KEYSET = 1;
    static final int SCROLLOPT_DYNAMIC = 2;
    static final int SCROLLOPT_FORWARD_ONLY = 4;
    static final int SCROLLOPT_STATIC = 8;
    static final int SCROLLOPT_FAST_FORWARD = 16;
    static final int SCROLLOPT_PARAMETERIZED_STMT = 4096;

    // Cursor concurrency options.
    static final int CCOPT_READ_ONLY = 1;
    static final int CCOPT_ALLOW_DIRECT = 8192;

    RpcQueryMessageFlow() {
    }

    /**
     * Builds the predicate deciding which messages are passed downstream.
     *
     * @return a predicate that is the {@code or}-combination of the accepted message kinds
     */
    private static Predicate<Message> createFilterPredicate() {
        // Generic arrays cannot be created directly; the raw array is only ever filled with Predicate<Message>.
        @SuppressWarnings("unchecked")
        Predicate<Message>[] accepted = new Predicate[8];
        accepted[0] = RowToken.class::isInstance;
        accepted[1] = ColumnMetadataToken.class::isInstance;
        accepted[2] = ReturnValue.class::isInstance;
        accepted[3] = DoneInProcToken.class::isInstance;
        accepted[4] = IntermediateCount.class::isInstance;
        accepted[5] = AbstractInfoToken.class::isInstance;
        // NOTE(review): this resolves to javax.annotation.processing.Completion, which looks like an
        // accidental import; it presumably never matches a Message. Kept to preserve behavior - confirm and remove.
        accepted[6] = Completion.class::isInstance;
        accepted[7] = AbstractDoneToken::isAttentionAck;
        return PredicateUtils.or(accepted);
    }

    /**
     * Executes {@code query} directly via {@link #spExecuteSql} without opening a cursor.
     *
     * <p>The state is put into direct mode up front, so {@link DoneInProcToken}s are forwarded as-is.
     * No follow-up requests can be issued in this mode; attempting one throws
     * {@link UnsupportedOperationException}.
     *
     * @param client  the client used to exchange messages, must not be {@code null}
     * @param query   the SQL to run, must not be {@code null}
     * @param binding the parameter binding for the query
     * @return the filtered response messages
     */
    static Flux<Message> exchange(Client client, String query, Binding binding) {
        Assert.requireNonNull(client, "Client must not be null");
        Assert.requireNonNull(query, "Query must not be null");

        CursorState state = new CursorState();
        state.directMode = true;

        // The request is built lazily so collation and transaction descriptor are read at subscription time.
        Mono<RpcRequest> request = Mono.fromSupplier(
            () -> spExecuteSql(query, binding, client.getRequiredCollation(), client.getTransactionDescriptor()));
        Flux<Message> exchange = client.exchange(request, AbstractDoneToken::isDone);

        OnCursorComplete cursorComplete = new OnCursorComplete();

        Flux<Message> messages = exchange.<Message>handle((message, sink) -> {
            state.update(message);
            handleMessage(client, 0, state, message, sink, cursorComplete, true);
        }).filter(FILTER_PREDICATE).doOnCancel(cursorComplete);

        return decorate(messages, client, query, state, cursorComplete);
    }

    /**
     * Executes an unparameterized {@code query} through a cursor opened with {@link #spCursorOpen}.
     *
     * <p>Return values with ordinal {@code < 5} belong to the open call itself (it declares five
     * parameters) and are released instead of emitted; ordinal {@code 0} carries the cursor id.
     *
     * @param client    the client used to exchange messages, must not be {@code null}
     * @param codecs    codecs used to decode the cursor id
     * @param query     the SQL to run, must not be {@code null}
     * @param fetchSize number of rows requested per fetch
     * @return the filtered response messages
     */
    static Flux<Message> exchange(Client client, Codecs codecs, String query, int fetchSize) {
        Assert.requireNonNull(client, "Client must not be null");
        Assert.requireNonNull(query, "Query must not be null");

        Sinks.Many<ClientMessage> outbound = Sinks.many().unicast().onBackpressureBuffer();
        CursorState state = new CursorState();

        // The initial request is pushed into the outbound sink on subscription; follow-ups use the same sink.
        Flux<ClientMessage> requests = Flux.defer(() -> {
            outbound.emitNext(spCursorOpen(query, client.getRequiredCollation(), client.getTransactionDescriptor()),
                Sinks.EmitFailureHandler.FAIL_FAST);
            return outbound.asFlux();
        });
        Flux<Message> exchange = client.exchange(requests, isFinalToken(state));

        OnCursorComplete cursorComplete = new OnCursorComplete();

        Flux<Message> messages = exchange.<Message>handle((message, sink) -> {
            boolean emit = true;
            if (message.getClass() == ReturnValue.class) {
                ReturnValue returnValue = (ReturnValue) message;
                if (returnValue.getOrdinal() == 0) {
                    state.cursorId = parseCursorId(codecs, state, returnValue);
                }
                if (returnValue.getOrdinal() < 5) {
                    returnValue.release();
                    emit = false;
                }
            }
            state.update(message);
            handleMessage(client, fetchSize, outbound, state, message, sink, cursorComplete, emit);
        }).filter(FILTER_PREDICATE);

        return decorate(messages, client, query, state, cursorComplete);
    }

    /**
     * Executes a parameterized {@code query} through a cursor, reusing a cached prepared statement
     * handle when one exists.
     *
     * <p>If the cache returns handle {@code 0}, the statement is prepared and executed with
     * {@link #spCursorPrepExec}; otherwise {@link #spCursorExec} is used. When the server reports that
     * the handle is no longer valid (see {@link #isPreparedStatementNotFound}), the flow suppresses the
     * messages of the failed attempt and re-prepares the statement once.
     *
     * @param statementCache cache mapping query and binding to prepared statement handles
     * @param client         the client used to exchange messages, must not be {@code null}
     * @param codecs         codecs used to decode cursor id and statement handle
     * @param query          the SQL to run, must not be {@code null}
     * @param binding        the parameter binding for the query
     * @param fetchSize      number of rows requested per fetch
     * @return the filtered response messages
     */
    static Flux<Message> exchange(PreparedStatementCache statementCache, Client client, Codecs codecs, String query, Binding binding, int fetchSize) {
        Assert.requireNonNull(client, "Client must not be null");
        Assert.requireNonNull(query, "Query must not be null");

        Sinks.Many<ClientMessage> outbound = Sinks.many().unicast().onBackpressureBuffer();
        int handle = statementCache.getHandle(query, binding);
        // Allows exactly one re-prepare attempt per exchange.
        AtomicBoolean retryReprepare = new AtomicBoolean(true);
        // A handle of 0 means nothing is cached yet, so the statement must be prepared.
        AtomicBoolean needsPrepare = new AtomicBoolean(handle == 0);

        Flux<ClientMessage> messageProducer;
        if (handle == 0) {
            messageProducer = Flux.defer(() -> {
                outbound.emitNext(spCursorPrepExec(0, query, binding, client.getRequiredCollation(), client.getTransactionDescriptor()),
                    Sinks.EmitFailureHandler.FAIL_FAST);
                return outbound.asFlux();
            });
        } else {
            messageProducer = Flux.defer(() -> {
                outbound.emitNext(spCursorExec(handle, binding, client.getTransactionDescriptor()), Sinks.EmitFailureHandler.FAIL_FAST);
                return outbound.asFlux();
            });
        }

        CursorState state = new CursorState();
        Flux<Message> exchange = client.exchange(messageProducer, isFinalToken(state));
        OnCursorComplete cursorComplete = new OnCursorComplete();

        Flux<Message> messages = exchange.<Message>handle((message, sink) -> {
            boolean emit = true;
            if (message.getClass() == ReturnValue.class) {
                ReturnValue returnValue = (ReturnValue) message;
                emit = handleSpCursorReturnValue(statementCache, codecs, query, binding, state, needsPrepare.get(), returnValue);
                if (!emit) {
                    returnValue.release();
                }
            }
            state.update(message);

            // Entering PREPARE_RETRY also clears the error recorded by state.update(message) above.
            if (message instanceof ErrorToken
                && isPreparedStatementNotFound(((ErrorToken) message).getNumber())
                && retryReprepare.compareAndSet(true, false)) {
                logger.debug("Prepared statement no longer valid: {}", handle);
                state.update(CursorState.Phase.PREPARE_RETRY);
            }

            // Nothing from the failed attempt is emitted downstream.
            if (state.phase == CursorState.Phase.PREPARE_RETRY) {
                emit = false;
            }

            // The failed attempt is finished: start over with a fresh prepare + execute.
            if (DoneProcToken.isDone(message) && state.phase == CursorState.Phase.PREPARE_RETRY) {
                logger.debug("Attempting to re-prepare statement: {}", query);
                needsPrepare.set(true);
                state.update(CursorState.Phase.NONE);
                outbound.emitNext(spCursorPrepExec(0, query, binding, client.getRequiredCollation(), client.getTransactionDescriptor()),
                    Sinks.EmitFailureHandler.FAIL_FAST);
                return;
            }

            handleMessage(client, fetchSize, outbound, state, message, sink, cursorComplete, emit);
        }).filter(FILTER_PREDICATE);

        return decorate(messages, client, query, state, cursorComplete);
    }

    /**
     * Applies the operators shared by all {@code exchange(...)} flows: query logging on subscribe,
     * cancellation tracking with release of discarded reference-counted messages, and termination
     * once {@code cursorComplete} fires.
     */
    private static Flux<Message> decorate(Flux<Message> messages, Client client, String query, CursorState state, OnCursorComplete cursorComplete) {
        return messages
            .doOnSubscribe(subscription -> QueryLogger.logQuery(client.getContext(), query))
            .transform(it -> Operators.discardOnCancel(it, state::cancel).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release))
            .takeUntilOther(cursorComplete.takeUntil());
    }

    /**
     * Tells whether a server error number means the prepared statement handle can no longer be used.
     *
     * @param errorNumber error number taken from an {@link ErrorToken}
     * @return {@code true} for error numbers 8179 and 586
     */
    private static boolean isPreparedStatementNotFound(long errorNumber) {
        return errorNumber == 8179L || errorNumber == 586L;
    }

    /**
     * Processes a return value of a {@link #spCursorPrepExec} or {@link #spCursorExec} call.
     *
     * <p>Ordinal {@code 1} carries the cursor id. When preparing, ordinal {@code 0} carries the new
     * statement handle, which is stored in {@code statementCache}.
     *
     * @return {@code true} if the return value should be emitted downstream, i.e. its ordinal lies past
     * the procedure's own parameters (seven for prepare + execute, five for execute)
     */
    private static boolean handleSpCursorReturnValue(PreparedStatementCache statementCache, Codecs codecs, String query, Binding binding, CursorState state, boolean needsPrepare, ReturnValue returnValue) {
        int ordinal = returnValue.getOrdinal();
        if (ordinal == 1) {
            state.cursorId = parseCursorId(codecs, state, returnValue);
        }
        if (!needsPrepare) {
            return ordinal >= 5;
        }
        if (ordinal == 0) {
            int preparedStatementHandle = codecs.decode(returnValue.getValue(), returnValue.asDecodable(), Integer.class);
            logger.debug("Prepared statement with handle: {}", preparedStatementHandle);
            statementCache.putHandle(preparedStatementHandle, query, binding);
        }
        return ordinal >= 7;
    }

    /**
     * Decodes the cursor id from {@code returnValue} and logs it at debug level.
     *
     * @param state currently unused
     */
    private static int parseCursorId(Codecs codecs, CursorState state, ReturnValue returnValue) {
        Integer cursorId = codecs.decode(returnValue.getValue(), returnValue.asDecodable(), Integer.class);
        logger.debug("CursorId: {}", cursorId);
        return cursorId;
    }

    /**
     * Variant for flows that cannot send follow-up requests: any attempt to do so fails with
     * {@link UnsupportedOperationException}.
     */
    private static void handleMessage(Client client, int fetchSize, CursorState state, Message message, SynchronousSink<Message> sink, Runnable onCursorComplete, boolean emit) {
        handleMessage(client, fetchSize, (ClientMessage it) -> {
            throw new UnsupportedOperationException("Cannot accept subsequent messages");
        }, state, message, sink, onCursorComplete, emit);
    }

    /**
     * Variant that sends follow-up requests by emitting them into {@code requests} with fail-fast semantics.
     */
    private static void handleMessage(Client client, int fetchSize, Sinks.Many<ClientMessage> requests, CursorState state, Message message, SynchronousSink<Message> sink, Runnable onCursorComplete, boolean emit) {
        handleMessage(client, fetchSize, (ClientMessage t) -> requests.emitNext(t, Sinks.EmitFailureHandler.FAIL_FAST), state, message, sink, onCursorComplete, emit);
    }

    /**
     * Routes a single response message: updates {@code state}, decides whether the message (or a
     * substitute) is emitted to {@code sink}, and triggers {@link #onDone} when a procedure finishes.
     *
     * @param client           client providing the transaction descriptor for follow-up requests
     * @param fetchSize        number of rows requested per fetch
     * @param requests         receiver of follow-up requests (fetch/close)
     * @param state            mutable cursor state
     * @param message          the message to handle
     * @param sink             downstream sink; receives at most one element per call
     * @param onCursorComplete callback run when the flow reached a final state
     * @param emit             whether an ordinary (non-done) message may be emitted
     */
    private static void handleMessage(Client client, int fetchSize, Consumer<ClientMessage> requests, CursorState state, Message message, SynchronousSink<Message> sink, Runnable onCursorComplete, boolean emit) {
        // Column metadata without columns is dropped.
        if (message instanceof ColumnMetadataToken && !((ColumnMetadataToken) message).hasColumns()) {
            return;
        }

        // Info 16954 switches the state to direct mode (presumably "executing SQL directly; no cursor" - confirm).
        if (message instanceof AbstractInfoToken && ((AbstractInfoToken) message).getNumber() == 16954L) {
            state.directMode = true;
        }

        if (message instanceof DoneInProcToken) {
            DoneInProcToken doneToken = (DoneInProcToken) message;
            state.hasMore = doneToken.hasMore();
            if (state.directMode) {
                sink.next(doneToken);
            } else if (state.phase == CursorState.Phase.FETCHING && doneToken.hasCount()) {
                // In cursor mode only the counts reported while fetching are surfaced, wrapped as IntermediateCount.
                sink.next(new IntermediateCount(doneToken));
            }
            return;
        }

        if (AbstractDoneToken.isAttentionAck(message)) {
            state.update(CursorState.Phase.CLOSED);
            sink.next(message);
            return;
        }

        if (!(message instanceof DoneProcToken)) {
            if (emit) {
                sink.next(message);
            }
            return;
        }

        // DoneProcToken: never emitted itself; it only drives the state machine.
        if (state.hasSeenError) {
            state.update(CursorState.Phase.ERROR);
        }
        if (DoneProcToken.isDone(message)) {
            onDone(client, fetchSize, requests, state, onCursorComplete);
        }
    }

    /**
     * Reacts to the completion of a procedure call.
     *
     * <p>In a final state (see {@link #isFinalState}) {@code completion} is run and the phase becomes
     * {@code CLOSED}. Otherwise, in phase {@code NONE} or {@code FETCHING}, either the next fetch is
     * requested or the cursor is closed, and the {@code hasSeenRows} flag is reset.
     *
     * @param client     client providing the transaction descriptor
     * @param fetchSize  number of rows requested per fetch
     * @param requests   receiver of the follow-up request
     * @param state      mutable cursor state
     * @param completion callback run when the flow is complete
     */
    static void onDone(Client client, int fetchSize, Consumer<ClientMessage> requests, CursorState state, Runnable completion) {
        CursorState.Phase phase = state.phase;

        if (isFinalState(state)) {
            completion.run();
            state.update(CursorState.Phase.CLOSED);
            return;
        }

        if (phase != CursorState.Phase.NONE && phase != CursorState.Phase.FETCHING) {
            return;
        }

        // Fetch again if the initial call announced more data or the last round produced rows,
        // unless the subscriber has cancelled.
        boolean moreExpected = (state.hasMore && phase == CursorState.Phase.NONE) || state.hasSeenRows;
        if (moreExpected && state.wantsMore()) {
            if (phase == CursorState.Phase.NONE) {
                state.update(CursorState.Phase.FETCHING);
            }
            requests.accept(spCursorFetch(state.cursorId, FETCH_NEXT, fetchSize, client.getTransactionDescriptor()));
        } else {
            state.update(CursorState.Phase.CLOSING);
            requests.accept(spCursorClose(state.cursorId, client.getTransactionDescriptor()));
        }
        state.hasSeenRows = false;
    }

    /**
     * Creates the predicate that ends the exchange: a done {@link DoneProcToken} received while
     * {@code state} is final.
     */
    private static Predicate<Message> isFinalToken(CursorState state) {
        return message -> DoneProcToken.isDone(message) && isFinalState(state);
    }

    /**
     * @return {@code true} if no further requests will be sent: either no cursor was created
     * (cursor id {@code 0} in phase {@code NONE}/{@code FETCHING}) or the phase is
     * {@code ERROR}, {@code CLOSING} or {@code CLOSED}
     */
    private static boolean isFinalState(CursorState state) {
        CursorState.Phase phase = state.phase;
        boolean active = phase == CursorState.Phase.NONE || phase == CursorState.Phase.FETCHING;
        if (active && state.cursorId == 0) {
            return true;
        }
        return phase == CursorState.Phase.ERROR || phase == CursorState.Phase.CLOSING || phase == CursorState.Phase.CLOSED;
    }

    /**
     * Creates the RPC request (procedure id 10) that executes {@code query} directly.
     *
     * @param query                 the SQL, must not be {@code null}
     * @param binding               supplies the formal parameter declaration and the named parameters
     * @param collation             collation for the string parameters, must not be {@code null}
     * @param transactionDescriptor must not be {@code null}
     * @return the request
     */
    static RpcRequest spExecuteSql(String query, Binding binding, Collation collation, TransactionDescriptor transactionDescriptor) {
        Assert.requireNonNull(query, "Query must not be null");
        Assert.requireNonNull(collation, "Collation must not be null");
        Assert.requireNonNull(transactionDescriptor, "TransactionDescriptor must not be null");

        RpcRequest.Builder builder = RpcRequest.builder()
            .withProcId(10)
            .withTransactionDescriptor(transactionDescriptor)
            .withParameter(RpcDirection.IN, collation, query)
            .withParameter(RpcDirection.IN, collation, binding.getFormalParameters());
        binding.forEach((name, parameter) -> builder.withNamedParameter(parameter.rpcDirection, (String) name, parameter.encoded));
        return builder.build();
    }

    /**
     * Creates the RPC request (procedure id 2) that opens a forward-only, read-only cursor for
     * {@code query}. The request declares five parameters; the first output parameter (ordinal 0)
     * is read back as the cursor id.
     *
     * @param query                 the SQL, must not be {@code null}
     * @param collation             collation for the query text, must not be {@code null}
     * @param transactionDescriptor must not be {@code null}
     * @return the request
     */
    static RpcRequest spCursorOpen(String query, Collation collation, TransactionDescriptor transactionDescriptor) {
        Assert.requireNonNull(query, "Query must not be null");
        Assert.requireNonNull(collation, "Collation must not be null");
        Assert.requireNonNull(transactionDescriptor, "TransactionDescriptor must not be null");

        int resultSetScrollOpt = SCROLLOPT_FORWARD_ONLY;
        int resultSetCCOpt = CCOPT_READ_ONLY | CCOPT_ALLOW_DIRECT;

        return RpcRequest.builder()
            .withProcId(2)
            .withTransactionDescriptor(transactionDescriptor)
            .withParameter(RpcDirection.OUT, 0) // cursor id
            .withParameter(RpcDirection.IN, collation, query)
            .withParameter(RpcDirection.IN, resultSetScrollOpt)
            .withParameter(RpcDirection.IN, resultSetCCOpt)
            .withParameter(RpcDirection.OUT, 0) // presumably the row count - confirm
            .build();
    }

    /**
     * Creates the RPC request (procedure id 7) that fetches rows from an open cursor. Metadata is
     * disabled for this request via {@link #NO_METADATA}.
     *
     * @param cursor                the cursor id
     * @param fetchType             one of the {@code FETCH_*} constants
     * @param rowCount              number of rows to fetch, must be {@code >= 0}
     * @param transactionDescriptor must not be {@code null}
     * @return the request
     */
    static RpcRequest spCursorFetch(int cursor, int fetchType, int rowCount, TransactionDescriptor transactionDescriptor) {
        Assert.isTrue(rowCount >= 0, "Row count must be greater or equal to zero");
        Assert.requireNonNull(transactionDescriptor, "TransactionDescriptor must not be null");

        return RpcRequest.builder()
            .withProcId(7)
            .withTransactionDescriptor(transactionDescriptor)
            .withOptionFlags(NO_METADATA)
            .withParameter(RpcDirection.IN, cursor)
            .withParameter(RpcDirection.IN, fetchType)
            .withParameter(RpcDirection.IN, 0) // presumably the row number argument - confirm
            .withParameter(RpcDirection.IN, rowCount)
            .build();
    }

    /**
     * Creates the RPC request (procedure id 9) that closes the given cursor.
     *
     * @param cursor                the cursor id
     * @param transactionDescriptor must not be {@code null}
     * @return the request
     */
    static RpcRequest spCursorClose(int cursor, TransactionDescriptor transactionDescriptor) {
        Assert.requireNonNull(transactionDescriptor, "TransactionDescriptor must not be null");

        return RpcRequest.builder()
            .withProcId(9)
            .withTransactionDescriptor(transactionDescriptor)
            .withParameter(RpcDirection.IN, cursor)
            .build();
    }

    /**
     * Creates the RPC request (procedure id 5) that prepares {@code query} and executes it through a
     * forward-only, read-only cursor. The request declares seven parameters of its own followed by
     * the named parameters of {@code binding}; ordinal 0 is read back as the statement handle and
     * ordinal 1 as the cursor id.
     *
     * @param preparedStatementHandle initial value of the handle output parameter
     * @param query                   the SQL
     * @param binding                 supplies the formal parameter declaration and the named parameters
     * @param collation               collation for the string parameters
     * @param transactionDescriptor   the transaction descriptor
     * @return the request
     */
    static RpcRequest spCursorPrepExec(int preparedStatementHandle, String query, Binding binding, Collation collation, TransactionDescriptor transactionDescriptor) {
        // The parameterized-statement flag is only added when there are bound parameters.
        int resultSetScrollOpt = SCROLLOPT_FORWARD_ONLY | (binding.isEmpty() ? 0 : SCROLLOPT_PARAMETERIZED_STMT);
        int resultSetCCOpt = CCOPT_READ_ONLY | CCOPT_ALLOW_DIRECT;

        RpcRequest.Builder builder = RpcRequest.builder()
            .withProcId(5)
            .withTransactionDescriptor(transactionDescriptor)
            .withParameter(RpcDirection.OUT, preparedStatementHandle) // statement handle
            .withParameter(RpcDirection.OUT, 0) // cursor id
            .withParameter(RpcDirection.IN, collation, binding.getFormalParameters())
            .withParameter(RpcDirection.IN, collation, query)
            .withParameter(RpcDirection.IN, resultSetScrollOpt)
            .withParameter(RpcDirection.IN, resultSetCCOpt)
            .withParameter(RpcDirection.OUT, 0); // presumably the row count - confirm
        binding.forEach((name, parameter) -> builder.withNamedParameter(parameter.rpcDirection, (String) name, parameter.encoded));
        return builder.build();
    }

    /**
     * Creates the RPC request (procedure id 4) that executes an already prepared statement through a
     * forward-only, read-only cursor. The request declares five parameters of its own followed by
     * the named parameters of {@code binding}; ordinal 1 is read back as the cursor id.
     *
     * @param preparedStatementHandle handle of the prepared statement, must not be {@code 0}
     * @param binding                 supplies the named parameters
     * @param transactionDescriptor   the transaction descriptor
     * @return the request
     */
    static RpcRequest spCursorExec(int preparedStatementHandle, Binding binding, TransactionDescriptor transactionDescriptor) {
        Assert.isTrue(preparedStatementHandle != 0, "Invalid PreparedStatement handle");

        int resultSetScrollOpt = SCROLLOPT_FORWARD_ONLY;
        int resultSetCCOpt = CCOPT_READ_ONLY | CCOPT_ALLOW_DIRECT;

        RpcRequest.Builder builder = RpcRequest.builder()
            .withProcId(4)
            .withTransactionDescriptor(transactionDescriptor)
            .withParameter(RpcDirection.IN, preparedStatementHandle)
            .withParameter(RpcDirection.OUT, 0) // cursor id
            .withParameter(RpcDirection.IN, resultSetScrollOpt)
            .withParameter(RpcDirection.IN, resultSetCCOpt)
            .withParameter(RpcDirection.OUT, 0); // presumably the row count - confirm
        binding.forEach((name, parameter) -> builder.withNamedParameter(parameter.rpcDirection, (String) name, parameter.encoded));
        return builder.build();
    }

    /**
     * Mutable state of a single exchange: the cursor id, flags derived from received messages and
     * the current {@link Phase}.
     */
    static class CursorState {

        /** Cursor id decoded from a return value; {@code 0} until one was received. */
        volatile int cursorId;

        /** {@code hasMore} flag of the most recent {@link DoneInProcToken}. */
        volatile boolean hasMore;

        /** Set when a {@link RowToken} was seen; reset by {@link RpcQueryMessageFlow#onDone}. */
        volatile boolean hasSeenRows;

        /** Set when an {@link ErrorToken} was seen; cleared when entering {@link Phase#PREPARE_RETRY}. */
        volatile boolean hasSeenError;

        /** When set, {@link DoneInProcToken}s are emitted as-is instead of being filtered or wrapped. */
        volatile boolean directMode;

        /** Set by {@link #cancel()}; stops further fetch requests. */
        volatile boolean cancelRequested;

        /** The most recent {@link ErrorToken}; cleared when entering {@link Phase#PREPARE_RETRY}. */
        volatile ErrorToken errorToken;

        Phase phase = Phase.NONE;

        CursorState() {
        }

        /**
         * @return {@code true} as long as cancellation was not requested
         */
        boolean wantsMore() {
            return !this.cancelRequested;
        }

        /** Records that cancellation was requested. */
        void cancel() {
            this.cancelRequested = true;
        }

        /**
         * Updates the row/error flags from a received message.
         *
         * @param it the received message
         */
        void update(Message it) {
            if (it instanceof RowToken) {
                this.hasSeenRows = true;
            }
            if (it instanceof ErrorToken) {
                this.errorToken = (ErrorToken) it;
                this.hasSeenError = true;
            }
        }

        /**
         * Switches to {@code newPhase}. Entering {@link Phase#PREPARE_RETRY} additionally forgets any
         * error recorded so far.
         *
         * @param newPhase the phase to enter
         */
        public void update(Phase newPhase) {
            this.phase = newPhase;
            if (newPhase == Phase.PREPARE_RETRY) {
                this.errorToken = null;
                this.hasSeenError = false;
            }
        }

        /** Phases of an exchange. */
        enum Phase {
            NONE,
            FETCHING,
            PREPARE_RETRY,
            CLOSING,
            CLOSED,
            ERROR
        }
    }

    /**
     * One-shot completion signal: the first {@link #run()} completes the publisher returned by
     * {@link #takeUntil()}; later invocations have no effect.
     */
    static class OnCursorComplete implements Runnable {

        private static final int STATE_ACTIVE = 0;

        private static final int STATE_CANCELLED = 1;

        private static final AtomicIntegerFieldUpdater<OnCursorComplete> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(OnCursorComplete.class, "state");

        private final Sinks.Empty<Void> trigger = Sinks.empty();

        // Accessed through STATE_UPDATER.
        private volatile int state = STATE_ACTIVE;

        OnCursorComplete() {
        }

        @Override
        public void run() {
            // The compare-and-set guarantees the trigger is emitted at most once.
            if (STATE_UPDATER.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) {
                this.trigger.tryEmitEmpty();
            }
        }

        /**
         * @return a publisher that completes once {@link #run()} was invoked
         */
        public Publisher<Void> takeUntil() {
            return this.trigger.asMono();
        }
    }

    /**
     * Done token created from a {@link DoneInProcToken} that carries a count while the cursor is
     * being fetched; copies type, status, current command and row count from the original token.
     */
    static class IntermediateCount extends AbstractDoneToken {

        /**
         * @param token the token whose values are copied
         */
        public IntermediateCount(DoneInProcToken token) {
            super(token.getType(), token.getStatus(), token.getCurrentCommand(), token.getRowCount());
        }

        @Override
        public String getName() {
            return "INTERMEDIATE_COUNT";
        }
    }
}

