/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.StrictQueueSizeGuard;
import io.quarkus.reactivemessaging.http.runtime.config.StreamConfigBase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.ext.web.RoutingContext;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;

abstract class ReactiveHandlerBeanBase<ConfigType extends StreamConfigBase, MessageType> {
    protected final Map<String, Bundle<MessageType>> processors = new HashMap<String, Bundle<MessageType>>();

    ReactiveHandlerBeanBase() {
    }

    @PostConstruct
    void init() {
        this.configs().forEach(this::addProcessor);
    }

    void handle(RoutingContext event) {
        Bundle<MessageType> bundle = this.processors.get(this.key(event));
        if (bundle != null) {
            MultiEmitter emitter = bundle.emitter;
            StrictQueueSizeGuard guard = bundle.guard;
            this.handleRequest(event, emitter, guard, bundle.path);
        } else {
            event.response().setStatusCode(404).end();
        }
    }

    private void addProcessor(ConfigType streamConfig) {
        StrictQueueSizeGuard guard = new StrictQueueSizeGuard(((StreamConfigBase)streamConfig).bufferSize);
        Bundle bundle = new Bundle(guard);
        Multi processor = Multi.createFrom().emitter(bundle::setEmitter, BackPressureStrategy.BUFFER).onItem().invoke(guard::dequeue);
        bundle.setProcessor(processor);
        bundle.setPath(((StreamConfigBase)streamConfig).path);
        Bundle previousProcessor = this.processors.put(this.key(streamConfig), bundle);
        if (previousProcessor != null) {
            throw new IllegalStateException("Duplicate incoming streams defined for " + this.description(streamConfig));
        }
    }

    protected abstract void handleRequest(RoutingContext var1, MultiEmitter<? super MessageType> var2, StrictQueueSizeGuard var3, String var4);

    protected abstract String description(ConfigType var1);

    protected abstract String key(ConfigType var1);

    protected abstract String key(RoutingContext var1);

    protected abstract Collection<ConfigType> configs();

    protected class Bundle<MessageType> {
        private final StrictQueueSizeGuard guard;
        private Multi<MessageType> processor;
        private MultiEmitter<? super MessageType> emitter;
        private String path;

        private Bundle(StrictQueueSizeGuard guard) {
            this.guard = guard;
        }

        public void setProcessor(Multi<MessageType> processor) {
            this.processor = processor;
        }

        public void setEmitter(MultiEmitter<? super MessageType> emitter) {
            this.emitter = emitter;
        }

        public Multi<MessageType> getProcessor() {
            return this.processor;
        }

        public void setPath(String path) {
            this.path = path;
        }
    }
}

