/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.Collections;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC service implementing the {@code BeamFnExternalWorkerPool} API.
 *
 * <p>Each {@code StartWorker} request launches an SDK harness ({@link FnHarness#main}) on a new
 * daemon thread inside the current JVM, using the {@link PipelineOptions} this service was
 * constructed with. {@code StopWorker} requests are acknowledged but do not terminate anything.
 */
public class ExternalWorkerService
extends BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase
implements FnService {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ExternalWorkerService.class);

    /** Name of the environment variable {@link #main} reads the JSON pipeline options from. */
    private static final @UnknownKeyFor @NonNull @Initialized String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";

    /** Pipeline options handed to every harness started by this service. */
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

    /** Factory used by {@link #start()} to create the gRPC server hosting this service. */
    private final @UnknownKeyFor @NonNull @Initialized ServerFactory serverFactory = ServerFactory.createDefault();

    /**
     * Creates a service that starts workers with the given options.
     *
     * @param options pipeline options passed to each started harness
     */
    public ExternalWorkerService(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        this.options = options;
    }

    /**
     * Starts an SDK harness for the requested worker on a dedicated daemon thread and immediately
     * acknowledges the request.
     *
     * <p>The response is sent as soon as the thread has been started; a failure inside the harness
     * is only logged and is not reported back through {@code responseObserver}.
     *
     * @param request carries the worker id plus the logging and control endpoints for the harness
     * @param responseObserver receives a single empty response followed by completion
     */
    @Override
    public void startWorker( @UnknownKeyFor @NonNull @Initialized BeamFnApi.StartWorkerRequest request, @UnknownKeyFor @NonNull @Initialized StreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.StartWorkerResponse> responseObserver) {
        String workerId = request.getWorkerId();
        LOG.info("Starting worker {} pointing at {}.", workerId, request.getControlEndpoint().getUrl());
        LOG.debug("Worker request {}.", request);

        Runnable harness = () -> {
            try {
                // No runner capabilities and no status endpoint are supplied.
                FnHarness.main(workerId, this.options, Collections.emptySet(), request.getLoggingEndpoint(), request.getControlEndpoint(), null);
                // NOTE(review): this line is reached only after FnHarness.main returns; if that
                // call blocks for the lifetime of the harness, the message is emitted at
                // shutdown rather than at startup. Confirm against FnHarness.
                LOG.info("Successfully started worker {}.", workerId);
            } catch (Exception exn) {
                // SLF4J treats a trailing Throwable argument as the exception to log.
                LOG.error("Failed to start worker {}.", workerId, exn);
            }
        };

        Thread th = new Thread(harness, "SDK-worker-" + workerId);
        // Daemon so that a running harness does not keep the JVM alive on its own.
        th.setDaemon(true);
        th.start();

        responseObserver.onNext(BeamFnApi.StartWorkerResponse.newBuilder().build());
        responseObserver.onCompleted();
    }

    /**
     * Acknowledges a stop request with an empty response. No worker thread is stopped here.
     *
     * @param request the stop request (its contents are not read)
     * @param responseObserver receives a single empty response followed by completion
     */
    @Override
    public void stopWorker( @UnknownKeyFor @NonNull @Initialized BeamFnApi.StopWorkerRequest request, @UnknownKeyFor @NonNull @Initialized StreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.StopWorkerResponse> responseObserver) {
        responseObserver.onNext(BeamFnApi.StopWorkerResponse.newBuilder().build());
        responseObserver.onCompleted();
    }

    /** No-op: this service holds no resources of its own to release. */
    @Override
    public void close() {
    }

    /**
     * Creates a gRPC server hosting this service.
     *
     * <p>If the portable pipeline options specify a non-empty external service address, the server
     * is created for that address; otherwise a port is allocated automatically.
     *
     * @return the server wrapping this service; the caller is responsible for closing it
     * @throws Exception if the server cannot be created
     */
    public @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ExternalWorkerService> start() throws @UnknownKeyFor @NonNull @Initialized Exception {
        String externalServiceAddress = Environments.getExternalServiceAddress(this.options.as(PortablePipelineOptions.class));
        GrpcFnServer<ExternalWorkerService> server;
        if (externalServiceAddress.isEmpty()) {
            server = GrpcFnServer.allocatePortAndCreateFor(this, this.serverFactory);
        } else {
            Endpoints.ApiServiceDescriptor descriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build();
            server = GrpcFnServer.create(this, descriptor, this.serverFactory);
        }
        LOG.debug("Listening for worker start requests at {}.", server.getApiServiceDescriptor().getUrl());
        return server;
    }

    /**
     * Entry point: reads JSON pipeline options from the {@code PIPELINE_OPTIONS} environment
     * variable, starts the worker service, and then blocks the calling thread until it is
     * interrupted or an error occurs.
     *
     * @param args ignored
     * @throws IllegalArgumentException if the environment variable is not set (presumably; thrown
     *     by {@code Preconditions.checkArgumentNotNull} - confirm against that helper)
     */
    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) {
        LOG.info("Starting external worker service");
        String optionsEnv = Preconditions.checkArgumentNotNull(System.getenv(PIPELINE_OPTIONS_ENV_VAR), "No pipeline options provided in environment variables " + PIPELINE_OPTIONS_ENV_VAR);
        // NOTE(review): the raw options JSON is logged at INFO; verify it cannot carry secrets.
        LOG.info("Pipeline options {}", optionsEnv);
        PipelineOptions options = PipelineOptionsTranslation.fromJson(optionsEnv);
        try (GrpcFnServer<ExternalWorkerService> server = new ExternalWorkerService(options).start()) {
            LOG.info("External worker service started at address: {}", server.getApiServiceDescriptor().getUrl());
            // Park this thread so the server stays open until the thread is interrupted.
            Sleeper.DEFAULT.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Error running worker service", e);
        } catch (Exception e) {
            LOG.error("Error running worker service", e);
        } finally {
            LOG.info("External worker service stopped.");
        }
    }
}

