/*
 * Decompiled with CFR 0.152.
 */
package com.zebrunner.agent.core.registrar;

import com.zebrunner.agent.core.logging.Log;
import com.zebrunner.agent.core.registrar.LogsBuffer;
import com.zebrunner.agent.core.registrar.RunContext;
import com.zebrunner.agent.core.registrar.ZebrunnerApiClient;
import com.zebrunner.agent.core.registrar.descriptor.TestDescriptor;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LogsBuffer} that converts incoming events to {@link Log} entries, collects them in a
 * queue shared by all instances, and sends them in batches through {@link ZebrunnerApiClient}.
 *
 * <p>The periodic flush task is scheduled lazily on the first buffered entry and then runs with a
 * one-second delay between runs. A single JVM shutdown hook stops the executor and performs a
 * final flush of whatever is still queued.
 *
 * <p>All mutable state is static and accessed through concurrent/atomic types, so {@link #put}
 * may be called from several threads while a flush is in progress.
 *
 * @param <E> type of the raw logging event accepted by {@link #put}
 */
final class FlushingLogsBuffer<E> implements LogsBuffer<E> {

    private static final Logger log = LoggerFactory.getLogger(FlushingLogsBuffer.class);
    private static final ScheduledExecutorService FLUSH_EXECUTOR = Executors.newScheduledThreadPool(4);
    private static final ZebrunnerApiClient API_CLIENT = ZebrunnerApiClient.getInstance();
    /** Flipped to {@code true} by the first {@link #put} that schedules the periodic flush. */
    private static final AtomicBoolean EXECUTOR_ENABLED = new AtomicBoolean();
    /** Ensures the shutdown hook is registered once, no matter how many buffers are created. */
    private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean();
    /**
     * Pending entries. The reference never changes: flushing drains this queue instead of
     * replacing it, so a producer can never add to a queue that has already been sent.
     */
    private static final Queue<Log> QUEUE = new ConcurrentLinkedQueue<>();

    private final Function<E, Log> converter;

    /**
     * Creates a buffer that uses the given converter for every event.
     *
     * @param converter maps a raw event to the {@link Log} entry that gets queued
     */
    FlushingLogsBuffer(Function<E, Log> converter) {
        this.converter = converter;
        // The hook operates on static state only, so one registration covers every instance.
        if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(FlushingLogsBuffer::shutdown));
        }
    }

    /**
     * Converts the event and queues it for the next flush. Events received while
     * {@link RunContext#getCurrentTest()} is empty are dropped.
     *
     * @param event raw logging event
     */
    @Override
    public void put(E event) {
        Optional<TestDescriptor> currentTest = RunContext.getCurrentTest();
        if (!currentTest.isPresent()) {
            return;
        }

        Log entry = this.converter.apply(event);
        entry.setTestId(String.valueOf(currentTest.get().getZebrunnerId()));
        QUEUE.add(entry);

        // Only the first caller that wins the CAS schedules the periodic task.
        if (EXECUTOR_ENABLED.compareAndSet(false, true)) {
            FlushingLogsBuffer.scheduleFlush();
        }
    }

    /** Schedules the periodic flush: first run after one second, then one second between runs. */
    private static void scheduleFlush() {
        FLUSH_EXECUTOR.scheduleWithFixedDelay(FlushingLogsBuffer::flushSafely, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Runs {@link #flush()} and logs any runtime failure instead of propagating it. A task that
     * throws would have all of its subsequent scheduled executions suppressed by the executor.
     */
    private static void flushSafely() {
        try {
            FlushingLogsBuffer.flush();
        } catch (RuntimeException e) {
            log.error("Failed to flush buffered logs", e);
        }
    }

    /** Drains the entries queued so far into a batch and sends it; does nothing when empty. */
    private static void flush() {
        if (QUEUE.isEmpty()) {
            return;
        }

        Long runId = RunContext.getZebrunnerRunId();
        Queue<Log> logsBatch = new ConcurrentLinkedQueue<>();
        Log entry;
        while ((entry = QUEUE.poll()) != null) {
            logsBatch.add(entry);
        }

        // A concurrent flush may have drained everything between the emptiness check and here.
        if (!logsBatch.isEmpty()) {
            API_CLIENT.sendLogs(logsBatch, runId);
        }
    }

    /**
     * Shutdown-hook body: stops the flush executor, waiting up to ten seconds for a running
     * flush to finish, then sends whatever is still queued.
     */
    private static void shutdown() {
        boolean interrupted = false;

        FLUSH_EXECUTOR.shutdown();
        try {
            if (!FLUSH_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS)) {
                FLUSH_EXECUTOR.shutdownNow();
            }
        } catch (InterruptedException e) {
            FLUSH_EXECUTOR.shutdownNow();
            log.error(e.getMessage(), (Throwable) e);
            interrupted = true;
        }

        FlushingLogsBuffer.flushSafely();

        // Restore the interrupt status only after the final flush so the send is still attempted.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

