/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.logging;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.fn.harness.logging.RestoreBeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.MDC;

@RunWith(value=JUnit4.class)
public class BeamFnLoggingClientTest {
    // JUnit rule applied around every test; presumably restores BeamFnLoggingMDC state
    // after each test (see RestoreBeamFnLoggingMDC to confirm the exact scope).
    @Rule
    public TestRule restoreLogging = new RestoreBeamFnLoggingMDC();
    // SEVERE record that testLogging sends through the root logger while its level is OFF;
    // it does not appear in the entries that test expects the service to receive.
    private static final LogRecord FILTERED_RECORD = new LogRecord(Level.SEVERE, "FilteredMessage");
    // FINE record logged through "ConfiguredLogger"; its logger name, millis and thread id
    // are filled in by the static initializer at the bottom of the class.
    private static final LogRecord TEST_RECORD = new LogRecord(Level.FINE, "Message");
    // WARNING record carrying a RuntimeException; created and populated in the static initializer.
    private static final LogRecord TEST_RECORD_WITH_EXCEPTION;
    // Expected LogEntry for TEST_RECORD logged under transform "ptransformId".
    private static final BeamFnApi.LogEntry TEST_ENTRY;
    // Expected LogEntry for TEST_RECORD when the handler formatter prefixes the MDC value and
    // the MDC key "testMdcKey" is attached as custom data.
    private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_CUSTOM_FORMATTER;
    // Expected LogEntry for TEST_RECORD_WITH_EXCEPTION logged under transform "errorPtransformId".
    private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION;
    // JUnit rule used by the failure-path tests to declare the expected exception message.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises the normal logging path against an in-process gRPC logging service.
     *
     * <p>Starts a {@link BeamFnLoggingClient} with default level {@code OFF} and a {@code DEBUG}
     * override for {@code ConfiguredLogger}, logs records under two execution states and with a
     * custom handler formatter, closes the client, and then checks that log levels were reset,
     * that the stream and channel were closed, and that the service received exactly the three
     * expected entries in order.
     *
     * @throws Exception if the in-process server or the logging client fails unexpectedly
     */
    @Test
    public void testLogging() throws Exception {
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), null);
        ExecutionStateSampler.ExecutionStateTracker stateTracker = sampler.create();
        ExecutionStateSampler.ExecutionState state = stateTracker.create("shortId", "ptransformId", "ptransformIdName", "process");
        state.activate();
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        BeamFnLoggingMDC.setStateTracker(stateTracker);

        AtomicBoolean clientClosedStream = new AtomicBoolean();
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        // The lambda parameter is typed explicitly: withOnNext is the receiver of a call chain,
        // so there is no target type from which the element type could be inferred.
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
            TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                .withOnCompleted(() -> {
                    // Remember that the client hung up, then complete the server side of the stream.
                    clientClosedStream.set(true);
                    outboundServerObserver.get().onCompleted();
                })
                .build();

        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                outboundServerObserver.set(outboundObserver);
                return inboundServerObserver;
            }
        }).build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.fromArgs(new String[]{"--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"}).create(), apiServiceDescriptor, descriptor -> channel);

            // Hold the loggers in locals so the same instances are inspected before and after close().
            Logger rootLogger = LogManager.getLogManager().getLogger("");
            Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");

            // The configured levels must have been applied by the client.
            Assert.assertEquals(Level.OFF, rootLogger.getLevel());
            Assert.assertEquals(Level.FINE, configuredLogger.getLevel());

            // Root logger is OFF, so this record must not show up in the received entries.
            rootLogger.log(FILTERED_RECORD);
            // Logged while the "ptransformId" state is active.
            configuredLogger.log(TEST_RECORD);

            // Logged while a second state is active; the expected entry carries "errorPtransformId".
            ExecutionStateSampler.ExecutionState errorState = stateTracker.create("shortId", "errorPtransformId", "errorPtransformIdName", "process");
            errorState.activate();
            configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);
            errorState.deactivate();

            // Install a formatter that prefixes the message with an MDC value on every root handler.
            for (Handler handler : rootLogger.getHandlers()) {
                handler.setFormatter(new SimpleFormatter(){

                    @Override
                    public synchronized String formatMessage(LogRecord record) {
                        return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
                    }
                });
            }
            MDC.put("testMdcKey", "testMdcValue");
            configuredLogger.log(TEST_RECORD);

            client.close();

            // After close the levels are back to their defaults and the transport is torn down.
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
            Assert.assertTrue(clientClosedStream.get());
            Assert.assertTrue(channel.isShutdown());
            MatcherAssert.assertThat(values, Matchers.contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION, TEST_ENTRY_WITH_CUSTOM_FORMATTER));
        }
        finally {
            // Drop the MDC key set above so it cannot leak into log entries produced by
            // other tests that run later on the same thread.
            MDC.remove("testMdcKey");
            server.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that the client cleans up when the logging service fails the stream.
     *
     * <p>The in-process service blocks until the test releases it and then reports an
     * {@code INTERNAL} error with description {@code "TEST ERROR"}. The test expects
     * {@code client.close()} to surface that message, and checks afterwards, whether or not an
     * exception is propagating, that log levels were reset and the channel was shut down.
     *
     * @throws Exception the expected failure from {@code client.close()}, matched by the
     *     {@code thrown} rule
     */
    @Test
    public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        // The lambda parameter is typed explicitly: withOnNext is the receiver of a call chain,
        // so there is no target type from which the element type could be inferred.
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
            TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList())).build();

        // Declared outside the try so the cleanup checks can see them; they stay null only if
        // client creation itself failed.
        Logger rootLogger = null;
        Logger configuredLogger = null;

        // Gate that keeps the service from failing the stream until the test is ready.
        final CompletableFuture<Object> streamBlocker = new CompletableFuture<>();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                // Wait for the test to release the gate, then fail the stream from the server side.
                streamBlocker.join();
                outboundServerObserver.set(outboundObserver);
                outboundObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException());
                return inboundServerObserver;
            }
        }).build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.fromArgs(new String[]{"--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"}).create(), apiServiceDescriptor, descriptor -> channel);
            rootLogger = LogManager.getLogManager().getLogger("");
            configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");

            // Release the service so it reports the error, then expect close() to surface it.
            streamBlocker.complete(new Object());
            this.thrown.expectMessage("TEST ERROR");
            client.close();
        }
        finally {
            // Cleanup checks run on both the normal and the exceptional path. The server is
            // shut down in a nested finally so a failing check cannot leave it running.
            try {
                Assert.assertNotNull("rootLogger should be initialized before exception", rootLogger);
                Assert.assertNotNull("configuredLogger should be initialized before exception", configuredLogger);
                Assert.assertEquals(Level.INFO, rootLogger.getLevel());
                Assert.assertNull(configuredLogger.getLevel());
                Assert.assertTrue(channel.isShutdown());
            }
            finally {
                server.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that the client cleans up when the logging service completes the stream
     * immediately after it is opened.
     *
     * <p>The test expects an exception whose message contains
     * {@code "Logging stream terminated unexpectedly"} and checks in the {@code finally} block
     * that the channel was shut down.
     *
     * @throws Exception the expected failure, matched by the {@code thrown} rule
     */
    @Test
    public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        // The lambda parameter is typed explicitly: withOnNext is the receiver of a call chain,
        // so there is no target type from which the element type could be inferred.
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
            TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList())).build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                outboundServerObserver.set(outboundObserver);
                // Hang up right away from the server side.
                outboundObserver.onCompleted();
                return inboundServerObserver;
            }
        }).build();
        server.start();
        this.thrown.expectMessage("Logging stream terminated unexpectedly");
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.fromArgs(new String[]{"--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"}).create(), apiServiceDescriptor, descriptor -> channel);
            // Hold the loggers in locals so the same instances are inspected after close().
            Logger rootLogger = LogManager.getLogManager().getLogger("");
            Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
            client.close();
            // NOTE(review): these level checks only run if close() returns normally; the test
            // as written expects an exception, so they may never execute - confirm intent.
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
        }
        finally {
            // The server is shut down in a nested finally so a failing check cannot leave it running.
            try {
                Assert.assertTrue(channel.isShutdown());
            }
            finally {
                server.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that the client can still be closed after its outbound stream has reported
     * "not ready".
     *
     * <p>A client interceptor overrides {@code isReady()} so the test can push back on the
     * stream. The test first lets 2000 records through, then turns readiness off and keeps
     * logging until {@code isReady()} has been observed returning while blocked, checks that not
     * every record made it to the service, re-enables readiness, and closes the client.
     *
     * @throws Exception if the in-process server or the logging client fails unexpectedly
     */
    @Test
    public void testClosableWhenBlockingForOnReady() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        AtomicInteger testEntriesObserved = new AtomicInteger();
        final AtomicBoolean onReadyBlocking = new AtomicBoolean();
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        final AtomicBoolean elementsAllowed = new AtomicBoolean(true);
        // The lambda parameter is typed explicitly: withOnNext is the receiver of a call chain,
        // so there is no target type from which the element type could be inferred.
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
            TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> {
                for (BeamFnApi.LogEntry entry : logEntries.getLogEntriesList()) {
                    // Custom data is cleared before comparing so only the core fields must match.
                    if (entry.toBuilder().clearCustomData().build().equals(TEST_ENTRY)) {
                        testEntriesObserved.addAndGet(1);
                    }
                }
            }).withOnCompleted(() -> outboundServerObserver.get().onCompleted()).build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                outboundServerObserver.set(outboundObserver);
                return inboundServerObserver;
            }
        }).build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).intercept(new ClientInterceptor(){

            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                ClientCall<ReqT, RespT> delegate = next.newCall(method, callOptions);
                return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(delegate){

                    @Override
                    public boolean isReady() {
                        if (elementsAllowed.get()) {
                            return true;
                        }
                        // Record that a readiness check happened while elements were disallowed.
                        onReadyBlocking.set(true);
                        // Re-read the flag in case it was flipped back in the meantime.
                        return elementsAllowed.get();
                    }
                };
            }
        }).build();
        Logger rootLogger = null;
        Logger configuredLogger = null;
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.fromArgs(new String[]{"--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"}).create(), apiServiceDescriptor, descriptor -> channel);
            rootLogger = LogManager.getLogManager().getLogger("");
            configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");

            long numEntries = 2000L;
            for (int i = 0; i < numEntries; ++i) {
                configuredLogger.log(TEST_RECORD);
            }

            // Count how many 1 ms sleeps it takes for all initial records to reach the service.
            int sleepTime = 0;
            while (testEntriesObserved.get() < numEntries) {
                ++sleepTime;
                Thread.sleep(1L);
            }

            // Push back on the stream and keep logging until a blocked readiness check is seen.
            elementsAllowed.set(false);
            int postAllowedLogs = 0;
            while (!onReadyBlocking.get()) {
                ++postAllowedLogs;
                configuredLogger.log(TEST_RECORD);
                Thread.sleep(1L);
            }

            // Wait a multiple of the measured delivery time; some records must still be undelivered.
            Thread.sleep(sleepTime * 3);
            Assert.assertTrue(testEntriesObserved.get() < numEntries + postAllowedLogs);

            // Unblock the stream and close the client.
            elementsAllowed.set(true);
            client.close();

            Assert.assertNotNull("rootLogger should be initialized before exception", rootLogger);
            Assert.assertNotNull("configuredLogger should be initialized before exception", configuredLogger);
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
            Assert.assertTrue(channel.isShutdown());
        }
        finally {
            server.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that the client's termination future fails when the logging service completes
     * the stream on its own.
     *
     * <p>The test expects {@code client.terminationFuture().get()} to throw with a message
     * containing {@code "Logging stream terminated unexpectedly"}, and checks in the
     * {@code finally} block that log levels were reset and the channel was shut down.
     *
     * @throws Exception the expected failure, matched by the {@code thrown} rule
     */
    @Test
    public void testServerCloseNotifiesTermination() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        // Incoming entries are ignored; only stream completion matters for this test.
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
            TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> {})
                .withOnCompleted(() -> outboundServerObserver.get().onCompleted())
                .build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                outboundServerObserver.set(outboundObserver);
                // Complete the stream from the server side as soon as it is opened.
                outboundObserver.onCompleted();
                return inboundServerObserver;
            }
        }).build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.fromArgs(new String[]{"--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"}).create(), apiServiceDescriptor, descriptor -> channel);
            this.thrown.expectMessage("Logging stream terminated unexpectedly");
            client.terminationFuture().get();
        }
        finally {
            // The server is shut down in a nested finally so a failing check cannot leave it running.
            try {
                Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
                Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
                Assert.assertTrue(channel.isShutdown());
            }
            finally {
                server.shutdownNow();
            }
        }
    }

    static {
        TEST_RECORD.setLoggerName("LoggerName");
        TEST_RECORD.setMillis(1234567890L);
        TEST_RECORD.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION = new LogRecord(Level.WARNING, "MessageWithException");
        TEST_RECORD_WITH_EXCEPTION.setLoggerName("LoggerName");
        TEST_RECORD_WITH_EXCEPTION.setMillis(1234567890L);
        TEST_RECORD_WITH_EXCEPTION.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION.setThrown(new RuntimeException("ExceptionMessage"));
        TEST_ENTRY = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG).setMessage("Message").setTransformId("ptransformId").setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").build();
        TEST_ENTRY_WITH_CUSTOM_FORMATTER = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG).setMessage("testMdcValue:Message").setTransformId("ptransformId").setCustomData(Struct.newBuilder().putFields("testMdcKey", Value.newBuilder().setStringValue("testMdcValue").build())).setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").build();
        TEST_ENTRY_WITH_EXCEPTION = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.WARN).setMessage("MessageWithException").setTransformId("errorPtransformId").setTrace(Throwables.getStackTraceAsString((Throwable)TEST_RECORD_WITH_EXCEPTION.getThrown())).setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").build();
    }
}

