/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl;

import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import io.quarkiverse.kafkastreamsprocessor.impl.Kafka2ProcessorAdapter;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.annotation.Annotation;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Dependent
public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut>
implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KStreamProcessorSupplier.class);
    private final Instance<org.apache.kafka.streams.processor.api.Processor<?, ?, ?, ?>> kafka3BeanInstances;
    private final Instance<org.apache.kafka.streams.processor.Processor<?, ?>> beanInstances;
    private final Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances;

    @Inject
    public KStreamProcessorSupplier(Instance<org.apache.kafka.streams.processor.api.Processor<?, ?, ?, ?>> kafka3BeanInstances, Instance<org.apache.kafka.streams.processor.Processor<?, ?>> beanInstances, Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager) {
        this.kafka3BeanInstances = kafka3BeanInstances;
        this.beanInstances = beanInstances;
        this.adapterInstances = adapterInstances;
        log.info("Configured Processor decorators are in order: {}", (Object)beanManager.resolveDecorators(Set.of(org.apache.kafka.streams.processor.api.Processor.class), new Annotation[0]).stream().map(Bean::getBeanClass).map(Class::getName).collect(Collectors.joining(", ")));
    }

    public org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> get() {
        org.apache.kafka.streams.processor.api.Processor processor;
        Optional<org.apache.kafka.streams.processor.api.Processor> kafka3Processor = this.kafka3BeanInstances.stream().filter(bean -> KStreamProcessorSupplier.hasAnnotation(bean, Processor.class)).findFirst();
        if (kafka3Processor.isEmpty()) {
            Optional<org.apache.kafka.streams.processor.Processor> kafka2Processor = this.beanInstances.stream().filter(bean -> KStreamProcessorSupplier.hasAnnotation(bean, Processor.class)).findFirst();
            if (kafka2Processor.isEmpty()) {
                throw new IllegalArgumentException("No bean found of type " + Processor.class);
            }
            Kafka2ProcessorAdapter processorAdapter = (Kafka2ProcessorAdapter)this.adapterInstances.get();
            processorAdapter.adapt(kafka2Processor.get());
            processor = processorAdapter;
        } else {
            processor = kafka3Processor.get();
        }
        if (KStreamProcessorSupplier.hasAnnotation(processor, ApplicationScoped.class) || KStreamProcessorSupplier.hasAnnotation(processor, Singleton.class) || KStreamProcessorSupplier.hasAnnotation(processor, RequestScoped.class)) {
            throw new IllegalArgumentException("Processors cannot have a scope other than @Dependant, since KafkaStreams implementation classes are not thread-safe");
        }
        return processor;
    }

    private static boolean hasAnnotation(Object bean, Class<? extends Annotation> annotation) {
        for (Class<?> current = bean.getClass(); current != null; current = current.getSuperclass()) {
            if (!current.isAnnotationPresent(annotation)) continue;
            return true;
        }
        return false;
    }
}

