/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.Binding;
import dev.miku.r2dbc.mysql.ExceptionFactory;
import dev.miku.r2dbc.mysql.PreparedIdentifier;
import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.message.client.ExchangeableMessage;
import dev.miku.r2dbc.mysql.message.client.PrepareQueryMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedCloseMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedExecuteMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedFetchMessage;
import dev.miku.r2dbc.mysql.message.client.SimpleQueryMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.PreparedOkMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.ServerStatusMessage;
import dev.miku.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import dev.miku.r2dbc.mysql.util.InternalArrays;
import dev.miku.r2dbc.mysql.util.OperatorUtils;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

final class QueryFlow {
    /** Window boundary used with {@code windowUntil}: true for every {@link CompleteMessage}. */
    private static final Predicate<ServerMessage> RESULT_DONE = CompleteMessage.class::isInstance;

    /**
     * Completion test for the prepare exchange: true for an {@link ErrorMessage}, or for a
     * {@link SyntheticMetadataMessage} that reports itself as completed.
     */
    private static final Predicate<ServerMessage> PREPARE_DONE = message -> {
        if (message instanceof ErrorMessage) {
            return true;
        }
        return message instanceof SyntheticMetadataMessage && ((SyntheticMetadataMessage) message).isCompleted();
    };

    /**
     * Completion test for query/fetch exchanges: true for an {@link ErrorMessage}, or for a
     * {@link CompleteMessage} whose {@code isDone()} returns true.
     */
    private static final Predicate<ServerMessage> FETCH_DONE = message -> {
        if (message instanceof ErrorMessage) {
            return true;
        }
        return message instanceof CompleteMessage && ((CompleteMessage) message).isDone();
    };

    /**
     * Completion test that accepts everything {@link #FETCH_DONE} accepts and additionally any
     * {@link SyntheticMetadataMessage}.
     */
    private static final Predicate<ServerMessage> METADATA_DONE =
        message -> message instanceof SyntheticMetadataMessage || FETCH_DONE.test(message);

    /** Releases a discarded reference-counted message; the boolean result of {@code release()} is ignored. */
    private static final Consumer<ReferenceCounted> RELEASE = counted -> counted.release();

    /** Releases any object through {@link ReferenceCountUtil#safeRelease(Object)}. */
    private static final Consumer<Object> SAFE_RELEASE = message -> ReferenceCountUtil.safeRelease(message);

    /**
     * Sends a {@link PrepareQueryMessage} for {@code sql} and resolves the statement handle
     * announced by the server.
     *
     * <p>The response stream is read until {@link #PREPARE_DONE} matches. A
     * {@link PreparedOkMessage} that gets discarded (for example on cancellation) triggers a
     * close request for its statement id so the server-side statement is not left open.
     *
     * @param client the client used for the exchange
     * @param sql    the statement text; also passed to the exception factory on error
     * @return a {@link Mono} emitting the {@link PreparedIdentifier}; it fails with the exception
     *         built from an {@link ErrorMessage}, or with the error raised by {@code last()} when
     *         the exchange completes without any identifier
     */
    static Mono<PreparedIdentifier> prepare(Client client, String sql) {
        return OperatorUtils.discardOnCancel(client.exchange(new PrepareQueryMessage(sql), PREPARE_DONE))
            .doOnDiscard(PreparedOkMessage.class, prepared -> QueryFlow.close(client, prepared.getStatementId()).subscribe())
            // The explicit type witness is required: without it the sink is inferred as
            // SynchronousSink<Object> and the chain no longer yields Mono<PreparedIdentifier>.
            .<PreparedIdentifier>handle((message, sink) -> {
                if (message instanceof ErrorMessage) {
                    sink.error(ExceptionFactory.createException((ErrorMessage) message, sql));
                } else if (message instanceof SyntheticMetadataMessage) {
                    // Only the completed metadata marker ends the stream; earlier ones are skipped.
                    if (((SyntheticMetadataMessage) message).isCompleted()) {
                        sink.complete();
                    }
                } else if (message instanceof PreparedOkMessage) {
                    sink.next(new PreparedIdentifier(client, ((PreparedOkMessage) message).getStatementId()));
                } else {
                    // Anything else is not needed here; drop its reference.
                    ReferenceCountUtil.release(message);
                }
            })
            // Take the last element so the result is only delivered once the exchange completed.
            .last();
    }

