/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.tasks.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.netflix.conductor.contribs.tasks.kafka.KafkaPublishTask;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerManager.class);
    private final String requestTimeoutConfig;
    private final Cache<Properties, Producer> kafkaProducerCache;
    private final String maxBlockMsConfig;
    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    private static final RemovalListener<Properties, Producer> LISTENER = notification -> {
        if (notification.getValue() != null) {
            ((Producer)notification.getValue()).close();
            LOGGER.info("Closed producer for {}", notification.getKey());
        }
    };

    @Autowired
    public KafkaProducerManager(@Value(value="${conductor.tasks.kafka-publish.requestTimeout:100ms}") Duration requestTimeout, @Value(value="${conductor.tasks.kafka-publish.maxBlock:500ms}") Duration maxBlock, @Value(value="${conductor.tasks.kafka-publish.cacheSize:10}") int cacheSize, @Value(value="${conductor.tasks.kafka-publish.cacheTime:120000ms}") Duration cacheTime) {
        this.requestTimeoutConfig = String.valueOf(requestTimeout.toMillis());
        this.maxBlockMsConfig = String.valueOf(maxBlock.toMillis());
        this.kafkaProducerCache = CacheBuilder.newBuilder().removalListener(LISTENER).maximumSize((long)cacheSize).expireAfterAccess(cacheTime.toMillis(), TimeUnit.MILLISECONDS).build();
    }

    public Producer getProducer(KafkaPublishTask.Input input) {
        Properties configProperties = this.getProducerProperties(input);
        return this.getFromCache(configProperties, () -> new KafkaProducer(configProperties));
    }

    @VisibleForTesting
    Producer getFromCache(Properties configProperties, Callable<Producer> createProducerCallable) {
        try {
            return (Producer)this.kafkaProducerCache.get((Object)configProperties, createProducerCallable);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    Properties getProducerProperties(KafkaPublishTask.Input input) {
        Properties configProperties = new Properties();
        configProperties.put("bootstrap.servers", input.getBootStrapServers());
        configProperties.put("key.serializer", input.getKeySerializer());
        String requestTimeoutMs = this.requestTimeoutConfig;
        if (Objects.nonNull(input.getRequestTimeoutMs())) {
            requestTimeoutMs = String.valueOf(input.getRequestTimeoutMs());
        }
        String maxBlockMs = this.maxBlockMsConfig;
        if (Objects.nonNull(input.getMaxBlockMs())) {
            maxBlockMs = String.valueOf(input.getMaxBlockMs());
        }
        configProperties.put("request.timeout.ms", requestTimeoutMs);
        configProperties.put("max.block.ms", maxBlockMs);
        configProperties.put("value.serializer", STRING_SERIALIZER);
        return configProperties;
    }
}

