/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.e2e.common.container.seatunnel;

import com.google.auto.service.AutoService;
import groovy.lang.Tuple2;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.Lists;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

/**
 * {@link TestContainer} implementation that runs a SeaTunnel engine server inside a Docker
 * container (image {@link #JDK_DOCKER_IMAGE}) and drives it through the SeaTunnel shell scripts
 * located under {@code /tmp/seatunnel/bin} in the container.
 *
 * <p>After a job finishes (and no other job started through this instance is still running) the
 * names of the server JVM threads are compared with the names captured before the job, so that
 * threads left behind by the job are reported as a test failure.
 */
@AutoService(TestContainer.class)
public class SeaTunnelContainer extends AbstractTestContainer {
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelContainer.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    protected static final String JDK_DOCKER_IMAGE = "openjdk:8";
    private static final String CLIENT_SHELL = "seatunnel.sh";
    protected static final String SERVER_SHELL = "seatunnel-cluster.sh";
    protected static final String CONNECTOR_CHECK_SHELL = "seatunnel-connector.sh";

    /** Directory inside the container that holds the SeaTunnel distribution. */
    private static final String SEATUNNEL_DIR_IN_CONTAINER = "/tmp/seatunnel/";
    /** Path inside the container to which {@code HOST_VOLUME_MOUNT_PATH} is bound. */
    private static final String MOUNT_DIR_IN_CONTAINER = "/tmp/seatunnel_mnt";
    /** Project-relative directory whose content is copied to the container's config directory. */
    private static final String E2E_BASE_RESOURCES_DIR =
            "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/";
    /** Project-relative location of the shaded Hadoop jar on the host. */
    private static final String HADOOP_UBER_JAR_HOST_PATH =
            "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar";
    /** Location of the shaded Hadoop jar relative to the SeaTunnel directory in the container. */
    private static final String HADOOP_UBER_JAR_CONTAINER_PATH = "lib/seatunnel-hadoop3-3.1.4-uber.jar";
    /** Log line the container start-up waits for. */
    private static final String SERVER_READY_LOG_REGEX = ".*received new worker register:.*";

    /** Compiled once; matches thread names such as {@code pool-1-thread-2} (single digits only). */
    private static final Pattern POOL_THREAD_NAME = Pattern.compile("pool-[0-9]-thread-[0-9]");

