/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.InsertSyntheticRow;
import dev.miku.r2dbc.mysql.MySqlRow;
import dev.miku.r2dbc.mysql.MySqlRowMetadata;
import dev.miku.r2dbc.mysql.codec.Codecs;
import dev.miku.r2dbc.mysql.message.FieldValue;
import dev.miku.r2dbc.mysql.message.server.DefinitionMetadataMessage;
import dev.miku.r2dbc.mysql.message.server.EofMessage;
import dev.miku.r2dbc.mysql.message.server.OkMessage;
import dev.miku.r2dbc.mysql.message.server.RowMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import dev.miku.r2dbc.mysql.util.AssertUtils;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import dev.miku.r2dbc.mysql.util.OperatorUtils;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SynchronousSink;
import reactor.util.annotation.Nullable;

public final class MySqlResult
implements Result {
    private static final Function<OkMessage, Integer> ROWS_UPDATED = message -> (int)message.getAffectedRows();
    private static final Consumer<ReferenceCounted> RELEASE = ReferenceCounted::release;
    private final boolean isBinary;
    private final Codecs codecs;
    private final ConnectionContext context;
    @Nullable
    private final String generatedKeyName;
    private final AtomicReference<Flux<ServerMessage>> messages;
    private final MonoProcessor<OkMessage> okProcessor = MonoProcessor.create();
    private MySqlRowMetadata rowMetadata;

    MySqlResult(boolean isBinary, Codecs codecs, ConnectionContext context, @Nullable String generatedKeyName, Flux<ServerMessage> messages) {
        this.isBinary = isBinary;
        this.codecs = AssertUtils.requireNonNull(codecs, "codecs must not be null");
        this.context = AssertUtils.requireNonNull(context, "context must not be null");
        this.generatedKeyName = generatedKeyName;
        this.messages = new AtomicReference<Flux<ServerMessage>>(AssertUtils.requireNonNull(messages, "messages must not be null"));
    }

    public Mono<Integer> getRowsUpdated() {
        return this.affects().map(ROWS_UPDATED);
    }

    public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
        AssertUtils.requireNonNull(f, "mapping function must not be null");
        if (this.generatedKeyName == null) {
            return this.results().handle((message, sink) -> this.handleResult((ServerMessage)message, (SynchronousSink)sink, f));
        }
        return this.affects().map(message -> {
            InsertSyntheticRow row = new InsertSyntheticRow(this.codecs, this.generatedKeyName, message.getLastInsertId());
            return f.apply(row, row);
        });
    }

    private Mono<OkMessage> affects() {
        return this.okProcessor.doOnSubscribe(s -> {
            Flux messages = this.messages.getAndSet(null);
            if (messages == null) {
                return;
            }
            messages.subscribe(message -> {
                if (message instanceof OkMessage) {
                    this.okProcessor.onNext((Object)((OkMessage)message));
                } else if (message instanceof EofMessage) {
                    this.okProcessor.onComplete();
                } else {
                    ReferenceCountUtil.safeRelease((Object)message);
                }
            }, arg_0 -> this.okProcessor.onError(arg_0), () -> this.okProcessor.onComplete());
        });
    }

    private Flux<ServerMessage> results() {
        return Flux.defer(() -> {
            Flux messages = this.messages.getAndSet(null);
            if (messages == null) {
                return Flux.error((Throwable)new IllegalStateException("Source has been released"));
            }
            this.okProcessor.onComplete();
            return OperatorUtils.discardOnCancel(messages).doOnDiscard(ReferenceCounted.class, RELEASE);
        });
    }

    private <T> void handleResult(ServerMessage message, SynchronousSink<T> sink, BiFunction<Row, RowMetadata, ? extends T> f) {
        if (message instanceof SyntheticMetadataMessage) {
            DefinitionMetadataMessage[] metadataMessages = ((SyntheticMetadataMessage)message).unwrap();
            if (metadataMessages.length == 0) {
                return;
            }
            this.rowMetadata = MySqlRowMetadata.create(metadataMessages);
        } else if (message instanceof RowMessage) {
            this.processRow((RowMessage)message, sink, f);
        } else {
            ReferenceCountUtil.safeRelease((Object)message);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void processRow(RowMessage message, SynchronousSink<T> sink, BiFunction<Row, RowMetadata, ? extends T> f) {
        T t;
        FieldValue[] fields;
        MySqlRowMetadata rowMetadata = this.rowMetadata;
        if (rowMetadata == null) {
            ReferenceCountUtil.safeRelease((Object)message);
            sink.error((Throwable)new IllegalStateException("No MySqlRowMetadata available"));
            return;
        }
        try {
            fields = message.decode(this.isBinary, rowMetadata.unwrap());
        }
        finally {
            ReferenceCountUtil.safeRelease((Object)message);
        }
        try {
            t = f.apply(new MySqlRow(fields, rowMetadata, this.codecs, this.isBinary, this.context), rowMetadata);
        }
        finally {
            for (FieldValue field : fields) {
                ReferenceCountUtil.safeRelease((Object)field);
            }
        }
        sink.next(t);
    }
}

