/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.contrib.kafka;

import io.opentelemetry.contrib.kafka.KafkaSpanExporterBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class KafkaSpanExporter
implements SpanExporter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
    private final String topicName;
    private final Producer<String, Collection<SpanData>> producer;
    private final ExecutorService executorService;
    private final long timeoutInSeconds;
    private final AtomicBoolean isShutdown = new AtomicBoolean();

    public static KafkaSpanExporterBuilder newBuilder() {
        return new KafkaSpanExporterBuilder();
    }

    KafkaSpanExporter(String topicName, Producer<String, Collection<SpanData>> producer, ExecutorService executorService, long timeoutInSeconds) {
        this.topicName = topicName;
        this.producer = producer;
        this.executorService = executorService;
        this.timeoutInSeconds = timeoutInSeconds;
    }

    public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
        if (this.isShutdown.get()) {
            return CompletableResultCode.ofFailure();
        }
        ProducerRecord producerRecord = new ProducerRecord(this.topicName, spans);
        CompletableResultCode result = new CompletableResultCode();
        CompletableFuture.runAsync(() -> this.producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                result.succeed();
            } else {
                logger.error(String.format("Error while sending spans to Kafka topic %s", this.topicName), (Throwable)exception);
                result.fail();
            }
        }), this.executorService);
        return result;
    }

    public CompletableResultCode flush() {
        CompletableResultCode result = new CompletableResultCode();
        CompletableFuture.runAsync(() -> this.producer.flush(), this.executorService).handle((unused, exception) -> {
            if (exception == null) {
                result.succeed();
            } else {
                logger.error(String.format("Error while performing the flush operation on topic %s", this.topicName), exception);
                result.fail();
            }
            return true;
        });
        return result;
    }

    private CompletableResultCode shutdownExecutorService() {
        try {
            List<Runnable> interrupted;
            this.executorService.shutdown();
            boolean terminated = this.executorService.awaitTermination(this.timeoutInSeconds, TimeUnit.SECONDS);
            if (!terminated && !(interrupted = this.executorService.shutdownNow()).isEmpty()) {
                logger.error("Shutting down KafkaSpanExporter forced {} tasks to be cancelled.", (Object)interrupted.size());
            }
            return CompletableResultCode.ofSuccess();
        }
        catch (InterruptedException e) {
            logger.error("Error when trying to shutdown KafkaSpanExporter executorService.", (Throwable)e);
            return CompletableResultCode.ofFailure();
        }
    }

    private CompletableResultCode shutdownProducer() {
        try {
            this.producer.close(Duration.ofSeconds(this.timeoutInSeconds));
            return CompletableResultCode.ofSuccess();
        }
        catch (KafkaException e) {
            logger.error("Error when trying to shutdown KafkaSpanExporter Producer.", (Throwable)e);
            return CompletableResultCode.ofFailure();
        }
    }

    public CompletableResultCode shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            logger.warn("Calling shutdown() multiple times.");
            return CompletableResultCode.ofSuccess();
        }
        ArrayList<CompletableResultCode> codes = new ArrayList<CompletableResultCode>(2);
        codes.add(this.shutdownExecutorService());
        codes.add(this.shutdownProducer());
        return CompletableResultCode.ofAll(codes);
    }
}

