/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.rpc.impl;

import com.antgroup.geaflow.cluster.client.PipelineResult;
import com.antgroup.geaflow.cluster.rpc.IDriverEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.AbstractRpcEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.RpcMessageEncoder;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.pipeline.IPipelineResult;
import com.antgroup.geaflow.pipeline.Pipeline;
import com.antgroup.geaflow.rpc.proto.Driver;
import com.antgroup.geaflow.rpc.proto.DriverServiceGrpc;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DriverEndpointRef
extends AbstractRpcEndpointRef
implements IDriverEndpointRef {
    private static final Logger LOGGER = LoggerFactory.getLogger(DriverEndpointRef.class);
    private DriverServiceGrpc.DriverServiceFutureStub stub;
    private DriverServiceGrpc.DriverServiceBlockingStub blockingStub;

    public DriverEndpointRef(String host, int port, ExecutorService executorService) {
        super(host, port, executorService);
    }

    @Override
    protected void createStub(ManagedChannel channel) {
        this.stub = DriverServiceGrpc.newFutureStub((Channel)channel);
        this.blockingStub = DriverServiceGrpc.newBlockingStub((Channel)channel);
    }

    @Override
    public IPipelineResult executePipeline(Pipeline pipeline) {
        this.ensureChannelAlive();
        LOGGER.info("send pipeline to driver, driver host:{}, port:{}. {}", new Object[]{this.host, this.port, pipeline});
        ByteString payload = RpcMessageEncoder.encode(pipeline);
        Iterator iterator = this.blockingStub.executePipeline(Driver.PipelineReq.newBuilder().setPayload(payload).build());
        if (!iterator.hasNext()) {
            throw new GeaflowRuntimeException("not found ack response");
        }
        iterator.next();
        return new PipelineResult(iterator);
    }

    @Override
    public void close() {
        this.ensureChannelAlive();
        this.blockingStub.close(Empty.newBuilder().build());
        super.close();
    }
}