    /**
     * Executes a prepared statement once per binding and groups the server messages into one
     * inner {@link Flux} per result (split with {@link #RESULT_DONE}).
     *
     * <p>With no bindings the result is empty; with exactly one binding the execution is
     * deferred until subscription. With several bindings a processor is seeded with the first
     * binding and, each time one execution completes, fed the next binding, so executions run
     * strictly one after another. Remaining bindings are cleared via
     * {@link Binding#clearSubsequent} when the statement is found closed, on cancellation and
     * on error.
     *
     * <p>NOTE(review): the multi-binding path consumes the first binding while this method is
     * called, not at subscription time, so the returned {@link Flux} is presumably meant to be
     * subscribed exactly once; confirm against callers.
     *
     * @param client       the client used for every exchange
     * @param context      the connection context, forwarded to the cursor fetch step
     * @param sql          the statement text, used when building exceptions
     * @param identifier   the prepared statement handle
     * @param deprecateEof selects the completion predicate for cursor execution
     * @param fetchSize    cursor fetch size; values above zero enable cursor fetching
     * @param bindings     the parameter bindings, one execution each
     * @return the server messages, windowed per result
     */
    static Flux<Flux<ServerMessage>> execute(Client client, ConnectionContext context, String sql, PreparedIdentifier identifier, boolean deprecateEof, int fetchSize, List<Binding> bindings) {
        switch (bindings.size()) {
            case 0:
                return Flux.empty();
            case 1:
                return Flux.defer(() -> QueryFlow.execute0(client, context, sql, identifier, deprecateEof, bindings.get(0), fetchSize).windowUntil(RESULT_DONE));
            default:
                break;
        }

        Iterator<Binding> iterator = bindings.iterator();
        // Typed processor: a raw one would turn the concatMap lambda parameter into Object.
        EmitterProcessor<Binding> processor = EmitterProcessor.create(1, true);
        Runnable complete = () -> {
            if (processor.isCancelled() || processor.isTerminated()) {
                return;
            }
            try {
                if (!iterator.hasNext()) {
                    processor.onComplete();
                } else if (identifier.isClosed()) {
                    // The statement is gone: drop what is left instead of executing it.
                    Binding.clearSubsequent(iterator);
                    processor.onComplete();
                } else {
                    processor.onNext(iterator.next());
                }
            } catch (Throwable e) {
                processor.onError(e);
            }
        };

        // Seed the processor; every completed execution pulls the next binding via `complete`.
        processor.onNext(iterator.next());

        return processor
            .concatMap(it -> QueryFlow.execute0(client, context, sql, identifier, deprecateEof, it, fetchSize).doOnComplete(complete))
            .doOnCancel(() -> Binding.clearSubsequent(iterator))
            .doOnError(ignored -> Binding.clearSubsequent(iterator))
            .windowUntil(RESULT_DONE);
    }

    /**
     * Builds a {@link PreparedCloseMessage} for the given statement id and hands it to
     * {@code client.sendOnly}.
     *
     * @param client      the client that sends the request
     * @param statementId the id of the prepared statement to close
     * @return the {@link Mono} returned by {@code sendOnly}
     */
    static Mono<Void> close(Client client, int statementId) {
        PreparedCloseMessage request = new PreparedCloseMessage(statementId);
        return client.sendOnly(request);
    }

    /**
     * Runs a single text statement and discards its response, releasing every received message.
     * The exchange is created lazily, once per subscription.
     *
     * @param client the client used for the exchange
     * @param sql    the statement to run
     * @return a {@link Mono} that completes when the response stream ends, or fails with the
     *         error produced by the underlying exchange
     */
    static Mono<Void> executeVoid(Client client, String sql) {
        return Mono.defer(() -> {
            Flux<ServerMessage> response = QueryFlow.execute0(client, sql);
            return response.doOnNext(SAFE_RELEASE).then();
        });
    }

    /**
     * Runs several text statements one after another and discards their responses, releasing
     * every received message.
     *
     * <p>The statement pipeline is built inside {@link Mono#defer}, matching the other public
     * entry points of this class. The self-feeding processor behind {@code selfEmitter} is
     * seeded when it is created, so building it eagerly would tie the returned {@link Mono} to a
     * single subscription; deferring gives every subscription its own pipeline.
     *
     * @param client     the client used for the exchanges
     * @param statements the statements to run in order; may be empty
     * @return a {@link Mono} that completes after the last statement finished, or fails with the
     *         first error produced by an exchange
     */
    static Mono<Void> executeVoid(Client client, String ... statements) {
        // Wrap the array eagerly so argument problems still surface at call time.
        List<String> queries = InternalArrays.asReadOnlyList(statements);
        return Mono.defer(() -> QueryFlow.selfEmitter(queries, client).doOnNext(SAFE_RELEASE).then());
    }

