/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion;

import com.opencsv.CSVReader;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import io.smallrye.reactive.messaging.kafka.companion.ProducerTask;
import io.smallrye.reactive.messaging.kafka.companion.RecordQualifiers;
import java.io.Closeable;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.jboss.logging.Logger;

/**
 * Builds {@link ProducerTask}s that send records through a lazily created {@link KafkaProducer}.
 * <p>
 * The underlying producer and the single-threaded executor used for sending are created on first
 * use and released by {@link #close()}. When a {@code transactional.id} property is configured, the
 * producer is initialised for transactions and each produce stream is wrapped in a transaction.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class ProducerBuilder<K, V> implements Closeable {
    private static final Logger LOGGER = Logger.getLogger(ProducerBuilder.class);
    private final Map<String, Object> props;
    private final Function<Map<String, Object>, KafkaProducer<K, V>> producerCreator;
    private final Duration kafkaApiTimeout;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private KafkaProducer<K, V> kafkaProducer;
    private ExecutorService executorService;
    private BiConsumer<KafkaProducer<K, V>, Throwable> onTermination = this::terminate;

    /**
     * Creates a builder whose producer instantiates its serializers from class names.
     * The given map is used as-is (not copied) and receives the two serializer properties.
     *
     * @param props the producer configuration, mutated by this builder
     * @param kafkaApiTimeout the timeout applied when closing the producer
     * @param keySerializerClassName the value of the {@code key.serializer} property
     * @param valueSerializerClassName the value of the {@code value.serializer} property
     */
    public ProducerBuilder(Map<String, Object> props, Duration kafkaApiTimeout, String keySerializerClassName,
            String valueSerializerClassName) {
        this.props = props;
        this.kafkaApiTimeout = kafkaApiTimeout;
        this.props.put("key.serializer", keySerializerClassName);
        this.props.put("value.serializer", valueSerializerClassName);
        this.producerCreator = KafkaProducer::new;
    }

    /**
     * Creates a builder whose producer uses the given serializer instances.
     *
     * @param props the producer configuration, mutated by the {@code with*} methods
     * @param kafkaApiTimeout the timeout applied when closing the producer
     * @param keySerializer the key serializer handed to the producer
     * @param valueSerializer the value serializer handed to the producer
     */
    public ProducerBuilder(Map<String, Object> props, Duration kafkaApiTimeout, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        this.props = props;
        this.kafkaApiTimeout = kafkaApiTimeout;
        this.producerCreator = p -> new KafkaProducer<>(p, keySerializer, valueSerializer);
    }

    /**
     * Creates a builder from serdes. The serdes are retained so that {@link #fromCsv(String)} can
     * turn CSV text into keys and values.
     *
     * @param props the producer configuration, mutated by the {@code with*} methods
     * @param kafkaApiTimeout the timeout applied when closing the producer
     * @param keySerde the key serde
     * @param valueSerde the value serde
     */
    public ProducerBuilder(Map<String, Object> props, Duration kafkaApiTimeout, Serde<K> keySerde,
            Serde<V> valueSerde) {
        this(props, kafkaApiTimeout, keySerde.serializer(), valueSerde.serializer());
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /**
     * Returns the cached producer, creating it (and initialising transactions when a transactional
     * id is configured) on first call.
     */
    private synchronized KafkaProducer<K, V> getOrCreateProducer() {
        if (this.kafkaProducer == null) {
            KafkaProducer<K, V> created = this.producerCreator.apply(this.props);
            if (this.isTransactional()) {
                try {
                    created.initTransactions();
                } catch (RuntimeException initFailure) {
                    // Do not cache a producer whose transactions were never initialised.
                    try {
                        created.close(this.kafkaApiTimeout);
                    } catch (RuntimeException closeFailure) {
                        initFailure.addSuppressed(closeFailure);
                    }
                    throw initFailure;
                }
            }
            this.kafkaProducer = created;
        }
        return this.kafkaProducer;
    }

    /** Returns the cached single-threaded executor, creating it on first call. */
    private synchronized ExecutorService getOrCreateExecutor() {
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(1,
                    c -> new Thread(c, "producer-" + this.clientId()));
        }
        return this.executorService;
    }

    /**
     * @return the underlying producer, or {@code null} if it has not been created yet or has
     *         already been closed
     */
    public KafkaProducer<K, V> unwrap() {
        return this.kafkaProducer;
    }

    /**
     * Flushes and closes the producer, if one was created, and shuts down the executor, if one was
     * created. Both are released even when one of the steps throws. Calling this method again is a
     * no-op until a new producer or executor is created.
     */
    @Override
    public synchronized void close() {
        KafkaProducer<K, V> producer = this.kafkaProducer;
        ExecutorService executor = this.executorService;
        this.kafkaProducer = null;
        this.executorService = null;
        try {
            if (producer != null) {
                LOGGER.infof("Closing producer %s", this.clientId());
                try {
                    producer.flush();
                } finally {
                    producer.close(this.kafkaApiTimeout);
                }
            }
        } finally {
            if (executor != null) {
                executor.shutdown();
            }
        }
    }

    /**
     * Default termination callback: for a transactional producer, commits when the stream ended
     * without failure and aborts otherwise; then closes this builder in every case.
     */
    private void terminate(KafkaProducer<K, V> producer, Throwable throwable) {
        try {
            if (this.isTransactional()) {
                if (throwable == null) {
                    producer.commitTransaction();
                } else {
                    producer.abortTransaction();
                }
            }
        } finally {
            // Release the producer and executor even if commit/abort fails.
            this.close();
        }
    }

    /**
     * Sets a producer property.
     *
     * @param key the property name
     * @param value the property value
     * @return this builder
     */
    public ProducerBuilder<K, V> withProp(String key, String value) {
        this.props.put(key, value);
        return this;
    }

    /**
     * Adds all given producer properties.
     *
     * @param properties the properties to add
     * @return this builder
     */
    public ProducerBuilder<K, V> withProps(Map<String, String> properties) {
        this.props.putAll(properties);
        return this;
    }

    /**
     * Sets the {@code client.id} property.
     *
     * @param clientId the client id
     * @return this builder
     */
    public ProducerBuilder<K, V> withClientId(String clientId) {
        return this.withProp("client.id", clientId);
    }

    /**
     * Sets the {@code transactional.id} property, which makes this producer transactional.
     *
     * @param transactionalId the transactional id
     * @return this builder
     */
    public ProducerBuilder<K, V> withTransactionalId(String transactionalId) {
        return this.withProp("transactional.id", transactionalId);
    }

    /**
     * Replaces the callback invoked when a produce stream terminates. The callback receives the
     * producer and the failure, which is {@code null} when the stream did not fail.
     *
     * @param onTermination the termination callback
     * @return this builder
     * @throws NullPointerException if no {@code transactional.id} has been set on this builder
     */
    public ProducerBuilder<K, V> withOnTermination(BiConsumer<KafkaProducer<K, V>, Throwable> onTermination) {
        Objects.requireNonNull(this.props.get("transactional.id"), "transactional id");
        this.onTermination = onTermination;
        return this;
    }

    /**
     * @return the configured {@code client.id}, or {@code null} if none is set
     */
    public String clientId() {
        return (String) this.props.get("client.id");
    }

    /**
     * @return {@code true} if a {@code transactional.id} property is configured
     */
    public boolean isTransactional() {
        return this.props.containsKey("transactional.id");
    }

    /**
     * Sends one record and exposes the broker acknowledgement (or the send failure) as a Uni
     * emitted on this builder's executor.
     */
    private Uni<RecordMetadata> record(ProducerRecord<K, V> record) {
        return Uni.createFrom().<RecordMetadata>emitter(em -> this.getOrCreateProducer().send(record,
                (metadata, exception) -> {
                    if (exception != null) {
                        em.fail(exception);
                    } else {
                        em.complete(metadata);
                    }
                }))
                .emitOn(this.getOrCreateExecutor())
                .invoke(() -> LOGGER.debugf("Producer %s: sent message %s", this.clientId(), record));
    }

    /**
     * Wraps a stream of records into a stream of send results. On each subscription a transaction
     * is begun first when the producer is transactional; records are then sent one after another,
     * and the termination callback runs when the stream completes, fails or is cancelled.
     */
    Multi<RecordMetadata> getProduceMulti(Multi<ProducerRecord<K, V>> recordProducer) {
        return Multi.createFrom().deferred(() -> {
            if (this.isTransactional()) {
                this.getOrCreateProducer().beginTransaction();
            }
            return recordProducer
                    .onItem().transformToUniAndConcatenate(this::record)
                    .runSubscriptionOn(this.getOrCreateExecutor())
                    .onTermination().invoke((throwable, cancelled) -> this.onTermination
                            .accept(this.getOrCreateProducer(), throwable));
        });
    }

    /**
     * Produces the records described by a CSV classpath resource. Supported row layouts:
     * <ul>
     * <li>{@code topic} - record with a {@code null} value</li>
     * <li>{@code topic, value}</li>
     * <li>{@code topic, key, value}</li>
     * <li>{@code topic, partition, key, value} - used for rows with four or more columns</li>
     * </ul>
     * Keys and values are obtained by running the UTF-8 bytes of the column through the
     * deserializer of the corresponding serde.
     *
     * @param resourcePath the classpath location of the CSV file
     * @return the producer task
     * @throws NullPointerException if the path is {@code null} or the builder was not created with serdes
     */
    public ProducerTask fromCsv(String resourcePath) {
        Objects.requireNonNull(resourcePath, "resource path");
        Objects.requireNonNull(this.keySerde, "Producer needs to be created with key Serde");
        Objects.requireNonNull(this.valueSerde, "Producer needs to be created with value Serde");
        Multi<ProducerRecord<K, V>> records = Multi.createFrom()
                .resource(
                        () -> new CSVReader(new InputStreamReader(this.getResourceAsStream(resourcePath),
                                StandardCharsets.UTF_8)),
                        this::readRecords)
                .withFinalizer(Unchecked.consumer(CSVReader::close));
        return new ProducerTask(this.getProduceMulti(records));
    }

    /** Streams the rows of the reader as producer records, skipping rows without any column. */
    private Multi<ProducerRecord<K, V>> readRecords(CSVReader reader) {
        // Reactive Streams items cannot be null, so empty rows are dropped before mapping
        // rather than being mapped to null and filtered afterwards.
        return Multi.createFrom().iterable(reader)
                .filter(row -> row.length > 0)
                .onItem().transform(this::getProducerRecord);
    }

    /**
     * Converts one CSV row into a record; see {@link #fromCsv(String)} for the column layouts.
     *
     * @return the record, or {@code null} for a row without columns
     */
    private ProducerRecord<K, V> getProducerRecord(String[] values) {
        if (values.length < 1) {
            return null;
        }
        String topic = values[0];
        if (values.length == 1) {
            return new ProducerRecord<>(topic, null);
        }
        if (values.length == 2) {
            return new ProducerRecord<>(topic, this.deserializeValue(topic, values[1]));
        }
        if (values.length == 3) {
            // Column 1 is the key and column 2 the value.
            return new ProducerRecord<>(topic, this.deserializeKey(topic, values[1]),
                    this.deserializeValue(topic, values[2]));
        }
        return new ProducerRecord<>(topic, Integer.valueOf(values[1]), this.deserializeKey(topic, values[2]),
                this.deserializeValue(topic, values[3]));
    }

    /** Turns the UTF-8 bytes of a CSV column into a key using the key serde's deserializer. */
    private K deserializeKey(String topic, String text) {
        return this.keySerde.deserializer().deserialize(topic, text.getBytes(StandardCharsets.UTF_8));
    }

    /** Turns the UTF-8 bytes of a CSV column into a value using the value serde's deserializer. */
    private V deserializeValue(String topic, String text) {
        return this.valueSerde.deserializer().deserialize(topic, text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Opens a classpath resource, trying the thread context class loader, the system class loader
     * and the class loader of {@link KafkaCompanion}, in that order. A leading {@code /} in the
     * path is tolerated.
     *
     * @throws IllegalArgumentException if no class loader can find the resource
     */
    private InputStream getResourceAsStream(String resourcePath) {
        ClassLoader[] candidates = {
                Thread.currentThread().getContextClassLoader(),
                ClassLoader.getSystemClassLoader(),
                KafkaCompanion.class.getClassLoader()
        };
        String withoutLeadingSlash = resourcePath.startsWith("/") ? resourcePath.substring(1) : null;
        for (ClassLoader classLoader : candidates) {
            if (classLoader == null) {
                // The context class loader may be unset; the bootstrap loader is reported as null.
                continue;
            }
            InputStream stream = classLoader.getResourceAsStream(resourcePath);
            if (stream == null && withoutLeadingSlash != null) {
                stream = classLoader.getResourceAsStream(withoutLeadingSlash);
            }
            if (stream != null) {
                return stream;
            }
        }
        throw new IllegalArgumentException("Resource '" + resourcePath + "' not found on classpath.");
    }

    /**
     * Produces the records emitted by the given stream.
     *
     * @param recordMulti the stream of records to send
     * @return the producer task
     */
    public ProducerTask fromMulti(Multi<ProducerRecord<K, V>> recordMulti) {
        Objects.requireNonNull(recordMulti, "record multi");
        return new ProducerTask(this.getProduceMulti(recordMulti));
    }

    /**
     * Produces the given records in list order.
     *
     * @param records the records to send
     * @return the producer task
     */
    public ProducerTask fromRecords(List<ProducerRecord<K, V>> records) {
        Objects.requireNonNull(records, "records");
        return this.fromMulti(Multi.createFrom().iterable(records));
    }

    /**
     * Produces the given records in argument order.
     *
     * @param records the records to send
     * @return the producer task
     */
    @SafeVarargs
    public final ProducerTask fromRecords(ProducerRecord<K, V>... records) {
        Objects.requireNonNull(records, "records");
        return this.fromMulti(Multi.createFrom().items(records));
    }

    /**
     * Produces records generated from the indexes {@code 0} (inclusive) to
     * {@link Integer#MAX_VALUE} (exclusive), after passing the generated stream through the given
     * plug function.
     *
     * @param generatorFunction maps an index to the record to send
     * @param plugFunction transforms the generated stream, e.g. to bound it
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> generatorFunction,
            Function<Multi<ProducerRecord<K, V>>, Multi<ProducerRecord<K, V>>> plugFunction) {
        Objects.requireNonNull(generatorFunction, "record generator function");
        return this.fromMulti(Multi.createFrom().range(0, Integer.MAX_VALUE)
                .onItem().transform(generatorFunction)
                .plug(plugFunction));
    }

    /**
     * Produces generated records without bounding the generated stream.
     *
     * @param generatorFunction maps an index to the record to send
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> generatorFunction) {
        return this.usingGenerator(generatorFunction, Function.identity());
    }

    /**
     * Produces generated records, bounded by {@link RecordQualifiers#until(long)}.
     *
     * @param generatorFunction maps an index to the record to send
     * @param numberOfRecords the bound passed to {@code RecordQualifiers.until}
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> generatorFunction,
            long numberOfRecords) {
        return this.usingGenerator(generatorFunction, RecordQualifiers.until(numberOfRecords));
    }

    /**
     * Produces generated records, bounded by {@link RecordQualifiers#until(Duration)}.
     *
     * @param generatorFunction maps an index to the record to send
     * @param during the bound passed to {@code RecordQualifiers.until}
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> generatorFunction,
            Duration during) {
        return this.usingGenerator(generatorFunction, RecordQualifiers.until(during));
    }
}

