/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.companion.ClusterCompanion;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerBuilder;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerGroupsCompanion;
import io.smallrye.reactive.messaging.kafka.companion.OffsetsCompanion;
import io.smallrye.reactive.messaging.kafka.companion.ProducerBuilder;
import io.smallrye.reactive.messaging.kafka.companion.ProducerTask;
import io.smallrye.reactive.messaging.kafka.companion.TopicsCompanion;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

@Experimental(value="Experimental API")
public class KafkaCompanion
implements AutoCloseable {
    private final Map<Class<?>, Serde<?>> serdeMap = new HashMap();
    private final Map<String, Object> commonClientConfig = new HashMap<String, Object>();
    private final String bootstrapServers;
    private final Duration kafkaApiTimeout;
    private AdminClient adminClient;
    private final List<Runnable> onClose = new CopyOnWriteArrayList<Runnable>();

    public KafkaCompanion(String bootstrapServers) {
        this(bootstrapServers, Duration.ofSeconds(10L));
    }

    public KafkaCompanion(String bootstrapServers, Duration kafkaApiTimeout) {
        this.bootstrapServers = bootstrapServers;
        this.kafkaApiTimeout = kafkaApiTimeout;
    }

    public Duration getKafkaApiTimeout() {
        return this.kafkaApiTimeout;
    }

    public Map<String, Object> getCommonClientConfig() {
        return this.commonClientConfig;
    }

    public void setCommonClientConfig(Map<String, Object> properties) {
        this.commonClientConfig.putAll(properties);
    }

    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    public synchronized AdminClient getOrCreateAdminClient() {
        if (this.adminClient == null) {
            HashMap<String, Object> configMap = new HashMap<String, Object>(this.getCommonClientConfig());
            configMap.put("bootstrap.servers", this.getBootstrapServers());
            configMap.put("metadata.max.age.ms", 1000);
            configMap.put("client.id", "companion-admin-for-" + this.getBootstrapServers());
            this.adminClient = AdminClient.create(configMap);
            this.registerOnClose(() -> this.adminClient.close(this.kafkaApiTimeout));
        }
        return this.adminClient;
    }

    @Override
    public synchronized void close() {
        for (Runnable runnable : new ArrayList<Runnable>(this.onClose)) {
            runnable.run();
        }
    }

    public static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static String getHeader(Headers headers, String key) {
        return new String(headers.lastHeader(key).value(), StandardCharsets.UTF_8);
    }

    public static TopicPartition tp(String topic, int partition) {
        return new TopicPartition(topic, partition);
    }

    public static <K, V> ProducerRecord<K, V> record(String topic, V value) {
        return new ProducerRecord(topic, value);
    }

    public static <K, V> ProducerRecord<K, V> record(String topic, K key, V value) {
        return new ProducerRecord(topic, key, value);
    }

    public static <K, V> ProducerRecord<K, V> record(String topic, Integer partition, K key, V value) {
        return new ProducerRecord(topic, partition, key, value);
    }

    public static <T> Uni<T> waitFor(Uni<T> source, Predicate<T> predicate, Duration pollDelay) {
        return source.repeat().withDelay(pollDelay).whilst(t -> predicate.negate().test(t)).select().last().toUni();
    }

    public static Uni<Void> waitFor(BooleanSupplier predicate, Duration pollDelay) {
        return Uni.createFrom().voidItem().repeat().withDelay(pollDelay).whilst(t -> !predicate.getAsBoolean()).select().last().toUni();
    }

    protected static <T> Uni<T> toUni(KafkaFuture<T> kafkaFuture) {
        return Uni.createFrom().completionStage(kafkaFuture.toCompletionStage());
    }

    public <T> void registerSerde(Class<T> type, Serde<T> serde) {
        this.serdeMap.put(type, serde);
    }

    public <T> void registerSerde(Class<T> type, Serializer<T> serializer, Deserializer<T> deserializer) {
        this.registerSerde(type, Serdes.serdeFrom(serializer, deserializer));
    }

    public <T> Serde<T> getSerdeForType(Class<T> type) {
        Serde<?> serde = this.serdeMap.get(type);
        if (serde != null) {
            return serde;
        }
        return Serdes.serdeFrom(type);
    }

    public TopicsCompanion topics() {
        return new TopicsCompanion(this.getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    public OffsetsCompanion offsets() {
        return new OffsetsCompanion(this.getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    public ConsumerGroupsCompanion consumerGroups() {
        return new ConsumerGroupsCompanion(this.getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    public ClusterCompanion cluster() {
        return new ClusterCompanion(this.getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    public void deleteRecords(Map<TopicPartition, RecordsToDelete> offsetsToDelete) {
        KafkaCompanion.toUni(this.getOrCreateAdminClient().deleteRecords(offsetsToDelete).all()).await().atMost(this.kafkaApiTimeout);
    }

    public void deleteRecords(TopicPartition partition, Long beforeOffset) {
        HashMap<TopicPartition, RecordsToDelete> offsetsToDelete = new HashMap<TopicPartition, RecordsToDelete>();
        offsetsToDelete.put(partition, RecordsToDelete.beforeOffset((long)beforeOffset));
        this.deleteRecords(offsetsToDelete);
    }

    public Map<String, Object> getConsumerProperties() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", this.bootstrapServers);
        config.put("group.id", "companion-" + UUID.randomUUID());
        config.put("client.id", "companion-" + UUID.randomUUID());
        config.put("enable.auto.commit", Boolean.FALSE.toString());
        config.put("auto.offset.reset", OffsetResetStrategy.EARLIEST.toString().toLowerCase());
        config.putAll(this.getCommonClientConfig());
        return config;
    }

    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Class<? extends Deserializer<?>> valueDeserializerClassName) {
        return this.consumeWithDeserializers(valueDeserializerClassName.getName());
    }

    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Class<? extends Deserializer<?>> keyDeserializerClassName, Class<? extends Deserializer<?>> valueDeserializerClassName) {
        return this.consumeWithDeserializers(keyDeserializerClassName.getName(), valueDeserializerClassName.getName());
    }

    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(String valueDeserializerClassName) {
        return this.consumeWithDeserializers(StringDeserializer.class.getName(), valueDeserializerClassName);
    }

    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(String keyDeserializerClassName, String valueDeserializerClassName) {
        ConsumerBuilder builder = new ConsumerBuilder(this.getConsumerProperties(), this.kafkaApiTimeout, keyDeserializerClassName, valueDeserializerClassName);
        this.registerOnClose(builder::close);
        return builder;
    }

    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        ConsumerBuilder<K, V> builder = new ConsumerBuilder<K, V>(this.getConsumerProperties(), this.kafkaApiTimeout, keyDeserializer, valueDeserializer);
        this.registerOnClose(builder::close);
        return builder;
    }

    public <K, V> ConsumerBuilder<K, V> consume(Serde<K> keySerde, Serde<V> valueSerde) {
        return this.consumeWithDeserializers(keySerde.deserializer(), valueSerde.deserializer());
    }

    public <K, V> ConsumerBuilder<K, V> consume(Class<K> keyType, Class<V> valueType) {
        return this.consume(this.getSerdeForType(keyType), this.getSerdeForType(valueType));
    }

    public <V> ConsumerBuilder<String, V> consume(Class<V> valueType) {
        return this.consume(Serdes.String(), this.getSerdeForType(valueType));
    }

    public ConsumerBuilder<String, String> consumeStrings() {
        return this.consume(Serdes.String(), Serdes.String());
    }

    public ConsumerBuilder<String, Integer> consumeIntegers() {
        return this.consume(Serdes.String(), Serdes.Integer());
    }

    public ConsumerBuilder<String, Double> consumeDoubles() {
        return this.consume(Serdes.String(), Serdes.Double());
    }

    public Map<String, Object> getProducerProperties() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", this.bootstrapServers);
        config.put("client.id", "companion-" + UUID.randomUUID());
        config.putAll(this.getCommonClientConfig());
        return config;
    }

    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Class<? extends Serializer<?>> keySerializerType, Class<? extends Serializer<?>> valueSerializerType) {
        return this.produceWithSerializers(keySerializerType.getName(), valueSerializerType.getName());
    }

    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Class<? extends Serializer<?>> valueSerializerType) {
        return this.produceWithSerializers(valueSerializerType.getName());
    }

    public <K, V> ProducerBuilder<K, V> produceWithSerializers(String valueSerializerClassName) {
        return this.produceWithSerializers(StringSerializer.class.getName(), valueSerializerClassName);
    }

    public <K, V> ProducerBuilder<K, V> produceWithSerializers(String keySerializerClassName, String valueSerializerClassName) {
        ProducerBuilder builder = new ProducerBuilder(this.getProducerProperties(), this.kafkaApiTimeout, keySerializerClassName, valueSerializerClassName);
        this.registerOnClose(builder::close);
        return builder;
    }

    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        ProducerBuilder<K, V> builder = new ProducerBuilder<K, V>(this.getProducerProperties(), this.kafkaApiTimeout, keySerializer, valueSerializer);
        this.registerOnClose(builder::close);
        return builder;
    }

    public <K, V> ProducerBuilder<K, V> produce(Serde<K> keySerde, Serde<V> valueSerde) {
        ProducerBuilder<K, V> builder = new ProducerBuilder<K, V>(this.getProducerProperties(), this.kafkaApiTimeout, keySerde, valueSerde);
        this.registerOnClose(builder::close);
        return builder;
    }

    public <K, V> ProducerBuilder<K, V> produce(Class<K> keyType, Class<V> valueType) {
        return this.produce(this.getSerdeForType(keyType), this.getSerdeForType(valueType));
    }

    public <V> ProducerBuilder<String, V> produce(Class<V> valueType) {
        return this.produce(Serdes.String(), this.getSerdeForType(valueType));
    }

    public ProducerBuilder<String, String> produceStrings() {
        return this.produce(Serdes.String(), Serdes.String());
    }

    public ProducerBuilder<String, Integer> produceIntegers() {
        return this.produce(Serdes.String(), Serdes.Integer());
    }

    public ProducerBuilder<String, Double> produceDoubles() {
        return this.produce(Serdes.String(), Serdes.Double());
    }

    public <K, C, P> ProducerTask process(Set<String> topics, ConsumerBuilder<K, C> consumer, ProducerBuilder<K, P> producer, Function<ConsumerRecord<K, C>, ProducerRecord<K, P>> process) {
        return producer.fromMulti(consumer.process(topics, m -> m.onItem().transform(process)));
    }

    public <K, C, P> ProducerTask processTransactional(Set<String> topics, ConsumerBuilder<K, C> consumer, ProducerBuilder<K, P> producer, Function<ConsumerRecord<K, C>, ProducerRecord<K, P>> process) {
        if (!producer.isTransactional()) {
            throw new IllegalStateException("producer must be transactional");
        }
        ProducerBuilder builder = producer.withOnTermination((kafkaProducer, throwable) -> {
            if (throwable == null) {
                try {
                    Map<TopicPartition, OffsetAndMetadata> position = consumer.position();
                    kafkaProducer.sendOffsetsToTransaction(position, consumer.unwrap().groupMetadata());
                    kafkaProducer.commitTransaction();
                }
                catch (Throwable e) {
                    kafkaProducer.abortTransaction();
                    consumer.resetToLastCommittedPositions();
                }
            } else {
                kafkaProducer.abortTransaction();
                consumer.resetToLastCommittedPositions();
            }
        });
        return new ProducerTask((Multi<RecordMetadata>)consumer.processBatch(topics, records -> {
            if (records.isEmpty()) {
                return Multi.createFrom().empty();
            }
            return builder.getProduceMulti(Multi.createFrom().iterable((Iterable)records).onItem().transform(process));
        }).onTermination().invoke(builder::close));
    }

    private <K, V> void registerOnClose(Runnable action) {
        this.onClose.add(() -> {
            try {
                action.run();
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
    }
}

