/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka;

import io.smallrye.reactive.messaging.kafka.Kafka;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import io.smallrye.reactive.messaging.kafka.KafkaSink;
import io.smallrye.reactive.messaging.kafka.KafkaSource;
import io.smallrye.reactive.messaging.spi.IncomingConnectorFactory;
import io.smallrye.reactive.messaging.spi.OutgoingConnectorFactory;
import io.vertx.reactivex.core.Vertx;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.MessagingProvider;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
public class KafkaMessagingProvider
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    @Inject
    private Instance<Vertx> instanceOfVertx;
    private List<KafkaSource> sources = new CopyOnWriteArrayList<KafkaSource>();
    private List<KafkaSink> sinks = new CopyOnWriteArrayList<KafkaSink>();
    private boolean internalVertxInstance = false;
    private Vertx vertx;

    public void terminate(@Observes @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        this.sources.forEach(KafkaSource::close);
        this.sinks.forEach(KafkaSink::close);
        if (this.internalVertxInstance) {
            this.vertx.close();
        }
    }

    @PostConstruct
    void init() {
        if (this.instanceOfVertx.isUnsatisfied()) {
            this.internalVertxInstance = true;
            this.vertx = Vertx.vertx();
        } else {
            this.vertx = (Vertx)this.instanceOfVertx.get();
        }
    }

    public Class<? extends MessagingProvider> type() {
        return Kafka.class;
    }

    public PublisherBuilder<KafkaMessage> getPublisherBuilder(Config config) {
        KafkaSource source = new KafkaSource(this.vertx, config);
        this.sources.add(source);
        return source.getSource();
    }

    public SubscriberBuilder<? extends Message, Void> getSubscriberBuilder(Config config) {
        KafkaSink sink = new KafkaSink(this.vertx, config);
        this.sinks.add(sink);
        return sink.getSink();
    }
}

