/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.status;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.status.MemoryMonitor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnStatusClient
implements AutoCloseable {
    private static final @UnknownKeyFor @NonNull @Initialized Object COMPLETED = new Object();
    private final @UnknownKeyFor @NonNull @Initialized StreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.WorkerStatusResponse> outboundObserver;
    private final @UnknownKeyFor @NonNull @Initialized ProcessBundleHandler.BundleProcessorCache processBundleCache;
    private final @UnknownKeyFor @NonNull @Initialized ManagedChannel channel;
    private final @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized Object> inboundObserverCompletion;
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamFnStatusClient.class);
    private final @UnknownKeyFor @NonNull @Initialized MemoryMonitor memoryMonitor;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> cache;

    public BeamFnStatusClient(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized Function<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized ManagedChannel> channelFactory, @UnknownKeyFor @NonNull @Initialized ProcessBundleHandler.BundleProcessorCache processBundleCache, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> cache) {
        this.channel = channelFactory.apply(apiServiceDescriptor);
        this.outboundObserver = BeamFnWorkerStatusGrpc.newStub((Channel)this.channel).workerStatus(new InboundObserver());
        this.processBundleCache = processBundleCache;
        this.memoryMonitor = MemoryMonitor.fromOptions(options);
        this.cache = cache;
        this.inboundObserverCompletion = new CompletableFuture();
        Thread thread = new Thread(this.memoryMonitor);
        thread.setDaemon(true);
        thread.setPriority(1);
        thread.setName("MemoryMonitor");
        thread.start();
    }

    @Override
    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        try {
            Object completion = this.inboundObserverCompletion.get(1L, TimeUnit.MINUTES);
            if (completion != COMPLETED) {
                LOG.warn("InboundObserver for BeamFnStatusClient completed with exception.");
            }
        }
        finally {
            this.channel.shutdown();
            if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }
    }

    @UnknownKeyFor @NonNull @Initialized String getThreadDump() {
        StringJoiner trace = new StringJoiner("\n");
        trace.add("========== THREAD DUMP ==========");
        HashMap stacks = new HashMap();
        Thread.getAllStackTraces().forEach((thread, elements) -> {
            if (thread != Thread.currentThread()) {
                Stack stack = new Stack((StackTraceElement[])elements, thread.getState());
                stacks.putIfAbsent(stack, new ArrayList());
                ((List)stacks.get(stack)).add(thread.toString());
            }
        });
        stacks.entrySet().stream().sorted(Comparator.comparingInt(entry -> -((List)entry.getValue()).size())).forEachOrdered(entry -> {
            Stack stack = (Stack)entry.getKey();
            List threads = (List)entry.getValue();
            trace.add(String.format("---- Threads (%d): %s State: %s Stack: ----", new Object[]{threads.size(), threads, stack.state}));
            Arrays.stream(stack.elements).map(StackTraceElement::toString).forEach(trace::add);
            trace.add("\n");
        });
        return trace.toString();
    }

    @UnknownKeyFor @NonNull @Initialized String getMemoryUsage() {
        StringJoiner memory = new StringJoiner("\n");
        memory.add("========== MEMORY USAGE ==========");
        memory.add(this.memoryMonitor.describeMemory());
        return memory.toString();
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized String getCacheStats() {
        StringJoiner cacheStats = new StringJoiner("\n");
        cacheStats.add("========== CACHE STATS ==========");
        cacheStats.add(this.cache.describeStats());
        return cacheStats.toString();
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized String getActiveProcessBundleState() {
        StringJoiner activeBundlesState = new StringJoiner("\n");
        activeBundlesState.add("========== ACTIVE PROCESSING BUNDLES ==========");
        if (this.processBundleCache.getActiveBundleProcessors().isEmpty()) {
            activeBundlesState.add("No active processing bundles.");
        } else {
            ArrayList bundleStates = new ArrayList();
            this.processBundleCache.getActiveBundleProcessors().entrySet().stream().forEach(instructionAndBundleProcessor -> {
                ProcessBundleHandler.BundleProcessor bundleProcessor = (ProcessBundleHandler.BundleProcessor)instructionAndBundleProcessor.getValue();
                ExecutionStateSampler.ExecutionStateTrackerStatus executionStateTrackerStatus = bundleProcessor.getStateTracker().getStatus();
                if (executionStateTrackerStatus != null) {
                    bundleStates.add(new BundleState((String)instructionAndBundleProcessor.getKey(), executionStateTrackerStatus.getTrackedThread().getName(), DateTimeUtils.currentTimeMillis() - executionStateTrackerStatus.getLastTransitionTimeMillis()));
                }
            });
            bundleStates.stream().sorted(Comparator.comparing(BundleState::getTimeSinceTransition).reversed()).limit(10L).forEachOrdered(bundleState -> {
                activeBundlesState.add(String.format("---- Instruction %s ----", bundleState.getInstruction()));
                activeBundlesState.add(String.format("Tracked thread: %s", bundleState.getTrackedThreadName()));
                activeBundlesState.add(String.format("Time since transition: %.2f seconds%n", (double)bundleState.getTimeSinceTransition() / 1000.0));
            });
        }
        return activeBundlesState.toString();
    }

    private class InboundObserver
    implements StreamObserver<BeamFnApi.WorkerStatusRequest> {
        private InboundObserver() {
        }

        public void onNext( @UnknownKeyFor @NonNull @Initialized BeamFnApi.WorkerStatusRequest workerStatusRequest) {
            StringJoiner status = new StringJoiner("\n");
            status.add(BeamFnStatusClient.this.getMemoryUsage());
            status.add("\n");
            status.add(BeamFnStatusClient.this.getCacheStats());
            status.add("\n");
            status.add(BeamFnStatusClient.this.getActiveProcessBundleState());
            status.add("\n");
            status.add(BeamFnStatusClient.this.getThreadDump());
            BeamFnStatusClient.this.outboundObserver.onNext((Object)BeamFnApi.WorkerStatusResponse.newBuilder().setId(workerStatusRequest.getId()).setStatusInfo(status.toString()).build());
        }

        public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t2) {
            LOG.error("Error getting SDK harness status", t2);
            BeamFnStatusClient.this.inboundObserverCompletion.completeExceptionally(t2);
        }

        public void onCompleted() {
            BeamFnStatusClient.this.inboundObserverCompletion.complete(COMPLETED);
        }
    }

    static class BundleState {
        final @UnknownKeyFor @NonNull @Initialized String instruction;
        final @UnknownKeyFor @NonNull @Initialized String trackedThreadName;
        final @UnknownKeyFor @NonNull @Initialized long timeSinceTransition;

        public @UnknownKeyFor @NonNull @Initialized String getInstruction() {
            return this.instruction;
        }

        public @UnknownKeyFor @NonNull @Initialized String getTrackedThreadName() {
            return this.trackedThreadName;
        }

        public @UnknownKeyFor @NonNull @Initialized long getTimeSinceTransition() {
            return this.timeSinceTransition;
        }

        public BundleState(@UnknownKeyFor @NonNull @Initialized String instruction, @UnknownKeyFor @NonNull @Initialized String trackedThreadName, @UnknownKeyFor @NonNull @Initialized long timeSinceTransition) {
            this.instruction = instruction;
            this.trackedThreadName = trackedThreadName;
            this.timeSinceTransition = timeSinceTransition;
        }
    }

    static class Stack {
        final @UnknownKeyFor @NonNull @Initialized StackTraceElement @UnknownKeyFor @NonNull @Initialized [] elements;
        final @UnknownKeyFor @NonNull @Initialized Thread.State state;

        Stack(@UnknownKeyFor @NonNull @Initialized StackTraceElement @UnknownKeyFor @NonNull @Initialized [] elements, @UnknownKeyFor @NonNull @Initialized Thread.State state) {
            this.elements = elements;
            this.state = state;
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return Objects.hash(new Object[]{Arrays.deepHashCode(this.elements), this.state});
        }

        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object other) {
            if (other == this) {
                return true;
            }
            if (!(other instanceof Stack)) {
                return false;
            }
            Stack that = (Stack)other;
            return this.state == that.state && Arrays.deepEquals(this.elements, that.elements);
        }
    }
}

