/*
 * Decompiled with CFR 0.152.
 */
package com.cloudera.livy.rsc;

import com.cloudera.livy.Job;
import com.cloudera.livy.JobContext;
import com.cloudera.livy.JobHandle;
import com.cloudera.livy.LivyClient;
import com.cloudera.livy.client.common.BufferUtils;
import com.cloudera.livy.rsc.BaseProtocol;
import com.cloudera.livy.rsc.BypassJobStatus;
import com.cloudera.livy.rsc.ContextInfo;
import com.cloudera.livy.rsc.FutureListener;
import com.cloudera.livy.rsc.JobHandleImpl;
import com.cloudera.livy.rsc.RSCConf;
import com.cloudera.livy.rsc.Utils;
import com.cloudera.livy.rsc.driver.AddJarJob;
import com.cloudera.livy.rsc.rpc.Rpc;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RSCClient
implements LivyClient {
    private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class);
    private static final AtomicInteger EXECUTOR_GROUP_ID = new AtomicInteger();
    private final RSCConf conf;
    private final Map<String, JobHandleImpl<?>> jobs;
    private final ClientProtocol protocol;
    private final Promise<Rpc> driverRpc;
    private final int executorGroupId;
    private final EventLoopGroup eventLoopGroup;
    private ContextInfo contextInfo;
    private volatile boolean isAlive;

    RSCClient(RSCConf conf, Promise<ContextInfo> ctx) throws IOException {
        this.conf = conf;
        this.jobs = new ConcurrentHashMap();
        this.protocol = new ClientProtocol();
        this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
        this.executorGroupId = EXECUTOR_GROUP_ID.incrementAndGet();
        this.eventLoopGroup = new NioEventLoopGroup(conf.getInt(RSCConf.Entry.RPC_MAX_THREADS), Utils.newDaemonThreadFactory("RSCClient-" + this.executorGroupId + "-%d"));
        Utils.addListener(ctx, new FutureListener<ContextInfo>(){

            @Override
            public void onSuccess(ContextInfo info) throws Exception {
                RSCClient.this.connectToContext(info);
            }

            @Override
            public void onFailure(Throwable error) {
                RSCClient.this.connectionError(error);
            }
        });
        this.isAlive = true;
    }

    private synchronized void connectToContext(final ContextInfo info) throws Exception {
        this.contextInfo = info;
        try {
            Promise<Rpc> promise = Rpc.createClient(this.conf, this.eventLoopGroup, info.remoteAddress, info.remotePort, info.clientId, info.secret, this.protocol);
            Utils.addListener(promise, new FutureListener<Rpc>(){

                @Override
                public void onSuccess(Rpc rpc) throws Exception {
                    RSCClient.this.driverRpc.setSuccess((Object)rpc);
                    Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>(){

                        @Override
                        public void onSuccess(Void unused) {
                            if (RSCClient.this.isAlive) {
                                LOG.warn("Client RPC channel closed unexpectedly.");
                                RSCClient.this.stop(false);
                            }
                        }
                    });
                    LOG.debug("Connected to context {} ({}, {}).", new Object[]{info.clientId, rpc.getChannel(), RSCClient.this.executorGroupId});
                }

                @Override
                public void onFailure(Throwable error) throws Exception {
                    RSCClient.this.driverRpc.setFailure(error);
                    RSCClient.this.connectionError(error);
                }
            });
        }
        catch (Exception e) {
            this.connectionError(e);
        }
    }

    private void connectionError(Throwable error) {
        LOG.error("Failed to connect to context.", error);
        this.stop(false);
    }

    private <T> Future<T> deferredCall(final Object msg, final Class<T> retType) {
        if (this.driverRpc.isSuccess()) {
            try {
                return ((Rpc)this.driverRpc.get()).call(msg, retType);
            }
            catch (Exception ie) {
                throw Utils.propagate(ie);
            }
        }
        final Promise promise = this.eventLoopGroup.next().newPromise();
        final FutureListener callListener = new FutureListener<T>(){

            @Override
            public void onSuccess(T value) throws Exception {
                promise.setSuccess(value);
            }

            @Override
            public void onFailure(Throwable error) throws Exception {
                promise.setFailure(error);
            }
        };
        Utils.addListener(this.driverRpc, new FutureListener<Rpc>(){

            @Override
            public void onSuccess(Rpc rpc) throws Exception {
                Utils.addListener(rpc.call(msg, retType), callListener);
            }

            @Override
            public void onFailure(Throwable error) throws Exception {
                promise.setFailure(error);
            }
        });
        return promise;
    }

    public <T> JobHandle<T> submit(Job<T> job) {
        return this.protocol.submit(job);
    }

    public <T> java.util.concurrent.Future<T> run(Job<T> job) {
        return this.protocol.run(job);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void stop(boolean shutdownContext) {
        if (this.isAlive) {
            this.isAlive = false;
            try {
                if (shutdownContext && this.driverRpc.isSuccess()) {
                    this.protocol.endSession();
                    long stopTimeout = this.conf.getTimeAsMs(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT);
                    try {
                        ((Rpc)this.driverRpc.get()).getChannel().closeFuture().get(stopTimeout, TimeUnit.MILLISECONDS);
                    }
                    catch (Exception e) {
                        LOG.warn("Error waiting for context to shut down: {} ({}).", (Object)e.getClass().getSimpleName(), (Object)e.getMessage());
                    }
                }
            }
            catch (Exception e) {
                LOG.warn("Exception while waiting for end session reply.", (Throwable)e);
            }
            finally {
                if (this.driverRpc.isSuccess()) {
                    try {
                        ((Rpc)this.driverRpc.get()).close();
                    }
                    catch (Exception e) {
                        LOG.warn("Error stopping RPC.", (Throwable)e);
                    }
                }
                for (Map.Entry<String, JobHandleImpl<?>> e : this.jobs.entrySet()) {
                    LOG.info("Failing pending job {} due to shutdown.", (Object)e.getKey());
                    e.getValue().setFailure(new IOException("RSCClient instance stopped."));
                }
                this.eventLoopGroup.shutdownGracefully();
            }
            if (this.contextInfo != null) {
                LOG.debug("Disconnected from context {}, shutdown = {}.", (Object)this.contextInfo.clientId, (Object)shutdownContext);
            }
        }
    }

    public java.util.concurrent.Future<?> uploadJar(File jar) {
        throw new UnsupportedOperationException("Use addJar to add the jar to the remote context!");
    }

    public java.util.concurrent.Future<?> addJar(URI uri) {
        return this.submit(new AddJarJob(uri.toString()));
    }

    public java.util.concurrent.Future<?> uploadFile(File file) {
        throw new UnsupportedOperationException("Use addFile to add the file to the remote context!");
    }

    public java.util.concurrent.Future<?> addFile(URI uri) {
        return this.submit(new AddFileJob(uri.toString()));
    }

    public String bypass(ByteBuffer serializedJob, boolean sync) {
        return this.protocol.bypass(serializedJob, sync);
    }

    public java.util.concurrent.Future<BypassJobStatus> getBypassJobStatus(String id) {
        return this.protocol.getBypassJobStatus(id);
    }

    public void cancel(String jobId) {
        this.protocol.cancel(jobId);
    }

    ContextInfo getContextInfo() {
        return this.contextInfo;
    }

    public String submitReplCode(String code) throws Exception {
        String id = UUID.randomUUID().toString();
        this.deferredCall(new BaseProtocol.ReplJobRequest(code, id), Void.class);
        return id;
    }

    public java.util.concurrent.Future<String> getReplJobResult(String id) throws Exception {
        return this.deferredCall(new BaseProtocol.GetReplJobResult(id), String.class);
    }

    public java.util.concurrent.Future<String> getReplState() {
        return this.deferredCall(new BaseProtocol.GetReplState(), String.class);
    }

    private static class AddFileJob
    implements Job<Object> {
        private final String path;

        AddFileJob() {
            this(null);
        }

        AddFileJob(String path) {
            this.path = path;
        }

        public Object call(JobContext jc) throws Exception {
            jc.sc().addFile(this.path);
            return null;
        }
    }

    private class ClientProtocol
    extends BaseProtocol {
        private ClientProtocol() {
        }

        <T> JobHandleImpl<T> submit(Job<T> job) {
            final String jobId = UUID.randomUUID().toString();
            BaseProtocol.JobRequest<T> msg = new BaseProtocol.JobRequest<T>(jobId, job);
            final Promise promise = RSCClient.this.eventLoopGroup.next().newPromise();
            final JobHandleImpl handle = new JobHandleImpl(RSCClient.this, promise, jobId);
            RSCClient.this.jobs.put(jobId, handle);
            final Future rpc = RSCClient.this.deferredCall(msg, Void.class);
            LOG.debug("Sending JobRequest[{}].", (Object)jobId);
            Utils.addListener(rpc, new FutureListener<Void>(){

                @Override
                public void onSuccess(Void unused) throws Exception {
                    handle.changeState(JobHandle.State.QUEUED);
                }

                @Override
                public void onFailure(Throwable error) throws Exception {
                    error.printStackTrace();
                    promise.tryFailure(error);
                }
            });
            promise.addListener(new GenericFutureListener<Promise<T>>(){

                public void operationComplete(Promise<T> p) {
                    if (jobId != null) {
                        RSCClient.this.jobs.remove(jobId);
                    }
                    if (p.isCancelled() && !rpc.isDone()) {
                        rpc.cancel(true);
                    }
                }
            });
            return handle;
        }

        <T> java.util.concurrent.Future<T> run(Job<T> job) {
            return RSCClient.this.deferredCall(new BaseProtocol.SyncJobRequest<T>(job), Object.class);
        }

        String bypass(ByteBuffer serializedJob, boolean sync) {
            String jobId = UUID.randomUUID().toString();
            BaseProtocol.BypassJobRequest msg = new BaseProtocol.BypassJobRequest(jobId, BufferUtils.toByteArray(serializedJob), sync);
            RSCClient.this.deferredCall(msg, Void.class);
            return jobId;
        }

        java.util.concurrent.Future<BypassJobStatus> getBypassJobStatus(String id) {
            return RSCClient.this.deferredCall(new BaseProtocol.GetBypassJobStatus(id), BypassJobStatus.class);
        }

        void cancel(String jobId) {
            RSCClient.this.deferredCall(new BaseProtocol.CancelJob(jobId), Void.class);
        }

        java.util.concurrent.Future<?> endSession() {
            return RSCClient.this.deferredCall(new BaseProtocol.EndSession(), Void.class);
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.InitializationError msg) {
            LOG.warn("Error reported from remote driver: %s", (Object)msg.stackTrace);
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobResult msg) {
            JobHandleImpl handle = (JobHandleImpl)RSCClient.this.jobs.remove(msg.id);
            if (handle != null) {
                RuntimeException error;
                LOG.info("Received result for {}", (Object)msg.id);
                RuntimeException runtimeException = error = msg.error != null ? new RuntimeException(msg.error) : null;
                if (error == null) {
                    handle.setSuccess(msg.result);
                } else {
                    handle.setFailure(error);
                }
            } else {
                LOG.warn("Received result for unknown job {}", (Object)msg.id);
            }
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobStarted msg) {
            JobHandleImpl handle = (JobHandleImpl)RSCClient.this.jobs.get(msg.id);
            if (handle != null) {
                handle.changeState(JobHandle.State.STARTED);
            } else {
                LOG.warn("Received event for unknown job {}", (Object)msg.id);
            }
        }
    }
}