    /** Thread-name prefixes that {@link #isSystemThread(String)} ignores. */
    private static final List<String> SYSTEM_THREAD_PREFIXES =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "hz.main",
                            "seatunnel-coordinator-service",
                            "GC task thread",
                            "SeaTunnel-CompletableFuture-Thread-",
                            "Timer-",
                            "http-report-event-scheduler",
                            "event-forwarder",
                            "Log4j2-TF-",
                            "LeaseRenewer",
                            "org.apache.hadoop.hdfs.PeerCache",
                            "java-sdk-progress-listener-callback-thread"));

    /** Thread-name fragments that {@link #isSystemThread(String)} ignores. */
    private static final List<String> SYSTEM_THREAD_FRAGMENTS =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "CompilerThread",
                            "NioNetworking-closeListenerExecutor",
                            "ForkJoinPool.commonPool",
                            "DestroyJavaVM",
                            "main-query-state-checker",
                            "Keep-Alive-SocketCleaner",
                            "process reaper",
                            "InterruptTimer",
                            "Java2D Disposer",
                            "OkHttp ConnectionPool",
                            "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner"));

    /** Thread-name prefixes that {@link #isIssueWeAlreadyKnow(String)} tolerates. */
    private static final List<String> KNOWN_ISSUE_THREAD_PREFIXES =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "ClickHouseClientWorker",
                            "Okio Watchdog",
                            "OkHttp TaskRunner",
                            "SessionExecutor",
                            "iceberg-worker-pool",
                            "AsyncAppender-Dispatcher-Thread",
                            "BufferPoolPruner",
                            "MaintenanceTimer",
                            "cluster-",
                            "iceberg",
                            "MutableQuantiles-",
                            "Thread-",
                            "JNA Cleaner",
                            "grpc",
                            "AsyncOutputStream",
                            "MANIFEST-READ-THREAD-POOL"));

    /** Thread-name fragments that {@link #isIssueWeAlreadyKnow(String)} tolerates. */
    private static final List<String> KNOWN_ISSUE_THREAD_FRAGMENTS =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser",
                            "java-sdk-http-connection-reaper",
                            "Timer for 's3a-file-system' metrics system"));

    /** The running server container; {@code null} until one of the {@code startUp} methods ran. */
    protected GenericContainer<?> server;
    /** Number of job submissions / restores currently in flight through this instance. */
    private final AtomicInteger runningCount = new AtomicInteger();

    /**
     * Creates the host mount directory and starts the server on the inherited {@code NETWORK}.
     *
     * @throws Exception if the directory cannot be created or the container fails to start
     */
    @Override
    public void startUp() throws Exception {
        FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
        this.server = this.createSeaTunnelServer();
    }

    /**
     * Starts the server on the supplied network. Unlike {@link #startUp()} this overload does not
     * create the host mount directory.
     *
     * @param NETWORK network to attach the container to (the name shadows the inherited field)
     * @throws Exception if the container fails to start
     */
    public void startUp(Network NETWORK) throws Exception {
        this.server = this.createSeaTunnelServer(NETWORK);
    }

    /** Creates and starts the server on the inherited {@code NETWORK}. */
    private GenericContainer<?> createSeaTunnelServer() throws IOException, InterruptedException {
        return this.createSeaTunnelServer(NETWORK);
    }

    /**
     * Builds, configures and starts the server container.
     *
     * @param network network the container joins under the alias {@code server}
     * @return the started container
     */
    private GenericContainer<?> createSeaTunnelServer(Network network) throws IOException, InterruptedException {
        GenericContainer<?> container =
                new GenericContainer<>(this.getDockerImage())
                        .withNetwork(network)
                        .withEnv("TZ", "UTC")
                        .withCommand(this.buildStartCommand())
                        .withNetworkAliases("server")
                        .withExposedPorts()
                        .withFileSystemBind("/tmp", "/opt/hive")
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger("seatunnel-engine:" + JDK_DOCKER_IMAGE)))
                        .withFileSystemBind(HOST_VOLUME_MOUNT_PATH, MOUNT_DIR_IN_CONTAINER, BindMode.READ_WRITE)
                        .waitingFor(Wait.forLogMessage(SERVER_READY_LOG_REGEX, 1));
        this.copySeaTunnelStarterToContainer(container);
        container.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
        copyE2eConfigToContainer(container);
        copyHadoopUberJarToContainer(container);
        // Start first, then run the extra commands: same order as
        // createSeaTunnelContainerWithFakeSourceAndInMemorySink. The inherited helper presumably
        // executes shell commands inside the container, which requires it to be running.
        container.start();
        this.executeExtraCommands(container);
        return container;
    }

    /** Registers the shared e2e resources directory for copying to the container's config dir. */
    private static void copyE2eConfigToContainer(GenericContainer<?> container) {
        container.withCopyFileToContainer(
                MountableFile.forHostPath(ContainerUtil.PROJECT_ROOT_PATH + E2E_BASE_RESOURCES_DIR),
                Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "config").toString());
    }

    /** Registers the shaded Hadoop jar for copying to the container's lib dir. */
    private static void copyHadoopUberJarToContainer(GenericContainer<?> container) {
        container.withCopyFileToContainer(
                MountableFile.forHostPath(ContainerUtil.PROJECT_ROOT_PATH + HADOOP_UBER_JAR_HOST_PATH),
                Paths.get(SEATUNNEL_DIR_IN_CONTAINER, HADOOP_UBER_JAR_CONTAINER_PATH).toString());
    }

    /**
     * Returns the command used to launch the server inside the container.
     *
     * @return a one-element array holding the path of the cluster start script
     */
    protected String[] buildStartCommand() {
        return new String[] {
            ContainerUtil.adaptPathForWin(Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "bin", SERVER_SHELL).toString())
        };
    }

    /**
     * Starts a server container that uses {@code configFilePath} as its {@code seatunnel.yaml} and
     * then copies the fake connector, the e2e-common {@code -tests.jar} and the matching
     * {@code plugin-mapping.properties} into it. The container is returned, not stored in
     * {@link #server}.
     *
     * @param configFilePath host path of the engine configuration file
     * @return the started container
     * @throws java.util.NoSuchElementException if no {@code -tests.jar} exists in the e2e-common
     *     target directory
     */
    protected GenericContainer<?> createSeaTunnelContainerWithFakeSourceAndInMemorySink(String configFilePath) throws IOException, InterruptedException {
        GenericContainer<?> container =
                new GenericContainer<>(this.getDockerImage())
                        .withNetwork(NETWORK)
                        .withEnv("TZ", "UTC")
                        .withCommand(
                                ContainerUtil.adaptPathForWin(
                                        Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "bin", SERVER_SHELL).toString()))
                        .withNetworkAliases("server")
                        .withExposedPorts()
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger("seatunnel-engine:" + JDK_DOCKER_IMAGE)))
                        .waitingFor(Wait.forLogMessage(SERVER_READY_LOG_REGEX, 1));
        this.copySeaTunnelStarterToContainer(container);
        container.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
        container.setExposedPorts(Arrays.asList(5801, 8080));
        copyE2eConfigToContainer(container);
        // Registered after the resources directory so the caller's file is the one that ends up
        // at config/seatunnel.yaml.
        container.withCopyFileToContainer(
                MountableFile.forHostPath(configFilePath),
                Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "config", "seatunnel.yaml").toString());
        copyHadoopUberJarToContainer(container);
        container.start();
        this.executeExtraCommands(container);

        File connectorModule =
                new File(ContainerUtil.PROJECT_ROOT_PATH + File.separator + this.getConnectorModulePath());
        List<File> jarsToCopy =
                ContainerUtil.getConnectorFiles(
                        connectorModule, Collections.singleton("connector-fake"), this.getConnectorNamePrefix());
        String e2eCommonTarget =
                ContainerUtil.PROJECT_ROOT_PATH + File.separator + "seatunnel-e2e/seatunnel-e2e-common/target";
        URL testsJar =
                FileUtils.searchJarFiles(Paths.get(e2eCommonTarget)).stream()
                        .filter(jar -> jar.toString().endsWith("-tests.jar"))
                        .findFirst()
                        // Same exception type as an unchecked Optional.get(), but with a message.
                        .orElseThrow(
                                () ->
                                        new java.util.NoSuchElementException(
                                                "No *-tests.jar found under " + e2eCommonTarget));
        jarsToCopy.add(new File(testsJar.getFile()));
        for (File jar : jarsToCopy) {
            container.copyFileToContainer(
                    MountableFile.forHostPath(jar.getAbsolutePath()),
                    Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "connectors", jar.getName()).toString());
        }
        container.copyFileToContainer(
                MountableFile.forHostPath(
                        ContainerUtil.PROJECT_ROOT_PATH
                                + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties"),
                Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "connectors", "plugin-mapping.properties").toString());
        return container;
    }

    /**
     * Removes the mount directory inside the container, closes the container and deletes the host
     * mount directory. Each clean-up step runs even if an earlier one throws; the exception is
     * still propagated to the caller.
     *
     * @throws Exception if any clean-up step fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (this.server != null) {
                try {
                    this.server.execInContainer("rm", "-rf", MOUNT_DIR_IN_CONTAINER);
                } finally {
                    this.server.close();
                }
            }
        } finally {
            FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
        }
    }

    /** @return the Docker image the server container is created from */
    @Override
    protected String getDockerImage() {
        return JDK_DOCKER_IMAGE;
    }

    /** @return the name of the starter module */
    @Override
    protected String getStartModuleName() {
        return "seatunnel-starter";
    }

    /** @return the client script used by {@link #executeBaseCommand(String[])} */
    @Override
    protected String getStartShellName() {
        return CLIENT_SHELL;
    }

    /** @return the project-relative directory that contains the connector modules */
    @Override
    protected String getConnectorModulePath() {
        return "seatunnel-connectors-v2";
    }

    /** @return the connector type identifier */
    @Override
    protected String getConnectorType() {
        return "seatunnel";
    }

    /** @return the file-name prefix shared by connector jars */
    @Override
    protected String getConnectorNamePrefix() {
        return "connector-";
    }

    /** @return extra shell commands to run after start-up; none for this container */
    @Override
    protected List<String> getExtraStartShellCommands() {
        return Collections.emptyList();
    }

    /** @return {@link TestContainerId#SEATUNNEL} */
    @Override
    public TestContainerId identifier() {
        return TestContainerId.SEATUNNEL;
    }

    /** @return the client flag for taking a savepoint */
    @Override
    protected String getSavePointCommand() {
        return "-s";
    }

    /** @return the client flag for cancelling a job */
    @Override
    protected String getCancelJobCommand() {
        return "-can";
    }

    /** @return the client flag for restoring a job */
    @Override
    protected String getRestoreCommand() {
        return "-r";
    }

    /**
     * Hands the current {@link #server} to the given factory so it can customise the container.
     *
     * @param extendedFactory callback applied to the server container
     */
    @Override
    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws IOException, InterruptedException {
        extendedFactory.extend(this.server);
    }

    /**
     * Copies all connector jars into the container and runs the connector-check script.
     *
     * @param args arguments appended to the script invocation
     * @return the result of the in-container execution
     */
    @Override
    public Container.ExecResult executeConnectorCheck(String[] args) throws IOException, InterruptedException {
        ContainerUtil.copyAllConnectorJarToContainer(
                this.server,
                this.getConnectorModulePath(),
                this.getConnectorNamePrefix(),
                this.getConnectorType(),
                SEATUNNEL_DIR_IN_CONTAINER);
        return this.executeCommand(this.server, buildShellCommand(CONNECTOR_CHECK_SHELL, args));
    }

    /**
     * Runs the client script ({@link #getStartShellName()}) with the given arguments.
     *
     * @param args arguments appended to the script invocation
     * @return the result of the in-container execution
     */
    @Override
    public Container.ExecResult executeBaseCommand(String[] args) throws IOException, InterruptedException {
        return this.executeCommand(this.server, buildShellCommand(this.getStartShellName(), args));
    }

    /** Builds {@code [<seatunnel dir>/bin/<shellName>, args...]} as a mutable list. */
    private static List<String> buildShellCommand(String shellName, String[] args) {
        List<String> command = new ArrayList<>();
        command.add(
                ContainerUtil.adaptPathForWin(Paths.get(SEATUNNEL_DIR_IN_CONTAINER, "bin", shellName).toString()));
        Collections.addAll(command, args);
        return command;
    }

    /** Submits the job described by {@code confFile} without variables. */
    @Override
    public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException {
        return this.executeJob(confFile, new ArrayList<>());
    }

    /** Submits the job described by {@code confFile} with the given variables and no job id. */
    @Override
    public Container.ExecResult executeJob(String confFile, List<String> variables) throws IOException, InterruptedException {
        return this.doExecuteJob(confFile, null, variables);
    }

    /**
     * Submits the job described by {@code confFile} under the given job id.
     *
     * @param variables job variables; a {@code null} array is forwarded as a {@code null} list
     */
    @Override
    public Container.ExecResult executeJob(String confFile, String jobId, String ... variables) throws IOException, InterruptedException {
        return this.doExecuteJob(confFile, jobId, variables != null ? Arrays.asList(variables) : null);
    }

    /**
     * Runs the job and, once no other job of this instance is in flight, waits up to 120 seconds
     * for every thread the job left behind in the server JVM to disappear.
     *
     * @return the result of the job submission
     */
    private Container.ExecResult doExecuteJob(String confFile, String jobId, List<String> variables) throws IOException, InterruptedException {
        log.info("test in container: {}", this.identifier());
        List<String> beforeThreads = ContainerUtil.getJVMThreadNames(this.server);
        this.runningCount.incrementAndGet();
        Container.ExecResult result;
        int stillRunning;
        try {
            result = this.executeJob(this.server, confFile, jobId, variables);
        } finally {
            // Always release the slot; otherwise one failed submission would leave the counter
            // above zero and silently disable the thread check for all later jobs.
            stillRunning = this.runningCount.decrementAndGet();
        }
        if (stillRunning > 0) {
            // Only check threads when every job has finished.
            return result;
        }
        List<String> leftover =
                this.removeSystemThread(beforeThreads, ContainerUtil.getJVMThreadNames(this.server));
        if (leftover.isEmpty()) {
            return result;
        }
        Awaitility.await()
                .atMost(120L, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            List<String> remaining =
                                    this.removeSystemThread(
                                            beforeThreads, ContainerUtil.getJVMThreadNames(this.server));
                            Assertions.assertTrue(
                                    remaining.isEmpty(),
                                    "There are still threads running in the container: \n"
                                            + ContainerUtil.getJVMThreads(this.server).stream()
                                                    .filter(thread -> remaining.contains(thread.getV1()))
                                                    .map(Tuple2::getV2)
                                                    .map(str -> str + "\n")
                                                    .collect(Collectors.joining()));
                        });
        return result;
    }

    /**
     * Computes the thread names that count as leaked. {@code afterThreads} is filtered in place
     * (system threads and threads present in {@code beforeThreads} are removed) and merged with the
     * threads whose reported class loader is neither an {@code AppClassLoader} nor {@code "null"};
     * known issues and system threads are then dropped from the merged list.
     *
     * @param beforeThreads thread names captured before the job
     * @param afterThreads thread names captured after the job; modified by this call
     * @return a new mutable list of suspicious thread names
     */
    private List<String> removeSystemThread(List<String> beforeThreads, List<String> afterThreads) throws IOException {
        afterThreads.removeIf(SeaTunnelContainer::isSystemThread);
        afterThreads.removeIf(beforeThreads::contains);
        List<String> suspects =
                this.getThreadClassLoader().entrySet().stream()
                        .filter(entry -> !entry.getKey().contains("process reaper"))
                        .filter(
                                entry -> {
                                    String classLoader = entry.getValue();
                                    return !classLoader.contains("AppClassLoader") && !classLoader.equals("null");
                                })
                        .map(Map.Entry::getKey)
                        // Explicit ArrayList: the list is modified below.
                        .collect(Collectors.toCollection(ArrayList::new));
        suspects.addAll(afterThreads);
        suspects.removeIf(this::isIssueWeAlreadyKnow);
        suspects.removeIf(SeaTunnelContainer::isSystemThread);
        return suspects;
    }

    /**
     * Tells whether a thread name is on the ignore list used by the leak check.
     *
     * @param s thread name
     * @return {@code true} if the name matches an ignored prefix, fragment or the pool pattern
     */
    private static boolean isSystemThread(String s) {
        return SYSTEM_THREAD_PREFIXES.stream().anyMatch(s::startsWith)
                || SYSTEM_THREAD_FRAGMENTS.stream().anyMatch(s::contains)
                || POOL_THREAD_NAME.matcher(s).matches();
    }

    /**
     * Waits up to 20 seconds until the server JVM holds at most {@code maxSize} live
     * {@code SeaTunnelChildFirstClassLoader} instances. Returns immediately when the count is
     * already within the limit or the class is not reported at all.
     *
     * @param maxSize maximum tolerated number of live class loader objects
     */
    private void classLoaderObjectCheck(Integer maxSize) throws IOException, InterruptedException {
        final String className = "org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader";
        Map<String, Integer> objects = ContainerUtil.getJVMLiveObject(this.server);
        if (!objects.containsKey(className) || objects.get(className) <= maxSize) {
            return;
        }
        Awaitility.await()
                .atMost(20L, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            Map<String, Integer> newObjects = ContainerUtil.getJVMLiveObject(this.server);
                            if (newObjects.containsKey(className)) {
                                Assertions.assertTrue(
                                        newObjects.get(className) <= maxSize,
                                        "There are still SeaTunnelChildFirstClassLoader objects in the seatunnel server");
                            }
                        });
    }

    /**
     * Fetches the {@code running-threads} map from the server's Hazelcast REST endpoint.
     *
     * @return thread name mapped to class loader description; values of duplicate thread names
     *     are joined with {@code " && "}
     * @throws IOException if the request fails or the body cannot be parsed
     */
    private Map<String, String> getThreadClassLoader() throws IOException {
        // Resolve host and port from the container, as getJobStatus does, instead of assuming
        // the Docker daemon is reachable on localhost.
        HttpGet get =
                new HttpGet(
                        String.format(
                                "http://%s:%d/hazelcast/rest/maps/running-threads",
                                this.server.getHost(), this.server.getMappedPort(5801)));
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(get)) {
            String threads = EntityUtils.toString(response.getEntity());
            List<Map<String, String>> entries =
                    OBJECT_MAPPER.readValue(threads, new TypeReference<List<Map<String, String>>>() {});
            return entries.stream()
                    .collect(
                            Collectors.toMap(
                                    entry -> entry.get("threadName"),
                                    entry -> entry.get("classLoader"),
                                    (a, b) -> a + " && " + b));
        }
    }

    /**
     * Tells whether a leftover thread belongs to a known, tolerated leak. Subclasses may override
     * this to extend the list.
     *
     * @param threadName name of the leftover thread
     * @return {@code true} if the thread should not fail the leak check
     */
    protected boolean isIssueWeAlreadyKnow(String threadName) {
        return KNOWN_ISSUE_THREAD_PREFIXES.stream().anyMatch(threadName::startsWith)
                || KNOWN_ISSUE_THREAD_FRAGMENTS.stream().anyMatch(threadName::contains);
    }

    /** Triggers a savepoint for the given job on the server. */
    @Override
    public Container.ExecResult savepointJob(String jobId) throws IOException, InterruptedException {
        return this.savepointJob(this.server, jobId);
    }

    /**
     * Restores the given job from its savepoint. The call is counted as a running job for its
     * duration so that a concurrent {@code executeJob} skips the thread check.
     *
     * @param variables job variables; a {@code null} array is forwarded as a {@code null} list
     */
    @Override
    public Container.ExecResult restoreJob(String confFile, String jobId, String ... variables) throws IOException, InterruptedException {
        this.runningCount.incrementAndGet();
        try {
            return this.restoreJob(
                    this.server, confFile, jobId, variables != null ? Arrays.asList(variables) : null);
        } finally {
            // Release the slot even when the restore fails.
            this.runningCount.decrementAndGet();
        }
    }

    /** Cancels the given job on the server. */
    @Override
    public Container.ExecResult cancelJob(String jobId) throws IOException, InterruptedException {
        return this.cancelJob(this.server, jobId);
    }

    /**
     * Queries the server's {@code /job-info/{jobId}} REST endpoint on port 8080.
     *
     * @param jobId id of the job to look up
     * @return the value of the {@code jobStatus} field, or {@code null} when the response status
     *     is not 200 or the field is absent
     * @throws RuntimeException wrapping the {@link IOException} if the request fails
     */
    @Override
    public String getJobStatus(String jobId) {
        HttpGet get =
                new HttpGet(
                        String.format(
                                "http://%s:%d/job-info/%s",
                                this.server.getHost(), this.server.getMappedPort(8080), jobId));
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(get)) {
            if (response.getStatusLine().getStatusCode() != 200) {
                return null;
            }
            ObjectNode jobInfo = JsonUtils.parseObject(EntityUtils.toString(response.getEntity()));
            return jobInfo.has("jobStatus") ? jobInfo.get("jobStatus").asText() : null;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** @return everything the server container has logged so far */
    @Override
    public String getServerLogs() {
        return this.server.getLogs();
    }

    /**
     * Copies a test resource into the server container.
     *
     * @param path resource path, resolved through {@code ContainerUtil.getResourcesFile}
     * @param targetPath destination path inside the container
     */
    @Override
    public void copyFileToContainer(String path, String targetPath) {
        ContainerUtil.copyFileIntoContainers(ContainerUtil.getResourcesFile(path).toPath(), targetPath, this.server);
    }

    /**
     * Copies a host file into the server container.
     *
     * @param path host file system path
     * @param targetPath destination path inside the container
     */
    @Override
    public void copyAbsolutePathToContainer(String path, String targetPath) {
        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, this.server);
    }
}

