/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.logging;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.initialization.qual.UnderInitialization;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.MDC;

public class BeamFnLoggingClient
implements AutoCloseable {
    /** Name used to look up the root logger through {@link LogManager#getLogger(String)}. */
    private static final @UnknownKeyFor @NonNull @Initialized String ROOT_LOGGER_NAME = "";

    /**
     * Fn API severity reported for each {@code java.util.logging} level. Levels without an entry
     * here have no severity, and the handler skips records logged at those levels.
     */
    private static final @UnknownKeyFor @NonNull @Initialized ImmutableMap<@UnknownKeyFor @NonNull @Initialized Level,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP =
            ImmutableMap.<Level, BeamFnApi.LogEntry.Severity.Enum>builder()
                    .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR)
                    .put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN)
                    .put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO)
                    .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
                    .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
                    .build();

    /** Inverse of {@link #LOG_LEVEL_MAP}, used to turn buffered entries back into log records. */
    private static final @UnknownKeyFor @NonNull @Initialized ImmutableMap< @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry.Severity.Enum, @UnknownKeyFor @NonNull @Initialized Level> REVERSE_LOG_LEVEL_MAP =
            ImmutableMap.<BeamFnApi.LogEntry.Severity.Enum, Level>builder()
                    .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries())
                    .build();

    /** Formatter installed on the handler; used to render the message text of each record. */
    private static final @UnknownKeyFor @NonNull @Initialized Formatter DEFAULT_FORMATTER = new SimpleFormatter();

    /** Capacity of the in-memory buffer between logging threads and the stream writer. */
    private static final @UnknownKeyFor @NonNull @Initialized int MAX_BUFFERED_LOG_ENTRY_COUNT = 10000;

    /** Sentinel value used to complete futures that carry no meaningful result. */
    private static final @UnknownKeyFor @NonNull @Initialized Object COMPLETED = new Object();

    // NOTE(review): the decompiler could not load the outer class of this type, so the annotation
    // placement on Endpoints.ApiServiceDescriptor may be incorrect; no import for Endpoints is
    // visible in this file either.
    private final @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor;

    /** Drains {@link #bufferedLogEntries} onto the outbound logging stream. */
    private final @UnknownKeyFor @NonNull @Initialized StreamWriter streamWriter;

    /** Handler attached to the root logger while this client is active. */
    private final @UnknownKeyFor @NonNull @Initialized BeamFnLoggingClient. @UnknownKeyFor @NonNull @Initialized LogRecordHandler logRecordHandler;

    /** Loggers obtained from the pipeline options at install time; reset again on restore. */
    private final @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized Logger> configuredLoggers = new ArrayList<Logger>();

    /** Bounded buffer of entries waiting to be sent; sized by the shared constant, not a literal. */
    private final @UnknownKeyFor @NonNull @Initialized BlockingQueue< @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry> bufferedLogEntries = new ArrayBlockingQueue<BeamFnApi.LogEntry>(MAX_BUFFERED_LOG_ENTRY_COUNT);

    // Future of the background task that installs logging and drains the buffer.
    // NOTE(review): the decompiler reported issues handling the annotations on this type; they may
    // be inaccurate.
    private final @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> bufferedLogConsumer;

    /** Thread running the background drain task; set by that task when it starts. */
    private @Nullable @UnknownKeyFor @Initialized Thread logEntryHandlerThread = null;

    /**
     * Creates a logging client for the given endpoint and returns it once its constructor has
     * finished.
     *
     * <p>The outbound stream is opened on the channel that {@code channelFactory} produces for
     * {@code apiServiceDescriptor}. The MDC flag and the executor for the background task are read
     * from {@code options}.
     *
     * <p>NOTE(review): the decompiler flagged the annotation placement on
     * {@code Endpoints.ApiServiceDescriptor} as possibly incorrect.
     *
     * @param options pipeline options, viewed as {@code SdkHarnessOptions} and {@code ExecutorOptions}
     * @param apiServiceDescriptor descriptor of the logging service to connect to
     * @param channelFactory produces the channel used for the logging stream
     * @return the constructed client
     */
    public static @UnknownKeyFor @NonNull @Initialized BeamFnLoggingClient createAndStart(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized ManagedChannel> channelFactory) {
        // Evaluated in the same order as the constructor arguments: channel and writer first,
        // then the individual option views.
        ManagedChannel loggingChannel = channelFactory.apply(apiServiceDescriptor);
        StreamWriter writer = new StreamWriter(loggingChannel);
        boolean includeMdc = options.as(SdkHarnessOptions.class).getLogMdc();
        ScheduledExecutorService backgroundExecutor = options.as(ExecutorOptions.class).getScheduledExecutorService();
        SdkHarnessOptions harnessOptions = options.as(SdkHarnessOptions.class);
        return new BeamFnLoggingClient(apiServiceDescriptor, writer, includeMdc, backgroundExecutor, harnessOptions);
    }

    /**
     * Wires up the handler, starts the background drain task on {@code executorService}, and
     * blocks until either the log handler has been installed or the background task has ended.
     *
     * <p>NOTE(review): the decompiler flagged the annotation placement on
     * {@code Endpoints.ApiServiceDescriptor} as possibly incorrect.
     *
     * @param apiServiceDescriptor descriptor of the logging service, kept for {@link #toString()}
     * @param streamWriter writer that drains buffered entries to the outbound stream
     * @param logMdc whether the handler copies the SLF4J MDC into each entry
     * @param executorService executor that runs the background drain task
     * @param options options passed on to {@code installLogging}
     * @throws RuntimeException if the background task fails before startup completes (the failure
     *     is attached as the cause), or if the calling thread is interrupted while waiting
     */
    private BeamFnLoggingClient(@UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized StreamWriter streamWriter, @UnknownKeyFor @NonNull @Initialized boolean logMdc, @UnknownKeyFor @NonNull @Initialized ScheduledExecutorService executorService, @UnknownKeyFor @NonNull @Initialized SdkHarnessOptions options) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.streamWriter = streamWriter;
        this.logRecordHandler = new LogRecordHandler(logMdc);
        this.logRecordHandler.setLevel(Level.ALL);
        this.logRecordHandler.setFormatter(DEFAULT_FORMATTER);
        // Completed by the background task once the handler is installed on the root logger.
        CompletableFuture<Object> started = new CompletableFuture<>();
        this.bufferedLogConsumer = CompletableFuture.supplyAsync(() -> {
            try {
                // Recorded before the handler is installed so publish() can recognise this thread
                // and avoid a blocking put from the thread that drains the queue.
                this.logEntryHandlerThread = Thread.currentThread();
                this.installLogging(options);
                started.complete(COMPLETED);
                streamWriter.drainQueueToStream(this.bufferedLogEntries);
            }
            finally {
                this.restoreLoggers();
                // With the original loggers back in place, replay whatever is still buffered.
                this.flushFinalLogs();
            }
            return COMPLETED;
        }, executorService);
        try {
            // Returns when logging is installed, or as soon as the background task terminates.
            CompletableFuture.anyOf(this.bufferedLogConsumer, started).get();
        }
        catch (ExecutionException e) {
            // Same message as before; the cause is now also chained so its stack trace survives.
            throw new RuntimeException("Error starting background log thread " + e.getCause(), e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Routes {@code java.util.logging} output through this client's handler.
     *
     * <p>Resets the global {@link LogManager}, removes any handlers still attached to the root
     * logger, records the loggers returned by
     * {@code SdkHarnessOptions.getConfiguredLoggerFromOptions}, and finally attaches
     * {@code logRecordHandler} to the root logger.
     *
     * @param options harness options from which the configured loggers are derived
     */
    @RequiresNonNull.List(value={@RequiresNonNull(value={"logRecordHandler"}), @RequiresNonNull(value={"configuredLoggers"})})
    private void installLogging(@UnderInitialization BeamFnLoggingClient this, @UnknownKeyFor @NonNull @Initialized SdkHarnessOptions options) {
        LogManager manager = LogManager.getLogManager();
        manager.reset();

        Logger root = manager.getLogger(ROOT_LOGGER_NAME);
        Handler[] remaining = root.getHandlers();
        for (int i = 0; i < remaining.length; i++) {
            root.removeHandler(remaining[i]);
        }

        // Remember the option-configured loggers so they can be reset when logging is restored.
        this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));

        // Attached last, after the root logger has been stripped and the loggers recorded.
        root.addHandler(this.logRecordHandler);
    }

    /**
     * Shuts the client down and waits for the background drain task to finish.
     *
     * <p>First requests a soft close and waits up to 10 seconds for the task to complete. If that
     * times out, the stream is hard-closed and the task is awaited without a timeout.
     *
     * @throws Exception the background task's failure when it is an {@link Exception}; otherwise
     *     the {@link ExecutionException} itself; or {@link InterruptedException} if interrupted
     *     while waiting
     */
    @Override
    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Preconditions.checkNotNull(this.bufferedLogConsumer, (Object)"BeamFnLoggingClient not fully started");
        try {
            this.streamWriter.softClose();
            try {
                this.bufferedLogConsumer.get(10L, TimeUnit.SECONDS);
            }
            catch (TimeoutException timedOut) {
                // The task did not finish in time: force the stream closed, then wait for it.
                this.streamWriter.hardClose();
                this.bufferedLogConsumer.get();
            }
        }
        catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            if (!(cause instanceof Exception)) {
                throw failure;
            }
            throw (Exception)cause;
        }
    }

    /**
     * Undoes {@code installLogging}: clears the level of every recorded logger, detaches this
     * client's handler from them and from the root logger, then asks the {@link LogManager} to
     * re-read its configuration. A failure to re-read is reported on standard output only.
     */
    @RequiresNonNull.List(value={@RequiresNonNull(value={"configuredLoggers"}), @RequiresNonNull(value={"logRecordHandler"})})
    private void restoreLoggers(@UnderInitialization BeamFnLoggingClient this) {
        for (Logger configured : this.configuredLoggers) {
            // A null level makes the logger inherit its level from its parent again.
            configured.setLevel(null);
            configured.removeHandler(this.logRecordHandler);
        }
        this.configuredLoggers.clear();

        LogManager manager = LogManager.getLogManager();
        Logger root = manager.getLogger(ROOT_LOGGER_NAME);
        root.removeHandler(this.logRecordHandler);
        try {
            manager.readConfiguration();
        }
        catch (IOException readFailure) {
            System.out.print("Unable to restore log managers from configuration: " + readFailure);
        }
    }

    /**
     * Replays every entry still sitting in {@code bufferedLogEntries} through the root logger.
     *
     * <p>Each entry is converted back into a {@link LogRecord} carrying its level, message, logger
     * name, timestamp (millisecond precision) and thread id. A non-empty trace string is attached
     * as the message of a fresh {@link Throwable}.
     *
     * @throws NullPointerException if an entry's severity has no reverse mapping
     * @throws NumberFormatException if an entry's thread field is not a decimal integer
     */
    @RequiresNonNull(value={"bufferedLogEntries"})
    void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) {
        // Typed list (the raw form cannot be iterated as LogEntry), sized to the queue's capacity.
        ArrayList<BeamFnApi.LogEntry> finalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
        this.bufferedLogEntries.drainTo(finalLogEntries);
        for (BeamFnApi.LogEntry logEntry : finalLogEntries) {
            Level level = Preconditions.checkNotNull(REVERSE_LOG_LEVEL_MAP.get(logEntry.getSeverity()));
            LogRecord logRecord = new LogRecord(level, logEntry.getMessage());
            logRecord.setLoggerName(logEntry.getLogLocation());
            // Seconds + nanos collapsed back to epoch milliseconds.
            logRecord.setMillis(logEntry.getTimestamp().getSeconds() * 1000L + (long)(logEntry.getTimestamp().getNanos() / 1000000));
            logRecord.setThreadID(Integer.parseInt(logEntry.getThread()));
            if (!logEntry.getTrace().isEmpty()) {
                // Only the rendered trace text is available here, so it becomes the message.
                logRecord.setThrown(new Throwable(logEntry.getTrace()));
            }
            LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(logRecord);
        }
    }

    /**
     * Returns the future of the background drain task; it completes when that task ends.
     *
     * <p>NOTE(review): the decompiler reported issues handling the annotations on the return
     * type, so they may be inaccurate.
     *
     * @return the background task's future
     * @throws NullPointerException if the future has not been assigned
     */
    public @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> terminationFuture() {
        return Preconditions.checkNotNull(this.bufferedLogConsumer, (Object)"BeamFnLoggingClient not fully started");
    }

    /** Describes this client by its class name and the API service descriptor it was built with. */
    @SideEffectFree
    @Override
    public @UnknownKeyFor @NonNull @Initialized String toString() {
        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(BeamFnLoggingClient.class);
        description.add("apiServiceDescriptor", (Object)this.apiServiceDescriptor);
        return description.toString();
    }

    /**
     * {@link Handler} that converts each {@link LogRecord} into a {@code BeamFnApi.LogEntry} and
     * places it on the enclosing client's {@code bufferedLogEntries} queue.
     *
     * <p>Records whose level has no entry in {@code LOG_LEVEL_MAP} are ignored. Callers on any
     * thread other than the client's drain thread block while the queue is full. The drain thread
     * itself only offers, so it can never block on the queue it is responsible for emptying.
     */
    private class LogRecordHandler
    extends Handler {
        /** Whether the SLF4J MDC is copied into each entry's custom data. */
        private final @UnknownKeyFor @NonNull @Initialized boolean logMdc;

        LogRecordHandler(boolean logMdc) {
            this.logMdc = logMdc;
        }

        /**
         * Builds a log entry from {@code record} and enqueues it.
         *
         * @param record the record to forward
         * @throws RuntimeException if the calling thread is interrupted while waiting for queue
         *     space (the interrupt flag is restored first)
         */
        @Override
        public void publish(@UnknownKeyFor @NonNull @Initialized LogRecord record) {
            BeamFnApi.LogEntry.Severity.Enum severity = LOG_LEVEL_MAP.get(record.getLevel());
            if (severity == null) {
                // No Fn API severity for this level: nothing to send.
                return;
            }
            BeamFnApi.LogEntry.Builder builder = BeamFnApi.LogEntry.newBuilder()
                    .setSeverity(severity)
                    .setMessage(this.getFormatter().formatMessage(record))
                    .setThread(Integer.toString(record.getThreadID()))
                    .setTimestamp(Timestamp.newBuilder()
                            .setSeconds(record.getMillis() / 1000L)
                            .setNanos((int)(record.getMillis() % 1000L) * 1000000));

            String instructionId = BeamFnLoggingMDC.getInstructionId();
            if (instructionId != null) {
                builder.setInstructionId(instructionId);
            }
            Throwable thrown = record.getThrown();
            if (thrown != null) {
                builder.setTrace(Throwables.getStackTraceAsString(thrown));
            }
            String loggerName = record.getLoggerName();
            if (loggerName != null) {
                builder.setLogLocation(loggerName);
            }
            ExecutionStateSampler.ExecutionStateTracker stateTracker = BeamFnLoggingMDC.getStateTracker();
            if (stateTracker != null) {
                String transformId = stateTracker.getCurrentThreadsPTransformId();
                if (transformId != null) {
                    builder.setTransformId(transformId);
                }
            }
            if (this.logMdc) {
                Map<String, String> mdc = MDC.getCopyOfContextMap();
                if (mdc != null) {
                    Struct.Builder customDataBuilder = builder.getCustomDataBuilder();
                    mdc.forEach((key, value) -> {
                        // An MDC adapter may hold null values, and protobuf builders are expected
                        // to throw NullPointerException on null arguments. Skip such pairs rather
                        // than let a logging call fail in the caller's thread.
                        if (key != null && value != null) {
                            customDataBuilder.putFields(key, Value.newBuilder().setStringValue(value).build());
                        }
                    });
                }
            }

            BeamFnApi.LogEntry entry = builder.build();
            if (Thread.currentThread() != BeamFnLoggingClient.this.logEntryHandlerThread) {
                try {
                    // Blocks the logging caller until the queue has room.
                    BeamFnLoggingClient.this.bufferedLogEntries.put(entry);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            } else {
                // Called from the drain thread: best effort only, the entry is dropped if full.
                this.dropIfBufferFull(entry);
            }
        }

        /**
         * Offers {@code logEntry} to the queue without blocking.
         *
         * @return {@code true} if the entry was accepted, {@code false} if the queue was full
         */
        private @UnknownKeyFor @NonNull @Initialized boolean dropIfBufferFull( @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry logEntry) {
            return BeamFnLoggingClient.this.bufferedLogEntries.offer(logEntry);
        }

        /** No-op: this handler holds no output of its own to flush. */
        @Override
        public void flush() {
        }

        /** No-op: this handler releases nothing on close. */
        @Override
        public synchronized void close() {
        }
    }

    /**
     * Owns the outbound logging stream on a {@link ManagedChannel} and copies buffered log entries
     * onto it in batches.
     *
     * <p>The stream can be ended two ways. {@link #softClose()} lets the drain loop stop once the
     * queue has been empty for one poll interval. {@link #hardClose()} force-terminates the phaser,
     * which makes the drain loop exit on its next check.
     */
    private static class StreamWriter {
        private final @UnknownKeyFor @NonNull @Initialized ManagedChannel channel;
        /** Observer used to send batches of log entries. */
        private final @UnknownKeyFor @NonNull @Initialized StreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry.List> outboundObserver;
        /** Observer receiving control messages and the stream's termination signal. */
        private final @UnknownKeyFor @NonNull @Initialized StreamWriter. @UnknownKeyFor @NonNull @Initialized LogControlObserver inboundObserver;
        /** Completed (normally or exceptionally) when the inbound side of the stream terminates. */
        private final @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized Object> inboundObserverCompletion;
        // Handed to DirectStreamObserver and advanced by the stream's on-ready handler; presumably
        // used there for flow control (TODO confirm against DirectStreamObserver).
        private final @UnknownKeyFor @NonNull @Initialized AdvancingPhaser streamPhaser;
        /** Completed once a graceful close has been requested. */
        private final @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized Object> softClosing = new CompletableFuture<>();

        /**
         * Opens the logging stream on {@code channel}.
         *
         * @param channel channel to the logging service; shut down when draining ends
         */
        public StreamWriter(@UnknownKeyFor @NonNull @Initialized ManagedChannel channel) {
            this.inboundObserverCompletion = new CompletableFuture<>();
            this.streamPhaser = new AdvancingPhaser(1);
            this.channel = channel;
            BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub((Channel)channel);
            this.inboundObserver = new LogControlObserver();
            this.outboundObserver = new DirectStreamObserver<BeamFnApi.LogEntry.List>(this.streamPhaser, (CallStreamObserver<BeamFnApi.LogEntry.List>)stub.logging(this.inboundObserver));
        }

        /**
         * Sends entries from {@code bufferedLogEntries} until the stream is terminated or a soft
         * close has been requested and the queue stays empty for one second. Afterwards it always
         * completes (or fails) the outbound stream and shuts the channel down.
         *
         * <p>NOTE(review): the decompiler emitted "Removed try catching itself - possible
         * behaviour change" for this method; compare against the original source before relying
         * on the exact exception flow.
         *
         * @param bufferedLogEntries queue to drain
         * @throws RuntimeException wrapping any failure. This includes an
         *     {@link IllegalStateException} when the inbound side terminated before this method
         *     finished, and an {@link InterruptedException} from polling, in which case the
         *     interrupt flag is restored after cleanup.
         */
        public void drainQueueToStream(@UnknownKeyFor @NonNull @Initialized BlockingQueue< @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry> bufferedLogEntries) {
            Throwable thrown = null;
            try {
                ArrayList<BeamFnApi.LogEntry> additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
                while (!this.streamPhaser.isTerminated()) {
                    // Bounded wait so termination and soft-close are re-checked every second.
                    BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1L, TimeUnit.SECONDS);
                    if (logEntry == null) {
                        if (this.softClosing.isDone()) {
                            break;
                        }
                        continue;
                    }
                    // Batch the polled entry with everything else currently buffered.
                    BeamFnApi.LogEntry.List.Builder builder = BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
                    bufferedLogEntries.drainTo(additionalLogEntries);
                    builder.addAllLogEntries(additionalLogEntries);
                    this.outboundObserver.onNext(builder.build());
                    additionalLogEntries.clear();
                }
                if (this.inboundObserverCompletion.isDone()) {
                    try {
                        // Throws ExecutionException if the inbound side failed.
                        this.inboundObserverCompletion.get();
                        // Otherwise the inbound side completed before this side finished.
                        throw new IllegalStateException("Logging stream terminated unexpectedly with success before it was closed by the client.");
                    }
                    catch (ExecutionException e) {
                        // Message unchanged; the stream failure is also chained as the cause.
                        throw new IllegalStateException("Logging stream terminated unexpectedly before it was closed by the client with error: " + e.getCause(), e.getCause());
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }
            catch (Throwable t) {
                thrown = t;
                throw new RuntimeException(t);
            }
            finally {
                if (thrown == null) {
                    this.outboundObserver.onCompleted();
                } else {
                    this.outboundObserver.onError(thrown);
                }
                this.channel.shutdown();
                boolean shutdownFinished = false;
                try {
                    // Allow up to 10 seconds for an orderly shutdown before forcing it.
                    shutdownFinished = this.channel.awaitTermination(10L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    if (!shutdownFinished) {
                        this.channel.shutdownNow();
                    }
                }
                // poll() clears the interrupt flag when it throws. Restore it only after cleanup,
                // so the shutdown wait above still gets its normal grace period.
                if (thrown instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Requests a graceful stop: the drain loop exits once the queue is found empty. */
        public void softClose() {
            this.softClosing.complete(COMPLETED);
        }

        /** Force-terminates the phaser so the drain loop stops on its next termination check. */
        public void hardClose() {
            this.streamPhaser.forceTermination();
        }

        /**
         * Inbound observer for the logging stream: records how the inbound side terminated and
         * hard-closes the writer when it does. Control messages themselves are ignored.
         */
        private class LogControlObserver
        implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
            private LogControlObserver() {
            }

            @Override
            public void beforeStart(@UnknownKeyFor @NonNull @Initialized ClientCallStreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogEntry> requestStream) {
                // Each readiness notification from the request stream advances the phaser.
                requestStream.setOnReadyHandler(StreamWriter.this.streamPhaser::arrive);
            }

            @Override
            public void onNext( @UnknownKeyFor @NonNull @Initialized BeamFnApi.LogControl value) {
            }

            @Override
            public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t) {
                StreamWriter.this.inboundObserverCompletion.completeExceptionally(t);
                StreamWriter.this.hardClose();
            }

            @Override
            public void onCompleted() {
                StreamWriter.this.inboundObserverCompletion.complete(COMPLETED);
                StreamWriter.this.hardClose();
            }
        }
    }
}

