/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.data.connection.support;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.propagation.ReactorPropagation;
import io.micronaut.core.propagation.PropagatedContextElement;
import io.micronaut.data.connection.ConnectionDefinition;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.data.connection.exceptions.NoConnectionException;
import io.micronaut.data.connection.reactive.ReactiveStreamsConnectionOperations;
import io.micronaut.data.connection.reactive.ReactorConnectionOperations;
import io.micronaut.data.connection.support.DefaultConnectionStatus;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * Reactor-based skeleton of {@link ReactorConnectionOperations}.
 *
 * <p>The class decides, according to the propagation behavior of a {@link ConnectionDefinition},
 * whether a callback runs against a connection already registered in the Reactor context or
 * against a connection obtained from {@link #openConnection(ConnectionDefinition)}. A connection
 * obtained here is registered in the Reactor context of the callback's publisher so that nested
 * calls on the same operations instance can find it again.</p>
 *
 * @param <C> The connection type
 */
@Internal
public abstract class AbstractReactorConnectionOperations<C>
implements ReactorConnectionOperations<C> {

    /**
     * Supplies the connection used when a callback cannot (or must not) reuse an existing one.
     * The returned publisher is used as the resource supplier of {@code usingWhen}.
     *
     * @param definition The connection definition
     * @return A publisher emitting the connection
     */
    @NonNull
    protected abstract Publisher<C> openConnection(@NonNull ConnectionDefinition definition);

    /**
     * Releases a connection previously obtained from {@link #openConnection(ConnectionDefinition)}.
     * The returned publisher is used as the asynchronous cleanup of {@code usingWhen}.
     *
     * @param connection The connection to release
     * @param definition The connection definition the connection was obtained for
     * @return A publisher signalling the end of the cleanup
     */
    @NonNull
    protected abstract Publisher<Void> closeConnection(@NonNull C connection, @NonNull ConnectionDefinition definition);

    /**
     * Looks up the connection status registered in the given Reactor context by this instance.
     *
     * @param contextView The Reactor context view
     * @return The status, or empty when this instance registered nothing in the context
     */
    @Override
    public final Optional<ConnectionStatus<C>> findConnectionStatus(@NonNull ContextView contextView) {
        return this.findPropagateContextElement(contextView).map(ClientSessionPropagatedContext::status);
    }

    /**
     * Finds the first propagated element in the context that was registered by this instance.
     *
     * @param contextView The Reactor context view
     * @return The matching element, if any
     */
    @SuppressWarnings("unchecked")
    private Optional<ClientSessionPropagatedContext<C>> findPropagateContextElement(ContextView contextView) {
        return ReactorPropagation.findAllContextElements(contextView, ClientSessionPropagatedContext.class)
            // Identity comparison: only elements registered by this very instance qualify.
            .filter(element -> element.connectionOperations() == this)
            // Safe cast: elements owned by this instance are only ever created in
            // addClientSession, which stores a ConnectionStatus<C>.
            .map(element -> (ClientSessionPropagatedContext<C>) element)
            .findFirst();
    }

    /**
     * Runs the callback with a connection and returns its result as a {@link Flux}.
     * The connection lookup and the propagation decision are deferred until subscription.
     *
     * @param definition The connection definition
     * @param callback   The function producing the result from the connection status
     * @param <T>        The result type
     * @return The flux produced by the callback; it signals a {@link NoConnectionException} when
     * the propagation is {@code MANDATORY} and no connection is registered in the context
     * @throws NullPointerException if {@code callback} is {@code null}
     */
    @Override
    @NonNull
    public <T> Flux<T> withConnectionFlux(@NonNull ConnectionDefinition definition, @NonNull Function<ConnectionStatus<C>, Flux<T>> callback) {
        Objects.requireNonNull(callback, "Callback cannot be null");
        return Flux.deferContextual(contextView -> {
            C existing = this.findConnection(contextView);
            if (this.shouldReuseConnection(definition, existing)) {
                return this.existingConnectionFlux(definition, callback, existing);
            }
            return this.openConnectionFlux(definition, callback);
        });
    }

    /**
     * Applies the callback to a connection that is already registered in the context.
     * The status is created with its boolean flag set to {@code false}
     * (presumably "not newly opened" - confirm against {@code DefaultConnectionStatus}).
     */
    private <T> Flux<T> existingConnectionFlux(ConnectionDefinition definition, Function<ConnectionStatus<C>, Flux<T>> callback, C clientSession) {
        return callback.apply(new DefaultConnectionStatus<>(clientSession, definition, false));
    }

    /**
     * Obtains a connection via {@link #openConnection(ConnectionDefinition)}, applies the callback
     * to it, registers the status in the callback's Reactor context, and hands the connection to
     * {@link #closeConnection(Object, ConnectionDefinition)} as the {@code usingWhen} cleanup.
     */
    private <T> Flux<T> openConnectionFlux(ConnectionDefinition definition, Function<ConnectionStatus<C>, Flux<T>> callback) {
        return Flux.usingWhen(
            this.openConnection(definition),
            connection -> {
                ConnectionStatus<C> status = new DefaultConnectionStatus<>(connection, definition, true);
                return callback.apply(status).contextWrite(context -> this.addClientSession(context, status));
            },
            connection -> this.closeConnection(connection, definition)
        );
    }

    /**
     * Runs the callback with a connection and returns its result as a {@link Mono}.
     * The connection lookup and the propagation decision are deferred until subscription.
     *
     * @param definition The connection definition
     * @param callback   The function producing the result from the connection status
     * @param <T>        The result type
     * @return The mono produced by the callback; it signals a {@link NoConnectionException} when
     * the propagation is {@code MANDATORY} and no connection is registered in the context
     * @throws NullPointerException if {@code callback} is {@code null}
     */
    @Override
    @NonNull
    public <T> Mono<T> withConnectionMono(@NonNull ConnectionDefinition definition, @NonNull Function<ConnectionStatus<C>, Mono<T>> callback) {
        Objects.requireNonNull(callback, "Callback cannot be null");
        return Mono.deferContextual(contextView -> {
            C existing = this.findConnection(contextView);
            if (this.shouldReuseConnection(definition, existing)) {
                return this.existingConnectionMono(definition, callback, existing);
            }
            return this.openConnectionMono(definition, callback);
        });
    }

    /**
     * Mono counterpart of {@link #existingConnectionFlux(ConnectionDefinition, Function, Object)}.
     */
    private <T> Mono<T> existingConnectionMono(ConnectionDefinition definition, Function<ConnectionStatus<C>, Mono<T>> callback, C clientSession) {
        return callback.apply(new DefaultConnectionStatus<>(clientSession, definition, false));
    }

    /**
     * Mono counterpart of {@link #openConnectionFlux(ConnectionDefinition, Function)}.
     */
    private <T> Mono<T> openConnectionMono(ConnectionDefinition definition, Function<ConnectionStatus<C>, Mono<T>> callback) {
        return Mono.usingWhen(
            this.openConnection(definition),
            connection -> {
                ConnectionStatus<C> status = new DefaultConnectionStatus<>(connection, definition, true);
                return callback.apply(status).contextWrite(context -> this.addClientSession(context, status));
            },
            connection -> this.closeConnection(connection, definition)
        );
    }

    /**
     * Single place for the propagation decision shared by the Flux and Mono entry points.
     *
     * <p>Both switches are intentionally written without a {@code default} branch: they are
     * exhaustive over the propagation constants, so adding a constant becomes a compile error
     * here instead of a runtime failure.</p>
     *
     * @param definition The connection definition carrying the propagation behavior
     * @param existing   The connection found in the context, or {@code null} if there is none
     * @return {@code true} to run against {@code existing}, {@code false} to obtain a new connection
     * @throws NoConnectionException if the propagation is {@code MANDATORY} and {@code existing} is {@code null}
     */
    private boolean shouldReuseConnection(ConnectionDefinition definition, @Nullable C existing) {
        if (existing != null) {
            return switch (definition.getPropagationBehavior()) {
                case REQUIRED, MANDATORY -> true;
                case REQUIRES_NEW -> false;
            };
        }
        return switch (definition.getPropagationBehavior()) {
            case REQUIRED, REQUIRES_NEW -> false;
            case MANDATORY -> throw this.noConnectionFound();
        };
    }

    /**
     * Creates the exception signalled when {@code MANDATORY} propagation finds no connection.
     */
    private NoConnectionException noConnectionFound() {
        return new NoConnectionException("No existing connection found for connection marked with propagation 'mandatory'");
    }

    /**
     * Returns a context that additionally carries the given status, tagged with this instance
     * so that {@link #findConnectionStatus(ContextView)} can recognise it later.
     */
    @NonNull
    private Context addClientSession(@NonNull Context context, @NonNull ConnectionStatus<C> status) {
        return ReactorPropagation.addContextElement(context, new ClientSessionPropagatedContext<>(this, status));
    }

    /**
     * Returns the connection registered in the context by this instance, or {@code null} if none.
     */
    @Nullable
    private C findConnection(@NonNull ContextView contextView) {
        return this.findConnectionStatus(contextView).map(ConnectionStatus::getConnection).orElse(null);
    }

    /**
     * Propagated context element pairing a connection status with the operations instance
     * that registered it.
     *
     * @param connectionOperations The operations instance that registered the element
     * @param status               The connection status
     * @param <C>                  The connection type
     */
    private record ClientSessionPropagatedContext<C>(
        ReactiveStreamsConnectionOperations<?> connectionOperations,
        ConnectionStatus<C> status
    ) implements PropagatedContextElement {
    }
}

