/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.status;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.status.MemoryMonitor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client side of the Beam Fn worker-status stream.
 *
 * <p>On construction it opens a {@code workerStatus} stream over a channel built for the given
 * endpoint and starts a daemon thread running the {@link MemoryMonitor}. Every
 * {@link BeamFnApi.WorkerStatusRequest} received on that stream is answered with a
 * {@link BeamFnApi.WorkerStatusResponse} carrying the same id and a plain-text report made of
 * three sections: memory usage, active process bundles and a thread dump.
 */
public class BeamFnStatusClient implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnStatusClient.class);

    /** Sentinel stored in {@link #inboundObserverCompletion} when the inbound stream ends normally. */
    private static final Object COMPLETED = new Object();

    /** Upper bound on the number of bundles listed in the active-bundle section of a report. */
    private static final int MAX_REPORTED_BUNDLES = 10;

    private final StreamObserver<BeamFnApi.WorkerStatusResponse> outboundObserver;
    private final ProcessBundleHandler.BundleProcessorCache processBundleCache;
    private final ManagedChannel channel;
    private final CompletableFuture<Object> inboundObserverCompletion;
    private final MemoryMonitor memoryMonitor;

    /**
     * Creates the client, opens the status stream and starts the memory monitor thread.
     *
     * @param apiServiceDescriptor endpoint passed to {@code channelFactory} to obtain the channel
     * @param channelFactory builds the {@link ManagedChannel} used for the status stream
     * @param processBundleCache source of the active bundle processors listed in status reports
     * @param options pipeline options from which the {@link MemoryMonitor} is created
     */
    public BeamFnStatusClient(
            Endpoints.ApiServiceDescriptor apiServiceDescriptor,
            Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
            ProcessBundleHandler.BundleProcessorCache processBundleCache,
            PipelineOptions options) {
        this.channel = channelFactory.apply(apiServiceDescriptor);

        // Everything the InboundObserver callbacks read must be assigned BEFORE the stream is
        // opened; otherwise an early onNext/onError/onCompleted would dereference null fields.
        this.processBundleCache = processBundleCache;
        this.memoryMonitor = MemoryMonitor.fromOptions(options);
        this.inboundObserverCompletion = new CompletableFuture<>();

        // NOTE(review): a request delivered before this assignment finishes would still observe a
        // null outboundObserver; closing that window needs the request observer to be captured
        // inside the call itself - confirm whether this can happen with the transport in use.
        this.outboundObserver =
                BeamFnWorkerStatusGrpc.newStub(this.channel).workerStatus(new InboundObserver());

        Thread monitorThread = new Thread(this.memoryMonitor);
        monitorThread.setDaemon(true);
        monitorThread.setPriority(Thread.MIN_PRIORITY);
        monitorThread.setName("MemoryMonitor");
        monitorThread.start();
    }

    /**
     * Waits up to one minute for the inbound stream to finish, then shuts the channel down.
     *
     * <p>The channel is always shut down, even when waiting fails: first gracefully, and forcibly
     * if it has not terminated within ten seconds.
     *
     * @throws Exception an {@link ExecutionException} when the inbound stream ended with an error,
     *     a {@code TimeoutException} when it did not end within one minute, or an
     *     {@code InterruptedException} when the calling thread was interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        try {
            // The future only ever completes with COMPLETED or exceptionally, so a normal return
            // from get() needs no further inspection.
            this.inboundObserverCompletion.get(1L, TimeUnit.MINUTES);
        } catch (ExecutionException e) {
            // get() reports exceptional completion by throwing; the throwable itself has already
            // been logged by InboundObserver.onError, so only the warning is emitted here.
            LOG.warn("InboundObserver for BeamFnStatusClient completed with exception.");
            throw e;
        } finally {
            this.channel.shutdown();
            if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }
    }

    /**
     * Builds a thread dump of every live thread except the calling one.
     *
     * <p>Threads that share the same stack trace and state are grouped together, and groups are
     * printed largest first.
     *
     * @return the thread-dump section of the status report
     */
    String getThreadDump() {
        StringJoiner trace = new StringJoiner("\n");
        trace.add("========== THREAD DUMP ==========");

        // Group thread names by identical (stack, state) so duplicates are printed only once.
        Thread self = Thread.currentThread();
        Map<Stack, List<String>> threadsByStack = new HashMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> live : Thread.getAllStackTraces().entrySet()) {
            Thread thread = live.getKey();
            if (thread == self) {
                continue;
            }
            Stack stack = new Stack(live.getValue(), thread.getState());
            threadsByStack.computeIfAbsent(stack, unused -> new ArrayList<>()).add(thread.toString());
        }

        // Negated size sorts the groups with the most threads first.
        threadsByStack.entrySet().stream()
                .sorted(Comparator.comparingInt(group -> -group.getValue().size()))
                .forEachOrdered(group -> {
                    Stack stack = group.getKey();
                    List<String> threads = group.getValue();
                    trace.add(String.format(
                            "---- Threads (%d): %s State: %s Stack: ----",
                            threads.size(), threads, stack.state));
                    Arrays.stream(stack.elements).map(StackTraceElement::toString).forEach(trace::add);
                    trace.add("\n");
                });
        return trace.toString();
    }

    /**
     * Builds the memory section of the status report.
     *
     * @return a header line followed by the text returned by {@code MemoryMonitor.describeMemory()}
     */
    String getMemoryUsage() {
        StringJoiner memory = new StringJoiner("\n");
        memory.add("========== MEMORY USAGE ==========");
        memory.add(this.memoryMonitor.describeMemory());
        return memory.toString();
    }

    /**
     * Builds the active-bundle section of the status report.
     *
     * <p>Only bundles whose processor is still found in the cache and whose state tracker has a
     * tracked thread are listed. At most {@value #MAX_REPORTED_BUNDLES} bundles are printed,
     * ordered by time since their last state transition, longest first.
     *
     * @return the active-bundle section, or a header plus a "no active bundles" line when the
     *     cache reports no active processors
     */
    @VisibleForTesting
    String getActiveProcessBundleState() {
        StringJoiner activeBundlesState = new StringJoiner("\n");
        activeBundlesState.add("========== ACTIVE PROCESSING BUNDLES ==========");

        // Query the cache once so the emptiness check and the iteration see the same result.
        Map<String, ?> activeProcessors = this.processBundleCache.getActiveBundleProcessors();
        if (activeProcessors.isEmpty()) {
            activeBundlesState.add("No active processing bundles.");
            return activeBundlesState.toString();
        }

        List<BundleState> bundleStates = new ArrayList<>();
        for (String instruction : activeProcessors.keySet()) {
            ProcessBundleHandler.BundleProcessor bundleProcessor =
                    this.processBundleCache.find(instruction);
            if (bundleProcessor == null) {
                continue;
            }
            ExecutionStateTracker executionStateTracker = bundleProcessor.getStateTracker();
            Thread trackedThread = executionStateTracker.getTrackedThread();
            if (trackedThread == null) {
                continue;
            }
            bundleStates.add(new BundleState(
                    instruction,
                    trackedThread.getName(),
                    executionStateTracker.getMillisSinceLastTransition()));
        }

        bundleStates.stream()
                .sorted(Comparator.comparingLong(BundleState::getTimeSinceTransition).reversed())
                .limit(MAX_REPORTED_BUNDLES)
                .forEachOrdered(bundleState -> {
                    activeBundlesState.add(
                            String.format("---- Instruction %s ----", bundleState.getInstruction()));
                    activeBundlesState.add(
                            String.format("Tracked thread: %s", bundleState.getTrackedThreadName()));
                    // The tracker reports milliseconds; the report shows seconds.
                    activeBundlesState.add(String.format(
                            "Time since transition: %.2f seconds%n",
                            (double) bundleState.getTimeSinceTransition() / 1000.0));
                });
        return activeBundlesState.toString();
    }

    /**
     * Receives status requests from the stream and replies on the enclosing client's outbound
     * observer; stream termination is recorded in {@code inboundObserverCompletion}.
     */
    private class InboundObserver implements StreamObserver<BeamFnApi.WorkerStatusRequest> {
        private InboundObserver() {
        }

        /**
         * Answers one request with a report of memory usage, active bundles and a thread dump.
         *
         * @param workerStatusRequest the request whose id is echoed in the response
         */
        @Override
        public void onNext(BeamFnApi.WorkerStatusRequest workerStatusRequest) {
            StringJoiner status = new StringJoiner("\n");
            status.add(BeamFnStatusClient.this.getMemoryUsage());
            status.add("\n");
            status.add(BeamFnStatusClient.this.getActiveProcessBundleState());
            status.add("\n");
            status.add(BeamFnStatusClient.this.getThreadDump());

            BeamFnApi.WorkerStatusResponse response = BeamFnApi.WorkerStatusResponse.newBuilder()
                    .setId(workerStatusRequest.getId())
                    .setStatusInfo(status.toString())
                    .build();
            BeamFnStatusClient.this.outboundObserver.onNext(response);
        }

        /** Logs the failure and completes the completion future exceptionally with it. */
        @Override
        public void onError(Throwable t) {
            LOG.error("Error getting SDK harness status", t);
            BeamFnStatusClient.this.inboundObserverCompletion.completeExceptionally(t);
        }

        /** Marks the completion future as normally completed. */
        @Override
        public void onCompleted() {
            BeamFnStatusClient.this.inboundObserverCompletion.complete(COMPLETED);
        }
    }

    /** Immutable snapshot of one active bundle as shown in the status report. */
    static class BundleState {
        final String instruction;
        final String trackedThreadName;
        /** Milliseconds since the bundle's state tracker last changed state. */
        final long timeSinceTransition;

        public BundleState(String instruction, String trackedThreadName, long timeSinceTransition) {
            this.instruction = instruction;
            this.trackedThreadName = trackedThreadName;
            this.timeSinceTransition = timeSinceTransition;
        }

        public String getInstruction() {
            return this.instruction;
        }

        public String getTrackedThreadName() {
            return this.trackedThreadName;
        }

        public long getTimeSinceTransition() {
            return this.timeSinceTransition;
        }
    }

    /**
     * Map key pairing a stack trace with a thread state; two instances are equal when both the
     * state and every stack element are equal, which lets identical threads be grouped.
     */
    static class Stack {
        final StackTraceElement[] elements;
        final Thread.State state;

        Stack(StackTraceElement[] elements, Thread.State state) {
            this.elements = elements;
            this.state = state;
        }

        @Override
        public int hashCode() {
            return Objects.hash(Arrays.deepHashCode(this.elements), this.state);
        }

        @Override
        public boolean equals(@Nullable Object other) {
            if (other == this) {
                return true;
            }
            if (!(other instanceof Stack)) {
                return false;
            }
            Stack that = (Stack) other;
            return this.state == that.state && Arrays.deepEquals(this.elements, that.elements);
        }
    }
}

