/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.server.logging.kafka;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.server.logging.AccessLogWriter;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaAccessLogWriter<K, V>
implements AccessLogWriter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAccessLogWriter.class);
    private final Producer<K, V> producer;
    private final String topic;
    private final Function<? super RequestLog, ? extends @Nullable K> keyExtractor;
    private final Function<? super RequestLog, ? extends @Nullable V> valueExtractor;

    public KafkaAccessLogWriter(Producer<K, V> producer, String topic, Function<? super RequestLog, ? extends @Nullable V> valueExtractor) {
        this(producer, topic, log -> null, valueExtractor);
    }

    public KafkaAccessLogWriter(Producer<K, V> producer, String topic, Function<? super RequestLog, ? extends @Nullable K> keyExtractor, Function<? super RequestLog, ? extends @Nullable V> valueExtractor) {
        this.producer = Objects.requireNonNull(producer, "producer");
        this.topic = Objects.requireNonNull(topic, "topic");
        this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor");
        this.valueExtractor = Objects.requireNonNull(valueExtractor, "valueExtractor");
    }

    public void log(RequestLog log) {
        V value = this.valueExtractor.apply((RequestLog)log);
        if (value == null) {
            return;
        }
        K key = this.keyExtractor.apply((RequestLog)log);
        ProducerRecord producerRecord = new ProducerRecord(this.topic, key, value);
        this.producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                logger.warn("Failed to send a record to Kafka: {}", (Object)producerRecord, (Object)exception);
            }
        });
    }

    public CompletableFuture<Void> shutdown() {
        return CompletableFuture.runAsync(() -> this.producer.close());
    }
}

