/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.intercept;

import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaProducerConfiguration;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.Header;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class KafkaClientIntroductionAdvice
implements MethodInterceptor<Object, Object>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class);

    /** Used to look up producer configuration beans and to create {@link Producer} beans. */
    private final BeanContext beanContext;

    /** Used to pick key/value serializers when the configuration does not supply them. */
    private final SerdeRegistry serdeRegistry;

    /** Used to convert the {@link RecordMetadata} of a send into the type a client method returns. */
    private final ConversionService<?> conversionService;

    /** Producers created so far, keyed by key type, value type and client id; emptied by {@link #close()}. */
    private final Map<ProducerKey, Producer> producerMap = new ConcurrentHashMap<>();

    /**
     * Creates the advice with the collaborators it needs.
     *
     * @param beanContext       context used to resolve configurations and create producers
     * @param serdeRegistry     registry used to pick serializers
     * @param conversionService service used to convert send results
     */
    public KafkaClientIntroductionAdvice(BeanContext beanContext, SerdeRegistry serdeRegistry, ConversionService<?> conversionService) {
        this.conversionService = conversionService;
        this.serdeRegistry = serdeRegistry;
        this.beanContext = beanContext;
    }

    /**
     * Interception entry point of this advice.
     *
     * <p>NOTE(review): the decompiler (CFR) failed to reconstruct this method, so the body
     * below is NOT the real implementation. It is a stub that unconditionally throws
     * {@link IllegalStateException}. The synthetic {@code lambda$intercept$N} methods in this
     * class appear to be the lambda bodies of the original implementation (judging by their
     * names) — confirm against the original sources or the class file before relying on this
     * method.
     *
     * @param context the invocation context of the intercepted method
     * @return never returns normally in this decompiled form
     * @throws IllegalStateException always, because decompilation failed
     */
    public final Object intercept(MethodInvocationContext<Object, Object> context) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [4[SWITCH], 7[CASE], 10[CATCHBLOCK], 1[TRYBLOCK]], but top level block is 3[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Closes every cached producer and then empties the producer cache.
     *
     * <p>A failure to close one producer is logged as a warning and does not stop the remaining
     * producers from being closed. The cache is cleared in a {@code finally} block, so it is
     * emptied even if something other than an {@link Exception} escapes the loop.
     */
    @Override
    @PreDestroy
    public final void close() {
        try {
            this.producerMap.values().forEach(producer -> {
                try {
                    producer.close();
                } catch (Exception failure) {
                    LOG.warn("Error closing Kafka producer: {}", (Object) failure.getMessage(), (Object) failure);
                }
            });
        } finally {
            this.producerMap.clear();
        }
    }

    /**
     * Builds a {@link Flowable} that sends a single record and reports the outcome of the send.
     *
     * <p>The send is issued from inside the {@code Flowable.create} callback. In the producer's
     * send callback the flowable either fails with the wrapped exception or, on success:
     * <ul>
     *   <li>emits nothing when {@code reactiveValueType} is {@code Argument.VOID_OBJECT};</li>
     *   <li>otherwise emits the record metadata converted to {@code reactiveValueType};</li>
     *   <li>or, when that conversion yields nothing and {@code reactiveValueType} has the same
     *       type as {@code bodyArgument}, emits {@code value} itself;</li>
     * </ul>
     * and then completes.
     *
     * @param context           invocation context, used only for the error message
     * @param topic             topic to send to
     * @param bodyArgument      argument describing the message body
     * @param kafkaProducer     producer used to send the record
     * @param kafkaHeaders      headers to attach to the record
     * @param key               record key
     * @param value             record value
     * @param timestamp         record timestamp
     * @param reactiveValueType type the caller expects the flowable to emit
     * @return the flowable wrapping the send, created with {@code BackpressureStrategy.ERROR}
     */
    private Flowable buildSendFlowable(MethodInvocationContext<Object, Object> context, String topic, Argument bodyArgument, Producer kafkaProducer, List<org.apache.kafka.common.header.Header> kafkaHeaders, Object key, Object value, Long timestamp, Argument reactiveValueType) {
        ProducerRecord producerRecord = this.buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
        return Flowable.create(subscriber -> kafkaProducer.send(producerRecord, (recordMetadata, failure) -> {
            if (failure != null) {
                subscriber.onError(this.wrapException(context, failure));
                return;
            }
            boolean expectsValue = !reactiveValueType.equalsType(Argument.VOID_OBJECT);
            if (expectsValue) {
                Optional converted = this.conversionService.convert(recordMetadata, reactiveValueType);
                if (converted.isPresent()) {
                    subscriber.onNext(converted.get());
                } else if (reactiveValueType.getType() == bodyArgument.getType()) {
                    // Metadata is not convertible, but the caller asked for the body type back.
                    subscriber.onNext(value);
                }
            }
            subscriber.onComplete();
        }), BackpressureStrategy.ERROR);
    }

    /**
     * Builds a {@link Flowable} that sends one record per item of a publisher-like {@code value}.
     *
     * <p>{@code value} is converted to a {@link Flowable}; each item it emits becomes the value
     * of a new record that is sent with the given key, headers and timestamp. For every
     * successful send the result emits, in order of preference: the record metadata (when the
     * expected type can hold a {@link RecordMetadata}), the sent item itself (when it is an
     * instance of the expected type), or the metadata converted to the expected type if that
     * conversion succeeds. A failed send fails the inner flowable with the wrapped exception.
     *
     * <p>The expected type is {@code returnType}'s own type, or its first type variable
     * (defaulting to {@code Object}) when {@code returnType} is an {@link Iterable}.
     *
     * @param context       invocation context, used for logging and the error message
     * @param topic         topic to send to
     * @param kafkaProducer producer used to send the records
     * @param kafkaHeaders  headers to attach to each record
     * @param returnType    declared type of the values the caller expects
     * @param key           record key
     * @param value         publisher-like object supplying the record values
     * @param timestamp     record timestamp
     * @param maxBlock      optional timeout applied to the resulting flowable; may be {@code null}
     * @return the flowable of send results
     */
    private Flowable<Object> buildSendFlowable(MethodInvocationContext<Object, Object> context, String topic, Producer kafkaProducer, List<org.apache.kafka.common.header.Header> kafkaHeaders, Argument<?> returnType, Object key, Object value, Long timestamp, Duration maxBlock) {
        Flowable payloads = Publishers.convertPublisher(value, Flowable.class);
        final Class emittedType = Iterable.class.isAssignableFrom(returnType.getType())
                ? returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType()
                : returnType.getType();
        Flowable results = payloads.flatMap(payload -> {
            ProducerRecord producerRecord = this.buildProducerRecord(topic, kafkaHeaders, key, payload, timestamp);
            LOG.trace("@KafkaClient method [{}] Sending producer record: {}", (Object) context, (Object) producerRecord);
            return Flowable.create(subscriber -> kafkaProducer.send(producerRecord, (recordMetadata, failure) -> {
                if (failure != null) {
                    subscriber.onError(this.wrapException(context, failure));
                    return;
                }
                if (RecordMetadata.class.isAssignableFrom(emittedType)) {
                    subscriber.onNext(recordMetadata);
                } else if (emittedType.isInstance(payload)) {
                    subscriber.onNext(payload);
                } else {
                    Optional converted = this.conversionService.convert(recordMetadata, emittedType);
                    if (converted.isPresent()) {
                        subscriber.onNext(converted.get());
                    }
                }
                subscriber.onComplete();
            }), BackpressureStrategy.BUFFER);
        });
        return maxBlock == null ? results : results.timeout(maxBlock.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Wraps a send failure in a {@link MessagingClientException} that names the client method.
     *
     * @param context   invocation context of the method whose send failed
     * @param exception the original failure, kept as the cause
     * @return the wrapping exception
     */
    private MessagingClientException wrapException(MethodInvocationContext<Object, Object> context, Throwable exception) {
        String description = "Exception sending producer record for method [" + context + "]: " + exception.getMessage();
        return new MessagingClientException(description, exception);
    }

    /**
     * Creates the record to send.
     *
     * <p>The second constructor argument is always {@code null}, and an empty header list is
     * passed on as {@code null} rather than as an empty collection.
     *
     * @param topic        topic to send to
     * @param kafkaHeaders headers for the record; must not be {@code null}
     * @param key          record key
     * @param value        record value
     * @param timestamp    record timestamp
     * @return the new producer record
     */
    private ProducerRecord buildProducerRecord(String topic, List<org.apache.kafka.common.header.Header> kafkaHeaders, Object key, Object value, Long timestamp) {
        List<org.apache.kafka.common.header.Header> recordHeaders = null;
        if (!kafkaHeaders.isEmpty()) {
            recordHeaders = kafkaHeaders;
        }
        return new ProducerRecord(topic, null, timestamp, key, value, recordHeaders);
    }

    /**
     * Returns the cached producer for the given key/body types and client id, creating it on
     * first use.
     *
     * <p>A new producer is configured as follows:
     * <ol>
     *   <li>Start from the {@link KafkaProducerConfiguration} bean named after the client id if
     *       one exists, otherwise from the {@link AbstractKafkaProducerConfiguration} bean.</li>
     *   <li>Set {@code client.id} to the client id unless the property is already present.</li>
     *   <li>Apply {@code maxBlock} ({@code max.block.ms}) and {@code acks} from the
     *       {@code @KafkaClient} annotation; an {@code acks} value of {@code -1} becomes
     *       {@code "all"} and {@code Integer.MIN_VALUE} means "not set".</li>
     *   <li>Copy the annotation's {@code properties} over the configuration.</li>
     *   <li>If no key/value serializer is configured by property or on the configuration, pick
     *       one from the {@link SerdeRegistry} (a {@link ByteArraySerializer} for the key when
     *       there is no key argument; the body's first type variable for the value when
     *       {@code batch} is true).</li>
     * </ol>
     *
     * @param bodyArgument argument describing the message body
     * @param keyArgument  argument describing the key, or {@code null} if the method has none
     * @param metadata     annotation metadata carrying the {@code @KafkaClient} settings
     * @return the producer to use
     */
    private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgument, AnnotationMetadata metadata) {
        Class keyType = keyArgument == null ? byte[].class : keyArgument.getType();
        String clientId = metadata.getValue(KafkaClient.class, String.class).orElse(null);
        ProducerKey cacheKey = new ProducerKey(keyType, bodyArgument.getType(), clientId);
        return this.producerMap.computeIfAbsent(cacheKey, producerKey -> {
            String producerId = producerKey.id;

            // Prefer a configuration bean named after the client id, fall back to the default one.
            AbstractKafkaProducerConfiguration baseConfiguration = null;
            if (producerId != null) {
                Optional namedConfig = this.beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName(producerId));
                if (namedConfig.isPresent()) {
                    baseConfiguration = (AbstractKafkaProducerConfiguration) namedConfig.get();
                }
            }
            if (baseConfiguration == null) {
                baseConfiguration = (AbstractKafkaProducerConfiguration) this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
            }

            DefaultKafkaProducerConfiguration producerConfiguration = new DefaultKafkaProducerConfiguration(baseConfiguration);
            Properties producerProperties = producerConfiguration.getConfig();
            if (clientId != null) {
                producerProperties.putIfAbsent("client.id", clientId);
            }

            Optional<Duration> maxBlock = metadata.getValue(KafkaClient.class, "maxBlock", Duration.class);
            if (maxBlock.isPresent()) {
                producerProperties.put("max.block.ms", String.valueOf(maxBlock.get().toMillis()));
            }

            int acks = metadata.getValue(KafkaClient.class, "acks", Integer.class).orElse(Integer.MIN_VALUE);
            if (acks != Integer.MIN_VALUE) {
                producerProperties.put("acks", acks == -1 ? "all" : String.valueOf(acks));
            }

            metadata.findAnnotation(KafkaClient.class)
                    .map(annotation -> annotation.getProperties("properties", "name"))
                    .ifPresent(extraProperties -> producerProperties.putAll(extraProperties));

            LOG.debug("Creating new KafkaProducer.");

            if (!producerProperties.containsKey("key.serializer")) {
                Serializer keySerializer = (Serializer) producerConfiguration.getKeySerializer().orElse(null);
                if (keySerializer == null) {
                    if (keyArgument != null) {
                        keySerializer = this.serdeRegistry.pickSerializer(keyArgument);
                    } else {
                        keySerializer = new ByteArraySerializer();
                    }
                    LOG.debug("Using Kafka key serializer: {}", keySerializer);
                    producerConfiguration.setKeySerializer(keySerializer);
                }
            }

            if (!producerProperties.containsKey("value.serializer")) {
                Serializer valueSerializer = (Serializer) producerConfiguration.getValueSerializer().orElse(null);
                if (valueSerializer == null) {
                    boolean batch = metadata.isTrue(KafkaClient.class, "batch");
                    Argument serializedType = bodyArgument;
                    if (batch) {
                        // For batch sends the serializer is chosen for the element type.
                        serializedType = (Argument) bodyArgument.getFirstTypeVariable().orElse(bodyArgument);
                    }
                    valueSerializer = this.serdeRegistry.pickSerializer(serializedType);
                    LOG.debug("Using Kafka value serializer: {}", valueSerializer);
                    producerConfiguration.setValueSerializer(valueSerializer);
                }
            }

            return (Producer) this.beanContext.createBean(Producer.class, new Object[]{producerConfiguration});
        });
    }

    /**
     * Synthetic lambda body: yields {@code finalValue} when {@code javaReturnType} is exactly the
     * body argument's type, otherwise {@code null}.
     */
    private static /* synthetic */ Object lambda$intercept$5(Class javaReturnType, Argument finalBodyArgument, Object finalValue) {
        boolean returnsBodyType = javaReturnType == finalBodyArgument.getType();
        return returnsBodyType ? finalValue : null;
    }

    /**
     * Synthetic lambda body: yields {@code finalValue} when {@code javaReturnType} is exactly the
     * body argument's type, otherwise {@code null}.
     */
    private static /* synthetic */ Object lambda$intercept$4(Class javaReturnType, Argument finalBodyArgument, Object finalValue) {
        boolean returnsBodyType = javaReturnType == finalBodyArgument.getType();
        return returnsBodyType ? finalValue : null;
    }

    /**
     * Synthetic lambda body: builds the single-record send flowable for one emitted item
     * {@code o}, using the captured topic, producer, headers, key, timestamp and expected type.
     */
    private /* synthetic */ Publisher lambda$intercept$3(MethodInvocationContext context, String finalTopic, Argument finalBodyArgument, Producer kafkaProducer, List kafkaHeaders, Object finalKey, Long timestamp, Argument finalReactiveTypeValue, Object o) throws Exception {
        MethodInvocationContext<Object, Object> typedContext = (MethodInvocationContext<Object, Object>) context;
        Flowable sendResult = this.buildSendFlowable(typedContext, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, o, timestamp, finalReactiveTypeValue);
        return sendResult;
    }

    /**
     * Synthetic lambda body: producer send callback that completes {@code completableFuture}
     * with the outcome of the send.
     *
     * <ul>
     *   <li>On failure the future completes exceptionally with the wrapped exception.</li>
     *   <li>For a void return type the future completes with {@code null}.</li>
     *   <li>Otherwise it completes with the metadata converted to the return type, or with
     *       {@code finalValue} when the conversion yields nothing and the return type is the
     *       body argument's type.</li>
     *   <li>If neither applies the future completes with {@code null}. Previously it was left
     *       incomplete in this case, so a caller waiting on it would never be released.</li>
     * </ul>
     */
    private /* synthetic */ void lambda$intercept$2(CompletableFuture completableFuture, MethodInvocationContext context, boolean returnTypeValueVoid, Argument finalReturnTypeValue, Argument finalBodyArgument, Object finalValue, RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            completableFuture.completeExceptionally((Throwable)this.wrapException((MethodInvocationContext<Object, Object>)context, exception));
            return;
        }
        if (returnTypeValueVoid) {
            completableFuture.complete(null);
            return;
        }
        Optional converted = this.conversionService.convert((Object)metadata, finalReturnTypeValue);
        if (converted.isPresent()) {
            completableFuture.complete(converted.get());
        } else if (finalReturnTypeValue.getType() == finalBodyArgument.getType()) {
            completableFuture.complete(finalValue);
        } else {
            // The send succeeded but there is nothing of the requested type to hand back;
            // still complete so that waiters are released.
            completableFuture.complete(null);
        }
    }

    /**
     * Synthetic lambda body: resolves a header name as the {@code @Header} annotation's string
     * value, falling back to {@code argumentName} when the annotation has none.
     */
    private static /* synthetic */ String lambda$intercept$1(AnnotationMetadata annotationMetadata, String argumentName) {
        Optional<String> declaredName = annotationMetadata.stringValue(Header.class);
        return declaredName.orElse(argumentName);
    }

    /**
     * Synthetic lambda body: creates the exception reported when the intercepted method has no
     * {@code @KafkaClient} annotation.
     */
    private static /* synthetic */ IllegalStateException lambda$intercept$0(MethodInvocationContext context) {
        String message = "No @KafkaClient annotation present on method: " + context;
        return new IllegalStateException(message);
    }

    private final class ProducerKey {
        final Class keyType;
        final Class valueType;
        final String id;

        ProducerKey(Class keyType, Class valueType, String id) {
            this.keyType = keyType;
            this.valueType = valueType;
            this.id = id;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProducerKey that = (ProducerKey)o;
            return Objects.equals(this.keyType, that.keyType) && Objects.equals(this.valueType, that.valueType) && Objects.equals(this.id, that.id);
        }

        public int hashCode() {
            return Objects.hash(this.keyType, this.valueType, this.id);
        }
    }
}

