/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.server.logging.structured.kafka;

import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.logging.ResponseLog;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.logging.structured.ApacheThriftStructuredLog;
import com.linecorp.armeria.server.logging.structured.StructuredLogBuilder;
import com.linecorp.armeria.server.logging.structured.StructuredLoggingService;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StructuredLoggingService} that publishes each structured log entry to a Kafka topic.
 *
 * <p>Every entry is wrapped in a {@link ProducerRecord} addressed to the configured topic and handed
 * to the configured {@link Producer}. The record key is chosen by a {@link KeySelector}; when none is
 * supplied, records are sent with a {@code null} key.
 *
 * @param <I> the request type of the decorated service
 * @param <O> the response type of the decorated service
 * @param <L> the type of the structured log entry, used as the Kafka record value
 */
public class KafkaStructuredLoggingService<I extends Request, O extends Response, L>
        extends StructuredLoggingService<I, O, L> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaStructuredLoggingService.class);

    private final Producer<byte[], L> producer;
    private final String topic;
    private final KeySelector<L> keySelector;
    /** Whether {@link #close()} should also close {@link #producer}. */
    private final boolean needToCloseProducer;

    /**
     * Creates a decorator that logs through a caller-supplied producer.
     *
     * <p>Services built by the returned decorator never close {@code producer}; its lifecycle stays
     * with the caller.
     *
     * @param producer the Kafka producer used to send log records
     * @param topic the Kafka topic the records are sent to
     * @param logBuilder the builder passed to the {@link StructuredLoggingService} superclass
     * @param keySelector selects the record key for each entry, or {@code null} to send every record
     *                    with a {@code null} key
     * @return a function that wraps a service with a {@link KafkaStructuredLoggingService}
     */
    public static <I extends Request, O extends Response, L>
    Function<Service<? super I, ? extends O>, StructuredLoggingService<I, O, L>> newDecorator(
            Producer<byte[], L> producer, String topic,
            StructuredLogBuilder<L> logBuilder, @Nullable KeySelector<L> keySelector) {
        return service -> new KafkaStructuredLoggingService<>(
                service, logBuilder, producer, topic, keySelector, false);
    }

    /**
     * Creates a decorator that logs through a caller-supplied producer, sending every record with a
     * {@code null} key.
     *
     * @param producer the Kafka producer used to send log records
     * @param topic the Kafka topic the records are sent to
     * @param logBuilder the builder passed to the {@link StructuredLoggingService} superclass
     * @return a function that wraps a service with a {@link KafkaStructuredLoggingService}
     */
    public static <I extends Request, O extends Response, L>
    Function<Service<? super I, ? extends O>, StructuredLoggingService<I, O, L>> newDecorator(
            Producer<byte[], L> producer, String topic, StructuredLogBuilder<L> logBuilder) {
        return newDecorator(producer, topic, logBuilder, null);
    }

    /**
     * Creates a decorator that logs through a {@link KafkaProducer} built from a default
     * configuration for the given bootstrap servers.
     *
     * <p>The producer is created once, by this call, and is shared by every service the returned
     * decorator builds. Each of those services is created with {@code needToCloseProducer} set, so
     * each one closes the shared producer from its own {@link #close()}.
     *
     * @param bootstrapServers the value of the producer's {@code bootstrap.servers} setting
     * @param topic the Kafka topic the records are sent to
     * @param logBuilder the builder passed to the {@link StructuredLoggingService} superclass
     * @param keySelector selects the record key for each entry, or {@code null} to send every record
     *                    with a {@code null} key
     * @return a function that wraps a service with a {@link KafkaStructuredLoggingService}
     * @throws NullPointerException if {@code bootstrapServers} or {@code topic} is {@code null}
     */
    public static <I extends Request, O extends Response, L>
    Function<Service<? super I, ? extends O>, StructuredLoggingService<I, O, L>> newDecorator(
            String bootstrapServers, String topic,
            StructuredLogBuilder<L> logBuilder, @Nullable KeySelector<L> keySelector) {
        // Validate before creating the producer: a null topic would otherwise only be rejected when
        // the decorator is applied, after a producer has already been opened and with nothing left
        // to close it.
        Objects.requireNonNull(bootstrapServers, "bootstrapServers");
        Objects.requireNonNull(topic, "topic");

        final Producer<byte[], L> producer = new KafkaProducer<>(newDefaultConfig(bootstrapServers));
        return service -> new KafkaStructuredLoggingService<>(
                service, logBuilder, producer, topic, keySelector, true);
    }

    /**
     * Creates a decorator that logs through a default-configured {@link KafkaProducer}, sending every
     * record with a {@code null} key.
     *
     * @param bootstrapServers the value of the producer's {@code bootstrap.servers} setting
     * @param topic the Kafka topic the records are sent to
     * @param logBuilder the builder passed to the {@link StructuredLoggingService} superclass
     * @return a function that wraps a service with a {@link KafkaStructuredLoggingService}
     */
    public static <I extends Request, O extends Response, L>
    Function<Service<? super I, ? extends O>, StructuredLoggingService<I, O, L>> newDecorator(
            String bootstrapServers, String topic, StructuredLogBuilder<L> logBuilder) {
        return newDecorator(bootstrapServers, topic, logBuilder, null);
    }

    /**
     * Creates a decorator that logs {@link ApacheThriftStructuredLog} entries through a
     * default-configured {@link KafkaProducer}, sending every record with a {@code null} key.
     *
     * @param bootstrapServers the value of the producer's {@code bootstrap.servers} setting
     * @param topic the Kafka topic the records are sent to
     * @return a function that wraps a service with a {@link KafkaStructuredLoggingService}
     */
    public static <I extends Request, O extends Response>
    Function<Service<? super I, ? extends O>, StructuredLoggingService<I, O, ApacheThriftStructuredLog>>
    newDecorator(String bootstrapServers, String topic) {
        return newDecorator(bootstrapServers, topic, ApacheThriftStructuredLog::new);
    }

    /**
     * Builds the producer configuration used by the {@code bootstrapServers}-based factories.
     *
     * @param bootstrapServers the value stored under {@code bootstrap.servers}
     * @return a new {@link Properties} holding the bootstrap servers, a client id derived from this
     *         class's simple name, {@code acks=all} and {@code retries=3}
     */
    private static Properties newDefaultConfig(String bootstrapServers) {
        // NOTE(review): no key.serializer / value.serializer is set here. Confirm that the
        // KafkaProducer(Properties) constructor used by the caller accepts a config without them.
        final Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", bootstrapServers);
        producerConfig.setProperty("client.id", KafkaStructuredLoggingService.class.getSimpleName());
        producerConfig.setProperty("acks", "all");
        producerConfig.setProperty("retries", "3");
        return producerConfig;
    }

    /**
     * Creates a new instance.
     *
     * @param delegate the service being decorated
     * @param logBuilder the builder passed to the {@link StructuredLoggingService} superclass
     * @param producer the Kafka producer used to send log records
     * @param topic the Kafka topic the records are sent to
     * @param keySelector selects the record key for each entry, or {@code null} to send every record
     *                    with a {@code null} key
     * @param needToCloseProducer whether {@link #close()} should close {@code producer}
     * @throws NullPointerException if {@code producer} or {@code topic} is {@code null}
     */
    KafkaStructuredLoggingService(Service<? super I, ? extends O> delegate,
                                  StructuredLogBuilder<L> logBuilder,
                                  Producer<byte[], L> producer, String topic,
                                  @Nullable KeySelector<L> keySelector,
                                  boolean needToCloseProducer) {
        super(delegate, logBuilder);
        this.producer = Objects.requireNonNull(producer, "producer");
        this.topic = Objects.requireNonNull(topic, "topic");
        // Fall back to a selector that always yields a null key so writeLog needs no null check.
        this.keySelector = keySelector == null ? (ctx, res, log) -> null : keySelector;
        this.needToCloseProducer = needToCloseProducer;
    }

    /**
     * Sends the given structured log entry to the configured Kafka topic.
     *
     * <p>The send result is handled in a callback: a failure is logged at WARN level and is not
     * propagated to the caller.
     *
     * @param reqCtx the request context, passed to the {@link KeySelector}
     * @param resLog the response log, passed to the {@link KeySelector}
     * @param structuredLog the entry to send as the record value
     */
    @Override
    protected void writeLog(RequestContext reqCtx, ResponseLog resLog, L structuredLog) {
        final byte[] key = keySelector.selectKey(reqCtx, resLog, structuredLog);
        final ProducerRecord<byte[], L> producerRecord = new ProducerRecord<>(topic, key, structuredLog);
        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                // The exception is passed as the trailing argument after the single placeholder's value.
                logger.warn("failed to send service log to Kafka {}", producerRecord, exception);
            }
        });
    }

    /**
     * Closes the producer if this instance was created with {@code needToCloseProducer} set;
     * otherwise does nothing.
     */
    @Override
    protected void close() {
        if (needToCloseProducer) {
            producer.close();
        }
    }

    /**
     * Chooses the Kafka record key for a structured log entry.
     *
     * @param <E> the type of the structured log entry
     */
    @FunctionalInterface
    public interface KeySelector<E> {
        /**
         * Selects the key of the record that will carry {@code log}.
         *
         * @param reqCtx the request context of the logged request
         * @param resLog the response log of the logged request
         * @param log the structured log entry about to be sent
         * @return the record key, or {@code null} to send the record without a key
         */
        @Nullable
        byte[] selectKey(RequestContext reqCtx, ResponseLog resLog, E log);
    }
}

