/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage;
import io.quarkus.redis.runtime.datasource.AbstractRedisCommands;
import io.quarkus.redis.runtime.datasource.DefaultRedisPubSubMessage;
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl;
import io.quarkus.redis.runtime.datasource.RedisCommand;
import io.quarkus.redis.runtime.datasource.Validation;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.RedisConnection;
import io.vertx.mutiny.redis.client.Response;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class ReactivePubSubCommandsImpl<V>
extends AbstractRedisCommands
implements ReactivePubSubCommands<V> {
    private final Type classOfMessage;
    private final Redis client;
    private final ReactiveRedisDataSourceImpl datasource;

    public ReactivePubSubCommandsImpl(ReactiveRedisDataSourceImpl ds, Type classOfMessage) {
        super(ds, new Marshaller(classOfMessage));
        this.client = ds.redis;
        this.datasource = ds;
        this.classOfMessage = classOfMessage;
    }

    @Override
    public ReactiveRedisDataSource getDataSource() {
        return this.datasource;
    }

    @Override
    public Uni<Void> publish(String channel, V message) {
        ParameterValidation.nonNull((Object)channel, (String)"channel");
        ParameterValidation.nonNull(message, (String)"message");
        RedisCommand cmd = RedisCommand.of(Command.PUBLISH).put(channel).put(this.marshaller.encode(message));
        return this.execute(cmd).replaceWithVoid();
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(String channel, Consumer<V> onMessage) {
        return this.subscribe(channel, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage) {
        return this.subscribeToPattern(pattern, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage) {
        return this.subscribeToPattern(pattern, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage) {
        return this.subscribeToPatterns(patterns, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage) {
        return this.subscribeToPatterns(patterns, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage) {
        return this.subscribe(channels, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage) {
        return this.subscribe(channels, onMessage, null, null);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(String channel, Consumer<V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        return this.subscribe(List.of(channel), onMessage, onEnd, onException);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        return this.subscribeToPatterns(List.of(pattern), onMessage, onEnd, onException);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        return this.subscribeToPatterns(List.of(pattern), onMessage, onEnd, onException);
    }

    private void validatePatterns(List<String> patterns) {
        Validation.notNullOrEmpty(patterns, "patterns");
        for (String pattern : patterns) {
            if (pattern == null) {
                throw new IllegalArgumentException("Pattern must not be null");
            }
            if (!pattern.isBlank()) continue;
            throw new IllegalArgumentException("Pattern cannot be blank");
        }
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        this.validatePatterns(patterns);
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveRedisPatternSubscriberImpl subscriber = new ReactiveRedisPatternSubscriberImpl((RedisConnection)conn, api, patterns, (channel, value) -> onMessage.accept(value), onEnd, onException);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        this.validatePatterns(patterns);
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveRedisPatternSubscriberImpl subscriber = new ReactiveRedisPatternSubscriberImpl((RedisConnection)conn, api, patterns, onMessage, onEnd, onException);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    private void validateChannels(List<String> channels) {
        Validation.notNullOrEmpty(channels, "channels");
        for (String pattern : channels) {
            if (pattern == null) {
                throw new IllegalArgumentException("Channel must not be null");
            }
            if (!pattern.isBlank()) continue;
            throw new IllegalArgumentException("Channel cannot be blank");
        }
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        this.validateChannels(channels);
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveAbstractRedisSubscriberImpl subscriber = new ReactiveAbstractRedisSubscriberImpl((RedisConnection)conn, api, channels, (channel, value) -> onMessage.accept(value), onEnd, onException);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
        Validation.notNullOrEmpty(channels, "channels");
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        for (String channel : channels) {
            if (channel == null) {
                return Uni.createFrom().failure((Throwable)new IllegalArgumentException("Channels must not be null"));
            }
            if (!channel.isBlank()) continue;
            return Uni.createFrom().failure((Throwable)new IllegalArgumentException("Channels cannot be blank"));
        }
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveAbstractRedisSubscriberImpl subscriber = new ReactiveAbstractRedisSubscriberImpl((RedisConnection)conn, api, channels, onMessage, onEnd, onException);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    @Override
    public Multi<V> subscribeToPatterns(String ... patterns) {
        Validation.notNullOrEmpty(patterns, "patterns");
        ParameterValidation.doesNotContainNull((Object[])patterns, (String)"patterns");
        return Multi.createFrom().emitter(emitter -> this.subscribeToPatterns(List.of(patterns), arg_0 -> ((MultiEmitter)emitter).emit(arg_0), () -> ((MultiEmitter)emitter).complete(), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    @Override
    public Multi<RedisPubSubMessage<V>> subscribeAsMessagesToPatterns(String ... patterns) {
        Validation.notNullOrEmpty(patterns, "patterns");
        ParameterValidation.doesNotContainNull((Object[])patterns, (String)"patterns");
        return Multi.createFrom().emitter(emitter -> this.subscribeToPatterns(List.of(patterns), (String channel, V value) -> emitter.emit(new DefaultRedisPubSubMessage<Object>(value, (String)channel)), () -> ((MultiEmitter)emitter).complete(), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    @Override
    public Multi<V> subscribe(String ... channels) {
        Validation.notNullOrEmpty(channels, "channels");
        ParameterValidation.doesNotContainNull((Object[])channels, (String)"channels");
        return Multi.createFrom().emitter(emitter -> this.subscribe(List.of(channels), arg_0 -> ((MultiEmitter)emitter).emit(arg_0), () -> ((MultiEmitter)emitter).complete(), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(channels).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    @Override
    public Multi<RedisPubSubMessage<V>> subscribeAsMessages(String ... channels) {
        Validation.notNullOrEmpty(channels, "channels");
        ParameterValidation.doesNotContainNull((Object[])channels, (String)"channels");
        List<String> list = List.of(channels);
        return Multi.createFrom().emitter(emitter -> this.subscribe(list, (String channel, V value) -> emitter.emit(new DefaultRedisPubSubMessage<Object>(value, (String)channel)), () -> ((MultiEmitter)emitter).complete(), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(channels).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    private class ReactiveAbstractRedisSubscriberImpl
    extends AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> channels;

        public ReactiveAbstractRedisSubscriberImpl(RedisConnection connection, RedisAPI api, List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
            super(connection, api, onMessage, onEnd, onException);
            this.channels = new ArrayList<String>(channels);
        }

        @Override
        Uni<Void> subscribeToRedis() {
            return this.api.subscribe(this.channels).replaceWithVoid();
        }

        @Override
        public Uni<Void> unsubscribe(String ... channels) {
            Validation.notNullOrEmpty(channels, "channels");
            ParameterValidation.doesNotContainNull((Object[])channels, (String)"channels");
            List<String> list = List.of(channels);
            return this.api.unsubscribe(list).chain(() -> {
                this.channels.removeAll(list);
                return this.closeAndUnregister(this.channels);
            });
        }

        @Override
        public Uni<Void> unsubscribe() {
            return this.api.unsubscribe(this.channels).chain(() -> {
                this.channels.clear();
                return this.closeAndUnregister(this.channels);
            });
        }
    }

    private class ReactiveRedisPatternSubscriberImpl
    extends AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> patterns;

        public ReactiveRedisPatternSubscriberImpl(RedisConnection connection, RedisAPI api, List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
            super(connection, api, onMessage, onEnd, onException);
            this.patterns = new ArrayList<String>(patterns);
        }

        @Override
        Uni<Void> subscribeToRedis() {
            return this.api.psubscribe(this.patterns).replaceWithVoid();
        }

        @Override
        public Uni<Void> unsubscribe(String ... patterns) {
            Validation.notNullOrEmpty(patterns, "patterns");
            ParameterValidation.doesNotContainNull((Object[])patterns, (String)"patterns");
            List<String> list = List.of(patterns);
            return this.api.punsubscribe(list).chain(() -> {
                this.patterns.removeAll(list);
                return this.closeAndUnregister(this.patterns);
            });
        }

        @Override
        public Uni<Void> unsubscribe() {
            return this.api.punsubscribe(this.patterns).chain(() -> {
                this.patterns.clear();
                return this.closeAndUnregister(this.patterns);
            });
        }
    }

    private abstract class AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        final RedisConnection connection;
        final RedisAPI api;
        final String id;
        final BiConsumer<String, V> onMessage;
        final Runnable onEnd;
        final Consumer<Throwable> onException;

        private AbstractRedisSubscriber(RedisConnection connection, RedisAPI api, BiConsumer<String, V> onMessage, Runnable onEnd, Consumer<Throwable> onException) {
            this.connection = connection;
            this.api = api;
            this.id = UUID.randomUUID().toString();
            this.onMessage = onMessage;
            this.onEnd = onEnd;
            this.onException = onException;
        }

        abstract Uni<Void> subscribeToRedis();

        public Uni<String> subscribe() {
            Uni handled = Uni.createFrom().emitter(emitter -> {
                this.connection.handler(r -> this.runOnDuplicatedContext(() -> this.handleRedisEvent((UniEmitter<? super Void>)emitter, (Response)r)));
                if (this.onEnd != null) {
                    this.connection.endHandler(() -> this.runOnDuplicatedContext(this.onEnd));
                }
                if (this.onException != null) {
                    this.connection.exceptionHandler(t -> this.runOnDuplicatedContext(() -> this.onException.accept((Throwable)t)));
                }
            });
            Uni<Void> subscribed = this.subscribeToRedis();
            return subscribed.chain(() -> handled).replaceWith((Object)this.id);
        }

        private void runOnDuplicatedContext(Runnable runnable) {
            Consumer<Context> contextConsumer = c -> {
                Context context = VertxContext.getOrCreateDuplicatedContext((Context)c);
                VertxContextSafetyToggle.setContextSafe((Context)context, (boolean)true);
                context.runOnContext(ignored -> runnable.run());
            };
            Optional.ofNullable(Vertx.currentContext()).ifPresentOrElse(contextConsumer, () -> ReactivePubSubCommandsImpl.this.datasource.getVertx().runOnContext(() -> contextConsumer.accept(Vertx.currentContext())));
        }

        protected void handleRedisEvent(UniEmitter<? super Void> emitter, Response r) {
            if (r != null && r.size() > 0) {
                String command = r.get(0).toString();
                if ("subscribe".equalsIgnoreCase(command) || "psubscribe".equalsIgnoreCase(command)) {
                    emitter.complete(null);
                } else if ("message".equalsIgnoreCase(command)) {
                    this.onMessage.accept(r.get(1).toString(), ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, r.get(2)));
                } else if ("pmessage".equalsIgnoreCase(command)) {
                    this.onMessage.accept(r.get(2).toString(), ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, r.get(3)));
                }
            }
        }

        public Uni<Void> closeAndUnregister(Collection<?> collection) {
            if (collection.isEmpty()) {
                return this.connection.close();
            }
            return Uni.createFrom().voidItem();
        }
    }
}

