/*
 * Decompiled with CFR 0.152.
 */
package io.airlift.log;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.Threads;
import io.airlift.log.MessageOutput;
import jakarta.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.ErrorManager;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import org.weakref.jmx.Managed;

@ThreadSafe
public class BufferedHandler
extends Handler {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private final MessageAndSource terminalMessage = new MessageAndSource(EMPTY_BYTES, "", (SettableFuture<Void>)SettableFuture.create());
    private final ExecutorService bufferDrainExecutor = Executors.newSingleThreadExecutor(Threads.daemonThreadsNamed((String)"log-buffer-drainer"));
    private final AtomicBoolean initialized = new AtomicBoolean();
    private final AtomicBoolean inputClosed = new AtomicBoolean();
    private final AtomicLong droppedMessages = new AtomicLong();
    private final MessageOutput messageOutput;
    private final DropSummaryFormatter dropSummaryFormatter;
    private final RateLimiter errorRetryLimiter;
    private final Duration maxCloseTime;
    private final int messageFlushCount;
    private final ReentrantLock queueDrainLock = new ReentrantLock();
    private final Condition recordEnqueued = this.queueDrainLock.newCondition();
    private final Deque<MessageAndSource> queue;
    @GuardedBy(value="queueDrainLock")
    private final Multiset<String> dropCountBySource = HashMultiset.create();
    @GuardedBy(value="queueDrainLock")
    private boolean terminalMessageDequeued;
    @Nullable
    @GuardedBy(value="queueDrainLock")
    private SettableFuture<Void> flushedSignal;

    public BufferedHandler(MessageOutput messageOutput, Formatter formatter, ErrorManager errorManager) {
        this(messageOutput, formatter, BufferedHandler::defaultFormatDropSummary, errorManager, RateLimiter.create((double)0.5), Duration.ofSeconds(10L), 512, 1024);
    }

    public BufferedHandler(MessageOutput messageOutput, Formatter formatter, DropSummaryFormatter dropSummaryFormatter, ErrorManager errorManager, RateLimiter errorRetryLimiter, Duration maxCloseTime, int messageFlushCount, int maxBufferSize) {
        this.messageOutput = Objects.requireNonNull(messageOutput, "messageOutput is null");
        this.setFormatter(Objects.requireNonNull(formatter, "formatter is null"));
        this.dropSummaryFormatter = Objects.requireNonNull(dropSummaryFormatter, "dropSummaryFormatter is null");
        this.setErrorManager(Objects.requireNonNull(errorManager, "errorManager is null"));
        this.errorRetryLimiter = Objects.requireNonNull(errorRetryLimiter, "errorRetryLimiter is null");
        this.maxCloseTime = Objects.requireNonNull(maxCloseTime, "maxCloseTime is null");
        Preconditions.checkArgument((messageFlushCount > 0 ? 1 : 0) != 0, (Object)"messageFlushCount must be greater than zero");
        this.messageFlushCount = messageFlushCount;
        Preconditions.checkArgument((maxBufferSize > 0 ? 1 : 0) != 0, (Object)"maxBufferSize must be greater than zero");
        this.queue = new LinkedBlockingDeque<MessageAndSource>(maxBufferSize);
    }

    private static String defaultFormatDropSummary(Multiset<String> dropCountBySource) {
        return dropCountBySource.entrySet().stream().sorted(Comparator.comparing(Multiset.Entry::getElement)).map(entry -> "%s messages dropped: %s".formatted(entry.getElement(), entry.getCount())).collect(Collectors.joining("\n", "Log buffer dropped messages:\n", ""));
    }

    public void initialize() {
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        this.bufferDrainExecutor.execute(this::bufferDrainLoop);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void bufferDrainLoop() {
        int flushCounter = 0;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ImmutableMultiset dropSnapshot = ImmutableMultiset.of();
                MessageAndSource message = null;
                SettableFuture<Void> flushedSignal = null;
                try {
                    this.queueDrainLock.lock();
                    try {
                        while (true) {
                            if (!this.dropCountBySource.isEmpty()) {
                                dropSnapshot = ImmutableMultiset.copyOf(this.dropCountBySource);
                                this.dropCountBySource.clear();
                            }
                            message = this.queuePollFirst();
                            flushedSignal = this.flushedSignal;
                            this.flushedSignal = null;
                            if (!dropSnapshot.isEmpty() || message != null) break;
                            if (flushedSignal != null) {
                                break;
                            }
                            if (this.terminalMessageDequeued) {
                                return;
                            }
                            this.recordEnqueued.await();
                        }
                    }
                    finally {
                        this.queueDrainLock.unlock();
                    }
                    if (!dropSnapshot.isEmpty()) {
                        if (!this.writeMessageOutputSafe(this.formatMessageBytes(this.createDropSummaryRecord((Multiset<String>)dropSnapshot)))) {
                            this.errorRetryLimiter.acquire();
                            continue;
                        }
                        dropSnapshot = ImmutableMultiset.of();
                        ++flushCounter;
                    }
                    if (message != null) {
                        if (!this.writeMessageOutputSafe(message.logMessage())) {
                            this.errorRetryLimiter.acquire();
                            continue;
                        }
                        message = null;
                        ++flushCounter;
                    }
                    if (flushCounter < this.messageFlushCount && flushedSignal == null && this.hasDrainingWork()) continue;
                    this.flushMessageOutputSafe();
                    flushCounter = 0;
                }
                finally {
                    if (message != null || !dropSnapshot.isEmpty()) {
                        this.queueDrainLock.lock();
                        try {
                            if (!(message == null || !this.terminalMessageDequeued && this.dropCountBySource.isEmpty() && this.queue.offerFirst(message))) {
                                this.dropCountBySource.add((Object)message.sourceName());
                                this.droppedMessages.incrementAndGet();
                            }
                            dropSnapshot.forEachEntry((arg_0, arg_1) -> this.dropCountBySource.add(arg_0, arg_1));
                        }
                        finally {
                            this.queueDrainLock.unlock();
                        }
                    }
                    if (flushedSignal == null) continue;
                    if (flushCounter > 0) {
                        this.flushMessageOutputSafe();
                        flushCounter = 0;
                    }
                    flushedSignal.set(null);
                }
            }
            catch (LogFormatException e) {
                this.reportError(null, e, 5);
            }
            catch (InterruptedException e) {
                this.reportError("Log draining thread interrupted, will exit!", e, 0);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                this.reportError("Unexpected buffer drain loop exception", e, 0);
            }
        }
    }

    @Nullable
    @GuardedBy(value="queueDrainLock")
    private MessageAndSource queuePollFirst() {
        Preconditions.checkState((boolean)this.queueDrainLock.isHeldByCurrentThread());
        if (this.terminalMessageDequeued) {
            return null;
        }
        MessageAndSource message = this.queue.pollFirst();
        if (message == null) {
            return null;
        }
        if (message.flushSignal != null) {
            if (this.flushedSignal == null) {
                this.flushedSignal = message.flushSignal;
            } else {
                message.flushSignal.setFuture(this.flushedSignal);
            }
            if (message == this.terminalMessage) {
                this.terminalMessageDequeued = true;
                SettableFuture<Void> terminalFlushedSignal = Objects.requireNonNull(this.flushedSignal, "flushedSignal must be set");
                this.queue.removeIf(queuedMessage -> {
                    if (queuedMessage.flushSignal != null) {
                        queuedMessage.flushSignal.setFuture((ListenableFuture)terminalFlushedSignal);
                        return true;
                    }
                    return false;
                });
            }
            return null;
        }
        return message;
    }

    private boolean hasDrainingWork() {
        this.queueDrainLock.lock();
        try {
            boolean bl = !this.dropCountBySource.isEmpty() || !this.queue.isEmpty();
            return bl;
        }
        finally {
            this.queueDrainLock.unlock();
        }
    }

    private LogRecord createDropSummaryRecord(Multiset<String> droppedSnapshot) {
        try {
            LogRecord record = new LogRecord(Level.SEVERE, this.dropSummaryFormatter.formatDropSummary(droppedSnapshot));
            record.setLoggerName(BufferedHandler.class.getName());
            return record;
        }
        catch (Exception e) {
            throw new LogFormatException(e);
        }
    }

    @Override
    public void publish(LogRecord record) {
        try {
            if (!this.isLoggable(record)) {
                return;
            }
            if (this.inputClosed.get()) {
                return;
            }
            MessageAndSource message = this.toMessageAndSource(record);
            this.queueInsert(message);
        }
        catch (LogFormatException e) {
            this.reportError(null, e, 5);
        }
        catch (Exception e) {
            this.reportError(null, e, 0);
        }
    }

    private MessageAndSource toMessageAndSource(LogRecord record) {
        return new MessageAndSource(this.formatMessageBytes(record), BufferedHandler.determineSourceName(record), null);
    }

    private byte[] formatMessageBytes(LogRecord logRecord) {
        try {
            return this.getFormatter().format(logRecord).getBytes(StandardCharsets.UTF_8);
        }
        catch (Exception e) {
            throw new LogFormatException(e);
        }
    }

    private static String determineSourceName(LogRecord record) {
        return (String)MoreObjects.firstNonNull((Object)record.getLoggerName(), (Object)"UNKNOWN");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueInsert(MessageAndSource message) {
        while (!this.queue.offerLast(message)) {
            this.queueDrainLock.lock();
            try {
                MessageAndSource toDrop = this.queuePollFirst();
                if (toDrop == null) {
                    if (!this.terminalMessageDequeued) continue;
                    return;
                }
                this.dropCountBySource.add((Object)toDrop.sourceName());
                this.droppedMessages.incrementAndGet();
            }
            finally {
                this.queueDrainLock.unlock();
            }
        }
        this.queueDrainLock.lock();
        try {
            this.recordEnqueued.signal();
        }
        finally {
            this.queueDrainLock.unlock();
        }
    }

    @Override
    public void flush() {
        if (this.inputClosed.get()) {
            return;
        }
        this.flushMessageOutputSafe();
    }

    public ListenableFuture<Void> requestFullFlush() {
        if (this.inputClosed.get()) {
            return Futures.nonCancellationPropagating((ListenableFuture)((ListenableFuture)Objects.requireNonNull(this.terminalMessage.flushSignal(), "terminalMessage flush signal must not be null")));
        }
        SettableFuture<Void> flushedSignal = SettableFuture.create();
        this.queueInsert(new MessageAndSource(EMPTY_BYTES, "", flushedSignal));
        this.queueDrainLock.lock();
        try {
            if (this.inputClosed.get() || this.terminalMessageDequeued) {
                flushedSignal = Objects.requireNonNull(this.terminalMessage.flushSignal(), "terminalMessage flush signal must not be null");
            }
        }
        finally {
            this.queueDrainLock.unlock();
        }
        return Futures.nonCancellationPropagating(flushedSignal);
    }

    @Override
    public void close() {
        if (!this.inputClosed.compareAndSet(false, true)) {
            return;
        }
        this.queueInsert(this.terminalMessage);
        try {
            this.bufferDrainExecutor.shutdown();
            if (!this.bufferDrainExecutor.awaitTermination(this.maxCloseTime.toMillis(), TimeUnit.MILLISECONDS)) {
                this.reportError("Timed out waiting for data flush during close", null, 3);
            }
        }
        catch (InterruptedException e) {
            this.reportError("Interrupted awaiting data flush during close", e, 3);
            Thread.currentThread().interrupt();
        }
        finally {
            this.closeMessageOutputSafe();
        }
    }

    @VisibleForTesting
    boolean isTerminalMessageDequeued() {
        this.queueDrainLock.lock();
        try {
            boolean bl = this.terminalMessageDequeued;
            return bl;
        }
        finally {
            this.queueDrainLock.unlock();
        }
    }

    @VisibleForTesting
    MessageOutput getMessageOutput() {
        return this.messageOutput;
    }

    @Managed
    public long getDroppedMessages() {
        return this.droppedMessages.get();
    }

    private boolean writeMessageOutputSafe(byte[] message) {
        try {
            this.messageOutput.writeMessage(message);
            return true;
        }
        catch (Exception e) {
            this.reportError("Could not write to the MessageOutput", e, 1);
            return false;
        }
    }

    private void flushMessageOutputSafe() {
        try {
            this.messageOutput.flush();
        }
        catch (Exception e) {
            this.reportError("Could not flush the MessageOutput", e, 2);
        }
    }

    private void closeMessageOutputSafe() {
        try {
            this.messageOutput.close();
        }
        catch (Exception e) {
            this.reportError("Could not close the MessageOutput", e, 3);
        }
    }

    public static interface DropSummaryFormatter {
        public String formatDropSummary(Multiset<String> var1);
    }

    private record MessageAndSource(byte[] logMessage, String sourceName, @Nullable SettableFuture<Void> flushSignal) {
        private MessageAndSource {
            Objects.requireNonNull(logMessage, "logMessage is null");
            Objects.requireNonNull(sourceName, "sourceName is null");
        }
    }

    private static class LogFormatException
    extends RuntimeException {
        public LogFormatException(Throwable cause) {
            super(cause);
        }
    }
}