    /**
     * Runs a single text statement and groups the server messages into one inner {@link Flux}
     * per result, split with {@link #RESULT_DONE}. The exchange is created lazily, once per
     * subscription.
     *
     * @param client the client used for the exchange
     * @param sql    the statement to run
     * @return the server messages, windowed per result
     */
    static Flux<Flux<ServerMessage>> execute(Client client, String sql) {
        return Flux.defer(() -> {
            Flux<ServerMessage> response = QueryFlow.execute0(client, sql);
            return response.windowUntil(RESULT_DONE);
        });
    }

    /**
     * Runs a list of text statements in order and groups the server messages into one inner
     * {@link Flux} per result, split with {@link #RESULT_DONE}. Everything is set up lazily,
     * once per subscription.
     *
     * @param client     the client used for the exchanges
     * @param statements the statements to run; an empty list yields an empty {@link Flux}
     * @return the server messages, windowed per result
     */
    static Flux<Flux<ServerMessage>> execute(Client client, List<String> statements) {
        return Flux.defer(() -> {
            int count = statements.size();
            if (count == 0) {
                return Flux.empty();
            }

            // A single statement needs no chaining; several are run through the self-feeding emitter.
            Flux<ServerMessage> response;
            if (count == 1) {
                response = QueryFlow.execute0(client, statements.get(0));
            } else {
                response = QueryFlow.selfEmitter(statements, client);
            }
            return response.windowUntil(RESULT_DONE);
        });
    }

