/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.runtime.datasource.AbstractRedisCommands;
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl;
import io.quarkus.redis.runtime.datasource.RedisCommand;
import io.quarkus.redis.runtime.datasource.Validation;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.RedisConnection;
import io.vertx.mutiny.redis.client.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;

public class ReactivePubSubCommandsImpl<V>
extends AbstractRedisCommands
implements ReactivePubSubCommands<V> {
    private final Class<V> classOfMessage;
    private final Redis client;
    private final ReactiveRedisDataSourceImpl datasource;

    public ReactivePubSubCommandsImpl(ReactiveRedisDataSourceImpl ds, Class<V> classOfMessage) {
        super(ds, new Marshaller(classOfMessage));
        this.client = ds.redis;
        this.datasource = ds;
        this.classOfMessage = classOfMessage;
    }

    @Override
    public ReactiveRedisDataSource getDataSource() {
        return this.datasource;
    }

    @Override
    public Uni<Void> publish(String channel, V message) {
        ParameterValidation.nonNull((Object)channel, (String)"channel");
        ParameterValidation.nonNull(message, (String)"message");
        RedisCommand cmd = RedisCommand.of(Command.PUBLISH).put(channel).put(this.marshaller.encode(message));
        return this.execute(cmd).replaceWithVoid();
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(String channel, Consumer<V> onMessage) {
        return this.subscribe(List.of(channel), onMessage);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage) {
        return this.subscribeToPatterns(List.of(pattern), onMessage);
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage) {
        Validation.notNullOrEmpty(patterns, "patterns");
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        for (String pattern : patterns) {
            if (pattern == null) {
                throw new IllegalArgumentException("Patterns must not be null");
            }
            if (!pattern.isBlank()) continue;
            throw new IllegalArgumentException("Patterns cannot be blank");
        }
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveRedisPatternSubscriberImpl subscriber = new ReactiveRedisPatternSubscriberImpl((RedisConnection)conn, api, onMessage, patterns);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    @Override
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage) {
        Validation.notNullOrEmpty(channels, "channels");
        ParameterValidation.nonNull(onMessage, (String)"onMessage");
        for (String channel : channels) {
            if (channel == null) {
                throw new IllegalArgumentException("Channels must not be null");
            }
            if (!channel.isBlank()) continue;
            throw new IllegalArgumentException("Channels cannot be blank");
        }
        return this.client.connect().chain(conn -> {
            RedisAPI api = RedisAPI.api((RedisConnection)conn);
            ReactiveAbstractRedisSubscriberImpl subscriber = new ReactiveAbstractRedisSubscriberImpl((RedisConnection)conn, api, onMessage, channels);
            return subscriber.subscribe().replaceWith((Object)subscriber);
        });
    }

    @Override
    public Multi<V> subscribe(String ... channels) {
        Validation.notNullOrEmpty(channels, "channels");
        ParameterValidation.doesNotContainNull((Object[])channels, (String)"channels");
        return Multi.createFrom().emitter(emitter -> this.subscribe(List.of(channels), arg_0 -> ((MultiEmitter)emitter).emit(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(channels).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    @Override
    public Multi<V> subscribeToPatterns(String ... patterns) {
        Validation.notNullOrEmpty(patterns, "patterns");
        ParameterValidation.doesNotContainNull((Object[])patterns, (String)"patterns");
        return Multi.createFrom().emitter(emitter -> this.subscribeToPatterns(List.of(patterns), arg_0 -> ((MultiEmitter)emitter).emit(arg_0)).subscribe().with(x -> emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()), arg_0 -> ((MultiEmitter)emitter).fail(arg_0)));
    }

    private class ReactiveRedisPatternSubscriberImpl
    extends AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> patterns;

        public ReactiveRedisPatternSubscriberImpl(RedisConnection connection, RedisAPI api, Consumer<V> onMessage, List<String> patterns) {
            super(connection, api, onMessage);
            this.patterns = new ArrayList<String>(patterns);
        }

        @Override
        Uni<Void> subscribeToRedis() {
            return this.api.psubscribe(this.patterns).replaceWithVoid();
        }

        @Override
        public Uni<Void> unsubscribe(String ... patterns) {
            Validation.notNullOrEmpty(patterns, "patterns");
            ParameterValidation.doesNotContainNull((Object[])patterns, (String)"patterns");
            List<String> list = List.of(patterns);
            return this.api.punsubscribe(list).chain(() -> {
                this.patterns.removeAll(list);
                return this.closeAndUnregister(this.patterns);
            });
        }

        @Override
        public Uni<Void> unsubscribe() {
            return this.api.punsubscribe(this.patterns).chain(() -> {
                this.patterns.clear();
                return this.closeAndUnregister(this.patterns);
            });
        }
    }

    private class ReactiveAbstractRedisSubscriberImpl
    extends AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> channels;

        public ReactiveAbstractRedisSubscriberImpl(RedisConnection connection, RedisAPI api, Consumer<V> onMessage, List<String> channels) {
            super(connection, api, onMessage);
            this.channels = new ArrayList<String>(channels);
        }

        @Override
        Uni<Void> subscribeToRedis() {
            return this.api.subscribe(this.channels).replaceWithVoid();
        }

        @Override
        public Uni<Void> unsubscribe(String ... channels) {
            Validation.notNullOrEmpty(channels, "channels");
            ParameterValidation.doesNotContainNull((Object[])channels, (String)"channels");
            List<String> list = List.of(channels);
            return this.api.unsubscribe(list).chain(() -> {
                this.channels.removeAll(list);
                return this.closeAndUnregister(this.channels);
            });
        }

        @Override
        public Uni<Void> unsubscribe() {
            return this.api.unsubscribe(this.channels).chain(() -> {
                this.channels.clear();
                return this.closeAndUnregister(this.channels);
            });
        }
    }

    private abstract class AbstractRedisSubscriber
    implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        final RedisConnection connection;
        final RedisAPI api;
        final String id;
        final Consumer<V> onMessage;

        private AbstractRedisSubscriber(RedisConnection connection, RedisAPI api, Consumer<V> onMessage) {
            this.connection = connection;
            this.api = api;
            this.id = UUID.randomUUID().toString();
            this.onMessage = onMessage;
        }

        abstract Uni<Void> subscribeToRedis();

        public Uni<String> subscribe() {
            Uni handled = Uni.createFrom().emitter(emitter -> this.connection.handler(r -> {
                if (r != null && r.size() > 0) {
                    Context ctxt = Vertx.currentContext();
                    if (ctxt != null) {
                        this.handleRedisEvent((UniEmitter<? super Void>)emitter, (Response)r);
                    } else {
                        ReactivePubSubCommandsImpl.this.datasource.getVertx().runOnContext(() -> this.handleRedisEvent((UniEmitter<? super Void>)emitter, (Response)r));
                    }
                }
            }));
            Uni<Void> subscribed = this.subscribeToRedis();
            return subscribed.chain(() -> handled).replaceWith((Object)this.id);
        }

        private void handleRedisEvent(UniEmitter<? super Void> emitter, Response r) {
            Context context = VertxContext.getOrCreateDuplicatedContext((Context)Vertx.currentContext());
            String command = r.get(0).toString();
            if ("subscribe".equalsIgnoreCase(command) || "psubscribe".equalsIgnoreCase(command)) {
                emitter.complete(null);
            } else if ("message".equalsIgnoreCase(command)) {
                context.runOnContext(x -> this.onMessage.accept(ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, r.get(2))));
            } else if ("pmessage".equalsIgnoreCase(command)) {
                context.runOnContext(x -> this.onMessage.accept(ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, r.get(3))));
            }
        }

        public Uni<Void> closeAndUnregister(Collection<?> collection) {
            if (collection.isEmpty()) {
                return this.connection.close();
            }
            return Uni.createFrom().voidItem();
        }
    }
}

