/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.prism;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.runners.prism.PrismExecutor;
import org.apache.beam.runners.prism.PrismLocator;
import org.apache.beam.runners.prism.PrismPipelineOptions;
import org.apache.beam.runners.prism.PrismPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineRunner} that starts a local Prism executable and then delegates pipeline
 * execution to a {@link PortableRunner} pointed at the job endpoint Prism was told to listen on.
 *
 * <p>Instances are created through {@link #fromOptions(PipelineOptions)}, which validates the
 * options and fills in defaults before construction.
 */
public class PrismRunner
extends PipelineRunner<PipelineResult> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PrismRunner.class);

    /** Options shared with the delegate runner; {@link #startPrism()} writes the job endpoint into them. */
    private final @UnknownKeyFor @NonNull @Initialized PrismPipelineOptions prismPipelineOptions;

    /**
     * Creates a runner bound to the given options. No validation or defaulting happens here; use
     * {@link #fromOptions(PipelineOptions)} for that.
     *
     * @param prismPipelineOptions options used to locate, launch and connect to Prism
     */
    protected PrismRunner(@UnknownKeyFor @NonNull @Initialized PrismPipelineOptions prismPipelineOptions) {
        this.prismPipelineOptions = prismPipelineOptions;
    }

    /** Returns the options this runner was constructed with. */
    @UnknownKeyFor @NonNull @Initialized PrismPipelineOptions getPrismPipelineOptions() {
        return this.prismPipelineOptions;
    }

    /**
     * Builds a {@link PrismRunner} from generic pipeline options.
     *
     * @param options pipeline options, viewed as {@link PrismPipelineOptions}
     * @return a runner whose options have been validated and defaulted
     * @throws IllegalArgumentException if a job endpoint is already set on {@code options}
     */
    public static @UnknownKeyFor @NonNull @Initialized PrismRunner fromOptions(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        PrismPipelineOptions prismPipelineOptions = options.as(PrismPipelineOptions.class);
        PrismRunner.validate(prismPipelineOptions);
        PrismRunner.assignDefaultsIfNeeded(prismPipelineOptions);
        return new PrismRunner(prismPipelineOptions);
    }

    /**
     * Rejects options that already carry a job endpoint: this runner picks its own endpoint in
     * {@link #startPrism()}, so a user-supplied one would be overwritten.
     *
     * @throws IllegalArgumentException if the job endpoint is neither null nor empty
     */
    private static void validate(@UnknownKeyFor @NonNull @Initialized PrismPipelineOptions options) {
        Preconditions.checkArgument(Strings.isNullOrEmpty(options.getJobEndpoint()), "when specifying --jobEndpoint, use --runner=PortableRunner instead");
    }

    /**
     * Starts Prism, runs the pipeline through a {@link PortableRunner} and returns a result that
     * stops Prism when it is cleaned up.
     *
     * <p>If anything fails after Prism has been started but before the result is handed back to
     * the caller, Prism is stopped here so that it is not left running without an owner.
     *
     * @param pipeline the pipeline to execute
     * @return a {@link PrismPipelineResult} wrapping the delegate's result
     * @throws RuntimeException wrapping any {@link IOException} raised while starting Prism
     */
    public @UnknownKeyFor @NonNull @Initialized PipelineResult run(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        // Note: the job endpoint is only assigned inside startPrism(), so the value logged here
        // is whatever the options held before this run started.
        LOG.info("running Pipeline using {}: defaultEnvironmentType: {}, jobEndpoint: {}", PortableRunner.class.getName(), this.prismPipelineOptions.getDefaultEnvironmentType(), this.prismPipelineOptions.getJobEndpoint());
        try {
            PrismExecutor executor = this.startPrism();
            // Tracks whether ownership of the executor has moved into the returned result.
            boolean handedOff = false;
            try {
                PortableRunner delegate = PortableRunner.fromOptions(this.prismPipelineOptions);
                PipelineResult result = new PrismPipelineResult(delegate.run(pipeline), executor::stop);
                handedOff = true;
                return result;
            }
            finally {
                if (!handedOff) {
                    // The delegate failed; nobody else holds the executor, so stop it here.
                    executor.stop();
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resolves the Prism command, launches it on a freshly chosen local port and records
     * {@code localhost:<port>} as the job endpoint on the options.
     *
     * @return the executor managing the launched Prism command
     * @throws IOException if a free port cannot be found, or if resolving or launching fails
     * @throws IllegalStateException if the executor does not report itself alive after launch
     */
    @UnknownKeyFor @NonNull @Initialized PrismExecutor startPrism() throws @UnknownKeyFor @NonNull @Initialized IOException {
        PrismLocator locator = new PrismLocator(this.prismPipelineOptions);
        int port = PrismRunner.findAvailablePort();
        String portFlag = String.format("-job_port=%s", port);
        String serveHttpFlag = String.format("-serve_http=%s", this.prismPipelineOptions.getEnableWebUI());
        String idleShutdownTimeoutFlag = String.format("-idle_shutdown_timeout=%s", this.prismPipelineOptions.getIdleShutdownTimeout());
        String endpoint = "localhost:" + port;
        // The endpoint is published on the options before launch; the delegate runner reads it from there.
        this.prismPipelineOptions.setJobEndpoint(endpoint);
        String command = locator.resolve();
        PrismExecutor executor = PrismExecutor.builder().setCommand(command).setArguments(Arrays.asList(portFlag, serveHttpFlag, idleShutdownTimeoutFlag)).build();
        executor.execute();
        Preconditions.checkState(executor.isAlive(), "Prism is not alive after launching command: " + command);
        return executor;
    }

    /** Sets the default environment type to {@code LOOPBACK} when none has been configured. */
    private static void assignDefaultsIfNeeded(@UnknownKeyFor @NonNull @Initialized PrismPipelineOptions prismPipelineOptions) {
        if (Strings.isNullOrEmpty(prismPipelineOptions.getDefaultEnvironmentType())) {
            prismPipelineOptions.setDefaultEnvironmentType("LOOPBACK");
        }
    }

    /**
     * Asks the operating system for a free port by binding a {@link ServerSocket} to port 0.
     *
     * <p>The socket is closed before this method returns, so the port is only known to have been
     * free at the time of the call; another process could claim it before Prism binds to it.
     *
     * @return the port number the temporary socket was bound to
     * @throws IOException if the socket cannot be opened or closed
     */
    private static @UnknownKeyFor @NonNull @Initialized int findAvailablePort() throws @UnknownKeyFor @NonNull @Initialized IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}