    /**
     * Sends {@code sql} as a {@link SimpleQueryMessage} and returns the response messages up to
     * the one matching {@link #FETCH_DONE}.
     *
     * <p>Reference-counted messages that get discarded are released, and an {@link ErrorMessage}
     * is turned into an error signal by {@link Handler}.
     *
     * @param client the client used for the exchange
     * @param sql    the statement to run; also used when building exceptions
     * @return the server messages of this exchange
     */
    private static Flux<ServerMessage> execute0(Client client, String sql) {
        // Handler is passed with its real generic type; a raw BiConsumer cast would erase the
        // result of handle(...) to a raw Flux.
        return OperatorUtils.discardOnCancel(client.exchange(new SimpleQueryMessage(sql), FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .handle(new Handler(sql));
    }

    /**
     * Executes a prepared statement for one binding.
     *
     * <p>When {@code fetchSize} is positive, the execute request is built with
     * {@code binding.toMessage(id, false)}, only the first response message is forwarded (see
     * {@link TakeOne}) and the rows are then pulled by {@code fetch}. Otherwise the request is
     * built with {@code binding.toMessage(id, true)} and the whole response up to
     * {@link #FETCH_DONE} is returned.
     * NOTE(review): the boolean passed to {@code toMessage} presumably selects immediate versus
     * cursor execution; confirm in {@code Binding}.
     *
     * @param client       the client used for the exchange
     * @param context      the connection context, forwarded to the fetch step
     * @param sql          the statement text, used when building exceptions
     * @param identifier   the prepared statement handle
     * @param deprecateEof when cursor fetching, chooses {@link #FETCH_DONE} (true) or
     *                     {@link #METADATA_DONE} (false) as the completion predicate
     * @param binding      the parameter values for this execution
     * @param fetchSize    cursor fetch size; values above zero enable cursor fetching
     * @return the server messages of this execution
     */
    private static Flux<ServerMessage> execute0(Client client, ConnectionContext context, String sql, PreparedIdentifier identifier, boolean deprecateEof, Binding binding, int fetchSize) {
        if (fetchSize > 0) {
            int statementId = identifier.getId();
            PreparedExecuteMessage cursor = binding.toMessage(statementId, false);
            Predicate<ServerMessage> done = deprecateEof ? FETCH_DONE : METADATA_DONE;

            return OperatorUtils.discardOnCancel(client.exchange(cursor, done))
                .doOnDiscard(ReferenceCounted.class, RELEASE)
                .handle(new TakeOne(sql))
                // The fetch phase is deferred so the server status is inspected only after the
                // execute response has been consumed.
                .concatWith(Flux.defer(() -> QueryFlow.fetch(client, context, identifier, new PreparedFetchMessage(statementId, fetchSize), sql)));
        }

        return OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(identifier.getId(), true), FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .handle(new Handler(sql));
    }

    /** MySQL server status flag {@code SERVER_STATUS_CURSOR_EXISTS}. */
    private static final int SERVER_STATUS_CURSOR_EXISTS = 0x40;

    /** MySQL server status flag {@code SERVER_STATUS_LAST_ROW_SENT}. */
    private static final int SERVER_STATUS_LAST_ROW_SENT = 0x80;

    /**
     * Repeatedly sends the given fetch request until the server reports that the last row was
     * sent.
     *
     * <p>Nothing is sent when the connection's server status lacks the cursor-exists flag.
     * Otherwise a processor is seeded with {@code fetch}; each response is handled as follows:
     * <ul>
     *   <li>an {@link ErrorMessage} becomes an error signal and stops further fetching;</li>
     *   <li>a {@link ServerStatusMessage} without the last-row-sent flag is not forwarded and
     *       instead enqueues the same fetch request again, unless the processor is already
     *       cancelled/terminated or the statement has been closed;</li>
     *   <li>a {@link ServerStatusMessage} with the last-row-sent flag is forwarded and stops
     *       further fetching;</li>
     *   <li>every other message is forwarded unchanged.</li>
     * </ul>
     *
     * @param client     the client used for the exchanges
     * @param context    the connection context whose server status is inspected
     * @param identifier the prepared statement handle, checked for closure between fetches
     * @param fetch      the fetch request, reused for every round
     * @param sql        the statement text, used when building exceptions
     * @return the fetched server messages
     */
    private static Flux<ServerMessage> fetch(Client client, ConnectionContext context, PreparedIdentifier identifier, PreparedFetchMessage fetch, String sql) {
        if ((context.getServerStatuses() & SERVER_STATUS_CURSOR_EXISTS) == 0) {
            return Flux.empty();
        }

        EmitterProcessor<ExchangeableMessage> processor = EmitterProcessor.create(1, false);
        processor.onNext(fetch);

        return processor.concatMap(request -> OperatorUtils.discardOnCancel(client.exchange(request, FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .<ServerMessage>handle((message, sink) -> {
                if (message instanceof ErrorMessage) {
                    sink.error(ExceptionFactory.createException((ErrorMessage) message, sql));
                    processor.onComplete();
                } else if (message instanceof ServerStatusMessage) {
                    short statuses = ((ServerStatusMessage) message).getServerStatuses();
                    if ((statuses & SERVER_STATUS_LAST_ROW_SENT) == 0) {
                        // More rows remain: swallow this status and request another round.
                        if (processor.isCancelled() || processor.isTerminated()) {
                            return;
                        }
                        if (identifier.isClosed()) {
                            processor.onComplete();
                        } else {
                            processor.onNext(fetch);
                        }
                    } else {
                        sink.next(message);
                        processor.onComplete();
                    }
                } else {
                    sink.next(message);
                }
            }));
    }

    /**
     * Runs the given statements strictly one after another and concatenates their responses.
     *
     * <p>A processor is seeded with the first statement; whenever the exchange of one statement
     * completes, {@link OperatorUtils#emitIterator} is invoked with the processor and the
     * remaining statements to continue the sequence.
     *
     * <p>NOTE(review): the first statement is taken from the iterator when this method is
     * called, not at subscription time, so callers should build the pipeline per subscription
     * (for example inside {@code defer}).
     *
     * @param statements the statements to run in order; an empty list yields an empty {@link Flux}
     * @param client     the client used for the exchanges
     * @return the concatenated server messages of all statements
     */
    private static Flux<ServerMessage> selfEmitter(List<String> statements, Client client) {
        if (statements.isEmpty()) {
            return Flux.empty();
        }

        Iterator<String> iter = statements.iterator();
        // Typed processor: a raw one would turn the concatMap lambda parameter into Object,
        // which cannot be passed to execute0(Client, String).
        EmitterProcessor<String> processor = EmitterProcessor.create(1, true);
        Runnable complete = () -> OperatorUtils.emitIterator(processor, iter);

        processor.onNext(iter.next());

        return processor.concatMap(it -> QueryFlow.execute0(client, it).doOnComplete(complete));
    }

    /** Not instantiable: this class only exposes static methods. */
    private QueryFlow() {
    }

    private static final class Handler
    implements BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> {
        private final String sql;

        private Handler(String sql) {
            this.sql = sql;
        }

        @Override
        public void accept(ServerMessage message, SynchronousSink<ServerMessage> sink) {
            if (message instanceof ErrorMessage) {
                sink.error((Throwable)ExceptionFactory.createException((ErrorMessage)message, this.sql));
            } else {
                sink.next((Object)message);
            }
        }
    }

    private static final class TakeOne
    implements BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> {
        private final String sql;
        private boolean next;

        private TakeOne(String sql) {
            this.sql = sql;
        }

        @Override
        public void accept(ServerMessage message, SynchronousSink<ServerMessage> sink) {
            if (this.next) {
                return;
            }
            this.next = true;
            if (message instanceof ErrorMessage) {
                sink.error((Throwable)ExceptionFactory.createException((ErrorMessage)message, this.sql));
            } else {
                sink.next((Object)message);
            }
        }
    }
}

