/*
 * Decompiled with CFR 0.152.
 */
package water;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import jsr166y.CountedCompleter;
import jsr166y.ForkJoinPool;
import water.AutoBuffer;
import water.DTask;
import water.Freezable;
import water.H2O;
import water.H2ONode;
import water.MRTask;
import water.TypeMap;
import water.UDP;
import water.util.DistributedException;
import water.util.Log;

public class RPC<V extends DTask>
implements Future<V>,
Delayed,
ForkJoinPool.ManagedBlocker {
    // Destination node. cancel() sets this to null, which isCancelled()/isDone() treat as "cancelled".
    H2ONode _target;
    // The task being executed; it is also the object handed back to callers of get().
    final V _dt;
    // Set to true once a result (or exception) has been installed; waiters are notified at that point.
    volatile boolean _done;
    // Read by call(): when true, call() returns without (re)sending anything.
    // It is not written anywhere in the code shown here.
    volatile boolean _nack;
    // Task number taken from _target.nextTaskNum() in setTaskNum(); 0 until then.
    int _tasknum;
    // System.currentTimeMillis() captured in the constructor.
    final long _started;
    // Millisecond offset from _started used by getDelay()/compareTo(); grown at the end of each call().
    long _retry;
    // Not referenced in the code shown here. NOTE(review): presumably a resend counter kept by other classes - confirm.
    int _resendsCnt;
    // Completers that doAllCompletions() hands to the work queue; lazily allocated by addCompleter().
    ArrayList<H2O.H2OCountedCompleter> _fjtasks;
    // Value of AutoBuffer.hasTCP() captured on the last successful send in call().
    boolean _sentTcp;
    // Serialized request size remembered by sz_check() so that resends can be asserted to be the same size.
    int _size;
    // Size of the reply buffer, recorded in response().
    int _size_rez;
    // Flag byte values written after the task number: 10/11 are used on replies (see response(),
    // RPCCall.sendAck(), RPCCall.resend_ack()), 12/13 on requests (see call(), remote_exec()).
    // The decompiled code below uses the numeric literals instead of these names.
    static final byte SERVER_UDP_SEND = 10;
    static final byte SERVER_TCP_SEND = 11;
    static final byte CLIENT_UDP_SEND = 12;
    static final byte CLIENT_TCP_SEND = 13;
    // Printable names for the four flag values above, indexed by (flag - 10); used by RemoteHandler.print16().
    private static final String[] COOKIES = new String[]{"SERVER_UDP", "SERVER_TCP", "CLIENT_UDP", "CLIENT_TCP"};
    // NOTE(review): equals the 60000L cap used in the retry back-off expressions below - presumably the
    // original source referenced this constant and the decompiler inlined it.
    static final int MAX_TIMEOUT = 60000;
    // NOTE(review): equals the initial _retry value assigned in the constructor (10000L).
    static final long RETRY_MS = 10000L;

    /**
     * Convenience factory: builds an RPC for {@code dtask} aimed at {@code target} and
     * immediately invokes {@link #call()} on it.
     *
     * @param target node the task is addressed to
     * @param dtask  task to execute
     * @return the RPC returned by {@link #call()}
     */
    public static <DT extends DTask> RPC<DT> call(H2ONode target, DT dtask) {
        RPC<DT> rpc = new RPC<DT>(target, dtask);
        return rpc.call();
    }

    /**
     * Creates an RPC for {@code dtask} addressed to {@code target} and reserves a task
     * number from the target via {@link #setTaskNum()}. Nothing is sent until
     * {@link #call()} is invoked.
     *
     * @param target node the task is addressed to
     * @param dtask  task to execute
     */
    public RPC(H2ONode target, V dtask) {
        this(target, dtask, 1.0f);
        setTaskNum();
    }

    /**
     * Creates an RPC without assigning a task number.
     *
     * @param target node the task is addressed to
     * @param dtask  task to execute
     * @param ignore unused; it only distinguishes this constructor from the public one
     */
    RPC(H2ONode target, V dtask, float ignore) {
        _started = System.currentTimeMillis();
        _retry = RETRY_MS;
        _target = target;
        _dt = dtask;
    }

    /**
     * Assigns this RPC the next task number handed out by the target node.
     * Asserts that no task number has been assigned yet.
     *
     * @return this RPC
     */
    RPC<V> setTaskNum() {
        assert _tasknum == 0;
        _tasknum = _target.nextTaskNum();
        return this;
    }

    /**
     * Moves a completer from the task onto this RPC: registers {@code cc} via
     * {@link #addCompleter} unless it is already in the list, then clears the
     * task's own completer.
     *
     * @param cc completer currently attached to the task; asserted to be an
     *           {@code H2O.H2OCountedCompleter}
     */
    private void handleCompleter(CountedCompleter cc) {
        assert cc instanceof H2O.H2OCountedCompleter;
        boolean alreadyRegistered = _fjtasks != null && _fjtasks.contains(cc);
        if (!alreadyRegistered) {
            addCompleter((H2O.H2OCountedCompleter)cc);
        }
        ((CountedCompleter)_dt).setCompleter(null);
    }

    /**
     * Runs the task on this node instead of shipping it: installs a callback as the
     * task's completer and submits the task via {@code H2O.submitTask}.
     *
     * <p>On completion (normal or exceptional) the callback marks this RPC done
     * under its lock, wakes any waiters and then runs {@link #doAllCompletions()}
     * outside the lock.
     *
     * @return this RPC
     */
    private RPC<V> handleLocal() {
        assert ((CountedCompleter)_dt).getCompleter() == null;
        H2O.H2OCallback<DTask> onFinish = new H2O.H2OCallback<DTask>(){

            /** Normal completion: publish "done" and fire the registered completers. */
            @Override
            public void callback(DTask dt) {
                synchronized (RPC.this) {
                    RPC.this._done = true;
                    RPC.this.notifyAll();
                }
                RPC.this.doAllCompletions();
            }

            /**
             * Exceptional completion: records the exception on the task unless the
             * RPC was already done, then proceeds as for normal completion.
             */
            @Override
            public boolean onExceptionalCompletion(Throwable ex, CountedCompleter dt) {
                synchronized (RPC.this) {
                    if (RPC.this._done) {
                        return true;
                    }
                    ((DTask)RPC.this._dt).setException(ex);
                    RPC.this._done = true;
                    RPC.this.notifyAll();
                }
                RPC.this.doAllCompletions();
                return true;
            }
        };
        ((CountedCompleter)_dt).setCompleter(onFinish);
        H2O.submitTask(_dt);
        return this;
    }

    /**
     * Sends (or re-sends) this RPC to its target and grows the retry delay.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>A completer attached to the task is moved onto this RPC.</li>
     *   <li>If the target is this node, the task is run locally instead.</li>
     *   <li>The RPC is recorded with the target; if it was nacked or is already
     *       done/cancelled nothing is sent.</li>
     *   <li>If no earlier send reported TCP, the full task is serialized and sent,
     *       retrying every 500 ms while the buffer reports an I/O failure.
     *       Otherwise only a short packet with flag 13 is sent.</li>
     *   <li>{@code _retry} is increased by itself, capped at an increment of 60000.</li>
     * </ol>
     *
     * @return this RPC
     */
    public synchronized RPC<V> call() {
        CountedCompleter completer = ((CountedCompleter)_dt).getCompleter();
        if (completer != null) {
            handleCompleter(completer);
        }
        if (_target == H2O.SELF) {
            return handleLocal();
        }
        if (_target != null) {
            _target.taskPut(_tasknum, this);
        }
        try {
            if (_nack) {
                return this;
            }
            if (isDone()) {
                if (_target != null) {
                    _target.taskRemove(_tasknum);
                }
                return this;
            }
            if (_sentTcp) {
                // The task body already went out over TCP: send only the task number and flag.
                AutoBuffer reminder = new AutoBuffer(_target, ((H2O.H2OCountedCompleter)_dt).priority()).putTask(UDP.udp.exec, _tasknum);
                reminder.put1(13).close();
            } else {
                boolean shipped = false;
                while (!shipped) {
                    AutoBuffer ab = new AutoBuffer(_target, ((H2O.H2OCountedCompleter)_dt).priority());
                    try {
                        ab.putTask(UDP.udp.exec, _tasknum).put1(12);
                        ab.put((Freezable)_dt);
                        boolean wentTcp = ab.hasTCP();
                        assert sz_check(ab) : "Resend of " + _dt.getClass() + " changes size from " + _size + " to " + ab.size() + " for task#" + _tasknum;
                        ab.close();
                        // Only recorded once close() has succeeded.
                        _sentTcp = wentTcp;
                        shipped = true;
                    }
                    catch (AutoBuffer.AutoBufferException e) {
                        Log.info("IOException during RPC call: " + e._ioe.getMessage() + ",  AB=" + ab + ", for task#" + _tasknum + ", waiting and retrying...");
                        ab.drainClose();
                        try {
                            Thread.sleep(500L);
                        }
                        catch (InterruptedException ignored) {
                            // An interrupt only shortens the pause; the send is retried regardless.
                        }
                    }
                }
            }
            _retry += Math.min(_retry, 60000L);
            return this;
        }
        catch (Throwable t) {
            t.printStackTrace();
            throw Log.throwErr(t);
        }
    }

    /**
     * Returns the finished task, or throws if the task carries an exception.
     *
     * @return the task object
     * @throws DistributedException if {@code getDException()} is non-null; an existing
     *         {@code DistributedException} is re-created from its message and cause,
     *         any other throwable is wrapped
     */
    private V result() {
        Throwable failure = ((DTask)_dt).getDException();
        if (failure == null) {
            return _dt;
        }
        if (failure instanceof DistributedException) {
            throw new DistributedException(failure.getMessage(), failure.getCause());
        }
        throw new DistributedException(failure);
    }

    /**
     * Blocks until this RPC is done or cancelled and returns the task.
     *
     * <p>With assertions enabled, checks that the task's priority is higher than
     * that of the calling {@code H2O.FJWThr} (or equal, for an {@code MRTask});
     * callers on other threads are treated as priority -1.
     *
     * <p>If the calling thread is interrupted while blocked, waiting stops and the
     * thread's interrupt status is restored so that callers can still observe it.
     *
     * @return the task, or {@code null} if the RPC did not complete (cancelled,
     *         or the wait was interrupted)
     * @throws DistributedException if the task completed with an exception
     */
    @Override
    public V get() {
        Thread cThr = Thread.currentThread();
        int priority = cThr instanceof H2O.FJWThr ? ((H2O.FJWThr)cThr)._priority : -1;
        assert (((H2O.H2OCountedCompleter)this._dt).priority() > priority || ((H2O.H2OCountedCompleter)this._dt).priority() == priority && this._dt instanceof MRTask) : "*** Attempting to block on task (" + this._dt.getClass() + ") with equal or lower priority. Can lead to deadlock! " + ((H2O.H2OCountedCompleter)this._dt).priority() + " <=  " + priority;
        if (this._done) {
            return this.result();
        }
        try {
            ForkJoinPool.managedBlock(this);
        }
        catch (InterruptedException interrupted) {
            // Do not lose the interrupt: re-assert it for whoever called us.
            Thread.currentThread().interrupt();
        }
        if (this._done) {
            return this.result();
        }
        assert (this.isCancelled());
        return null;
    }

    /**
     * {@code ManagedBlocker} hook: blocking is unnecessary once the RPC is done
     * or cancelled.
     */
    @Override
    public boolean isReleasable() {
        return isDone();
    }

    /**
     * {@code ManagedBlocker} hook: waits on this object's monitor, re-checking
     * {@link #isDone()} at least once per second, until the RPC is done or cancelled.
     *
     * @return always {@code true}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public synchronized boolean block() throws InterruptedException {
        for (;;) {
            if (isDone()) {
                return true;
            }
            wait(1000L);
        }
    }

    /**
     * Non-waiting variant of {@code get}: both arguments are ignored.
     *
     * <p>Returns the task directly when the RPC is already done (without the
     * exception check performed by the no-argument {@code get()}); otherwise
     * throws whatever {@code H2O.fail()} produces.
     *
     * @param timeout unused
     * @param unit    unused
     * @return the task, if already done
     */
    @Override
    public final V get(long timeout, TimeUnit unit) {
        if (!_done) {
            throw H2O.fail();
        }
        return _dt;
    }

    /** Reports {@code true} when the RPC was cancelled (no target) or a result has been installed. */
    @Override
    public final boolean isDone() {
        if (_target == null) {
            return true;
        }
        return _done;
    }

    /** A cancelled RPC is one whose target has been cleared (see {@code cancel}). */
    @Override
    public final boolean isCancelled() {
        return null == _target;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Cancels this RPC: removes it from the target's task table, clears the target
     * and wakes all waiters. Waiters are notified even if the RPC was already cancelled.
     *
     * @param mayInterruptIfRunning unused
     * @return {@code true} if this call performed the cancellation, {@code false}
     *         if the RPC had been cancelled before
     */
    @Override
    public final boolean cancel(boolean mayInterruptIfRunning) {
        synchronized (this) {
            boolean cancelledNow = !isCancelled();
            if (cancelledNow) {
                _target.taskRemove(_tasknum);
                _target = null;
            }
            notifyAll();
            return cancelledNow;
        }
    }

    /**
     * Handles an incoming task-execution request on the receiving node.
     *
     * <p>The request carries a task number and a flag (12 or 13, matching
     * CLIENT_UDP_SEND / CLIENT_TCP_SEND). Depending on whether the sending node
     * already has a recorded call for that task number, the request is started,
     * answered with a nack, or answered by re-sending the earlier ack.
     * The buffer is closed at the end unless the method returned early.
     *
     * @param ab buffer holding the incoming request; {@code ab._h2o} is the sender
     */
    static void remote_exec(AutoBuffer ab) {
        // First 16 bytes of the packet; used only in the diagnostic messages below.
        long lo = ab.get8(0);
        long hi = ab._size >= 16 ? ab.get8(8) : 0L;
        int task = ab.getTask();
        int flag = ab.getFlag();
        assert (flag == 12 || flag == 13);
        // Previously recorded call for this task number from this sender, if any.
        RPCCall old = ab._h2o.has_task(task);
        if (old == null && flag == 13) {
            // Flag 13 carries no task body (see RPC.call()), so with no recorded call there is nothing to run.
            Log.warn("got tcp with existing task #, FROM " + ab._h2o.toString() + " AB: ");
            assert (!ab.hasTCP()) : "ERROR: got tcp with existing task #, FROM " + ab._h2o.toString() + " AB: ";
        } else if (old == null) {
            // First sighting of this task: deserialize it and try to record it.
            RPCCall rpc;
            try {
                rpc = new RPCCall(ab.get(DTask.class), ab._h2o, task);
            }
            catch (AutoBuffer.AutoBufferException e) {
                // Incomplete read: drop the request and return without the final close().
                Log.info("Network congestion OR short-writer/long-reader: TCP " + e._ioe.getMessage() + ",  AB=" + ab + ", ignoring partial send");
                ab.drainClose();
                return;
            }
            // A null return means our call was recorded; non-null means another call was already
            // recorded for this task number - presumably a racing duplicate; confirm in H2ONode.record_task.
            RPCCall rpc2 = ab._h2o.record_task(rpc);
            if (rpc2 == null) {
                if (rpc._dt instanceof MRTask && rpc._dt.logVerbose()) {
                    Log.debug("Start remote task#" + task + " " + rpc._dt.getClass() + " from " + ab._h2o);
                }
                H2O.submitTask(rpc);
            } else if (ab.hasTCP()) {
                ab.drainClose();
            }
        } else if (!old._computedAndReplied) {
            // Known task that has not replied yet: reuse the buffer to send a nack for it.
            assert (!ab.hasTCP()) : "got tcp with existing task #, FROM " + ab._h2o.toString() + " AB: " + UDP.printx16(lo, hi) + ", position = " + ab._bb.position();
            ab.clearForWriting(UDP.udp.nack._prior).putTask(UDP.udp.nack.ordinal(), task);
        } else {
            // Known task that already replied: the sender is asking again, so re-send the ack.
            if (ab.hasTCP()) {
                Log.warn("got tcp with existing task #, FROM " + ab._h2o.toString() + " AB: " + UDP.printx16(lo, hi));
                ab.drainClose();
            }
            if (old._dt != null) {
                ++old._ackResendCnt;
                // Every 10th resend is reported as an error.
                if (old._ackResendCnt % 10 == 0) {
                    Log.err("Possibly broken network, can not send ack through, got " + old._ackResendCnt + " for task # " + old._tsknum + ", dt == null?" + (old._dt == null));
                }
                old.resend_ack();
            }
        }
        ab.close();
    }

    /**
     * Handles a reply that arrived for one of this node's outstanding RPCs.
     *
     * <p>If the RPC is unknown or already done the buffer is drained and closed;
     * otherwise the reply is handed to {@link #response}. In both cases an
     * ackack packet for the task number is then sent to the replying node.
     *
     * @param ab buffer holding the reply; {@code ab._h2o} is the replying node
     * @throws IOException declared by the signature; a buffer failure while reading
     *         the reply is rethrown via {@code Log.throwErr}
     */
    static void tcp_ack(AutoBuffer ab) throws IOException {
        int task = ab.getTask();
        RPC rpc = ab._h2o.taskGet(task);
        boolean awaitingReply = rpc != null && !rpc._done;
        if (awaitingReply) {
            assert rpc._tasknum == task;
            assert !rpc._done;
            try {
                rpc.response(ab);
            }
            catch (AutoBuffer.AutoBufferException e) {
                throw Log.throwErr(e._ioe);
            }
        } else {
            ab.drainClose();
        }
        new AutoBuffer(ab._h2o, 126).putTask(UDP.udp.ackack.ordinal(), task).close();
    }

    /**
     * Re-purposes {@code ab} as an ackack packet for task {@code tnum}: the buffer
     * is cleared for writing with priority 126 and the ackack header is written.
     *
     * @param ab   buffer to reuse
     * @param tnum task number being acknowledged
     * @return the result of the {@code putTask} call on the cleared buffer
     */
    static AutoBuffer ackack(AutoBuffer ab, int tnum) {
        return ab
            .clearForWriting((byte)126)
            .putTask(UDP.udp.ackack.ordinal(), tnum);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Installs the reply for this RPC from {@code ab} and produces the ackack buffer.
     *
     * <p>Duplicate replies (the RPC is already done) and replies with flag 11 do not
     * touch the task. The first reply with flag 10 is read into the task under this
     * object's lock, the RPC is marked done, removed from the sender's task table and
     * waiters are notified.
     *
     * NOTE(review): the returned buffer is not closed here; tcp_ack() ignores the
     * return value, so presumably another caller is responsible for sending it - confirm.
     *
     * @param ab buffer holding the reply
     * @return an ackack buffer for this task number (either {@code ab} reused, or a new one)
     */
    protected AutoBuffer response(AutoBuffer ab) {
        // Note: getTask() is only invoked when assertions are enabled.
        assert (this._tasknum == ab.getTask());
        if (this._done) {
            // Already answered: a non-TCP duplicate is acknowledged by reusing the buffer,
            // a TCP duplicate is drained first.
            if (!ab.hasTCP()) {
                return RPC.ackack(ab, this._tasknum);
            }
            ab.drainClose();
        } else {
            int flag = ab.getFlag();
            // Flag 11 replies carry no task body (see RPCCall.resend_ack()), so there is nothing to read.
            if (flag == 11) {
                return RPC.ackack(ab, this._tasknum);
            }
            assert (flag == 10) : "flag = " + flag;
            RPC rPC = this;
            synchronized (rPC) {
                // Re-check under the lock: another thread may have installed the answer meanwhile.
                if (this._done) {
                    if (!ab.hasTCP()) {
                        return RPC.ackack(ab, this._tasknum);
                    }
                    ab.drainClose();
                } else {
                    ((H2O.H2OCountedCompleter)this._dt).read(ab);
                    this._size_rez = ab.size();
                    ab.close();
                    // onAck() is skipped when the RPC was cancelled while the reply was in flight.
                    if (!this.isCancelled()) {
                        ((DTask)this._dt).onAck();
                    }
                    this._done = true;
                    ab._h2o.taskRemove(this._tasknum);
                    this.notifyAll();
                }
                // NOTE(review): this also runs on the "already done + TCP" path above, so completions
                // may be submitted a second time for a duplicate TCP reply - confirm this is intended.
                if (!this.isCancelled()) {
                    this.doAllCompletions();
                }
            }
        }
        return new AutoBuffer(ab._h2o, 126).putTask(UDP.udp.ackack.ordinal(), this._tasknum);
    }

    /**
     * Submits one small task per registered completer. Each of them either completes
     * the completer exceptionally (when the RPC's task carries an exception, or when
     * normal completion itself throws) or completes it normally with the RPC's task.
     * Does nothing when no completers are registered.
     */
    private void doAllCompletions() {
        final Throwable e = ((DTask)_dt).getDException();
        if (_fjtasks == null) {
            return;
        }
        for (final H2O.H2OCountedCompleter task : _fjtasks) {
            // Run at the completer's own priority.
            H2O.H2OCountedCompleter notifier = new H2O.H2OCountedCompleter(task.priority()){

                @Override
                public void compute2() {
                    if (e != null) {
                        task.completeExceptionally(e);
                        return;
                    }
                    try {
                        task.__tryComplete((CountedCompleter)RPC.this._dt);
                    }
                    catch (Throwable completionFailure) {
                        task.completeExceptionally(completionFailure);
                    }
                }
            };
            H2O.submitTask(notifier);
        }
    }

    /**
     * Registers a completer to be run by {@code doAllCompletions()}. The backing
     * list is created on first use.
     *
     * @param task completer to register
     * @return this RPC
     */
    public synchronized RPC<V> addCompleter(H2O.H2OCountedCompleter task) {
        ArrayList<H2O.H2OCountedCompleter> tasks = _fjtasks;
        if (tasks == null) {
            tasks = new ArrayList<H2O.H2OCountedCompleter>(2);
            _fjtasks = tasks;
        }
        tasks.add(task);
        return this;
    }

    /**
     * Size consistency check used from assertions: the first call remembers the
     * buffer size, later calls compare against the remembered value.
     *
     * @param ab buffer about to be sent
     * @return {@code true} on the first call or when the size is unchanged
     */
    private boolean sz_check(AutoBuffer ab) {
        int current = ab.size();
        if (_size != 0) {
            return _size == current;
        }
        _size = current;
        return true;
    }

    /** @return the reply size recorded by {@code response()}; 0 if no reply has been read */
    int size_rez() {
        return _size_rez;
    }

    /**
     * Time remaining until {@code _started + _retry}, converted to {@code unit}.
     * Negative once that instant has passed.
     */
    @Override
    public final long getDelay(TimeUnit unit) {
        long dueAt = _started + _retry;
        long remainingMs = dueAt - System.currentTimeMillis();
        return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders RPCs by their next due time ({@code _started + _retry}).
     *
     * @param t the other delayed object; must be an {@code RPC}
     * @return -1, 0 or 1
     */
    @Override
    public final int compareTo(Delayed t) {
        RPC other = (RPC)t;
        long mine = _started + _retry;
        long theirs = other._started + other._retry;
        if (mine == theirs) {
            return 0;
        }
        return mine > theirs ? 1 : -1;
    }

    static class RPCCall
    extends H2O.H2OCountedCompleter
    implements Delayed {
        // Task to execute for the client. Volatile and updated via CAS_DT: sendAck() sets it to null
        // when an ack to a client-mode node fails.
        volatile DTask _dt;
        // Node that sent the request; acks and nacks are addressed to it.
        final H2ONode _client;
        // Task number taken from the incoming request (see RPC.remote_exec()).
        final int _tsknum;
        // System.currentTimeMillis() captured in the three-argument constructor.
        long _started;
        // Millisecond offset from _started used by getDelay()/compareTo(); grown by send_nack()/resend_ack().
        long _retry;
        // Incremented in RPC.remote_exec() each time an already-answered request arrives again.
        int _ackResendCnt;
        // Not referenced in the code shown here. NOTE(review): presumably counts nack resends elsewhere - confirm.
        int _nackResendCnt;
        // Set in sendAck() after the reply buffer has been closed.
        volatile boolean _computedAndReplied;
        // Set (under this object's lock) when the task completes, normally or exceptionally.
        volatile boolean _computed;
        // Reply size remembered by sz_check() for the resend assertion.
        int _size;
        // Atomic updater backing the CAS_DT(old, new) method for the volatile _dt field.
        private static AtomicReferenceFieldUpdater<RPCCall, DTask> CAS_DT = AtomicReferenceFieldUpdater.newUpdater(RPCCall.class, DTask.class, "_dt");

        /**
         * Records an incoming request so it can be executed and answered.
         *
         * <p>Note that {@code dt.priority()} is evaluated first, so a {@code null}
         * task fails there before the null check in the body is reached.
         *
         * @param dt     task received from the client
         * @param client node that sent the request
         * @param tsknum task number from the request
         */
        RPCCall(DTask dt, H2ONode client, int tsknum) {
            super(dt.priority());
            _client = client;
            _tsknum = tsknum;
            _dt = dt;
            if (_dt == null) {
                _computedAndReplied = true;
            }
            _retry = 5000L;
            _started = System.currentTimeMillis();
        }

        /**
         * Creates a task-less call record for {@code client} with task number 0.
         * All other fields keep their default values.
         *
         * @param client node the record belongs to
         */
        RPCCall(H2ONode client) {
            _tsknum = 0;
            _client = client;
        }

        /**
         * Executes the received task: installs this call as the task's completer
         * (asserting it had none) and invokes the task on behalf of the client.
         */
        @Override
        public void compute2() {
            assert _dt.getCompleter() == null;
            _dt.setCompleter(this);
            _dt.dinvoke(_client);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Normal completion of the task: marks the call computed (asserting it was
         * not already) and sends the ack outside the lock.
         */
        @Override
        public void onCompletion(CountedCompleter caller) {
            synchronized (this) {
                assert !_computed;
                _computed = true;
            }
            sendAck();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Exceptional completion of the task. The first caller to flip
         * {@code _computed} records the exception on the task and sends the ack;
         * later callers do nothing.
         *
         * @return always {@code false}
         */
        @Override
        public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
            if (!_computed) {
                synchronized (this) {
                    // Re-check under the lock; only one thread may claim completion.
                    if (_computed) {
                        return false;
                    }
                    _computed = true;
                }
                _dt.setException(ex);
                sendAck();
            }
            return false;
        }

        /**
         * Sends the computed task back to the client, retrying until the reply has
         * been written or the task reference has been cleared.
         *
         * <p>Each attempt writes an ack (or a fetchack when the task's priority is 125)
         * with flag 10 followed by the serialized task. On a buffer I/O failure the
         * buffer is drained, the attempt is abandoned for client-mode nodes (by
         * clearing {@code _dt}) and otherwise retried after 100 ms. Any other
         * throwable is logged and the send is retried immediately.
         *
         * <p>The buffer is only drained if it was actually created: a failure while
         * constructing it leaves {@code ab} null, and dereferencing it in the handlers
         * would replace the real error with a {@code NullPointerException} and abort
         * the retry loop.
         */
        private void sendAck() {
            DTask dt;
            // _dt may be cleared concurrently; keep the original for the log message below.
            DTask origDt = this._dt;
            assert (origDt != null);
            while ((dt = this._dt) != null) {
                AutoBuffer ab = null;
                try {
                    UDP.udp udp2 = dt.priority() == 125 ? UDP.udp.fetchack : UDP.udp.ack;
                    ab = new AutoBuffer(this._client, udp2._prior).putTask(udp2, this._tsknum).put1(10);
                    assert (ab.position() == 8);
                    dt.write(ab);
                    // Remember how the reply travelled so resend_ack() can pick the short form.
                    dt._repliedTcp = ab.hasTCP();
                    ab.close();
                    this._computedAndReplied = true;
                    break;
                }
                catch (AutoBuffer.AutoBufferException e) {
                    if (!this._client._heartbeat._client) {
                        Log.info("IOException during ACK, " + e._ioe.getMessage() + ", t#" + this._tsknum + " AB=" + ab + ", waiting and retrying...");
                    }
                    if (ab != null) {
                        ab.drainClose();
                    }
                    if (this._client._heartbeat._client) {
                        // Give up on client-mode nodes: clearing _dt ends the loop.
                        this.CAS_DT(dt, null);
                    }
                    try {
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException ignored) {
                        // An interrupt only shortens the pause; the loop condition decides whether to retry.
                    }
                }
                catch (Throwable e) {
                    Log.err(e);
                    if (ab != null) {
                        ab.drainClose();
                    }
                }
            }
            if (dt == null) {
                Log.info("Cancelled remote task#" + this._tsknum + " " + origDt.getClass() + " to " + this._client + " has been cancelled by remote");
            } else {
                if (dt instanceof MRTask && dt.logVerbose()) {
                    Log.debug("Done remote task#" + this._tsknum + " " + dt.getClass() + " to " + this._client);
                }
                this._client.record_task_answer(this);
            }
        }

        /**
         * Sends a nack packet for this task number to the client and grows the
         * retry delay (by itself, with the increment capped at 60000).
         */
        final void send_nack() {
            AutoBuffer nack = new AutoBuffer(_client, UDP.udp.nack._prior).putTask(UDP.udp.nack, _tsknum);
            nack.close();
            _retry += Math.min(_retry, 60000L);
        }

        /**
         * Re-sends the reply for an already answered call. Does nothing when the
         * task reference has been cleared.
         *
         * <p>If the original reply went out over TCP only a short packet with flag 11
         * is sent; otherwise the full reply (flag 10 plus the serialized task) is
         * written again. The retry delay is then grown as in {@code send_nack()}.
         */
        final void resend_ack() {
            assert _computedAndReplied : "Found RPCCall not computed " + _tsknum;
            DTask dt = _dt;
            if (dt == null) {
                return;
            }
            UDP.udp udp2;
            if (dt.priority() == 125) {
                udp2 = UDP.udp.fetchack;
            } else {
                udp2 = UDP.udp.ack;
            }
            AutoBuffer rab = new AutoBuffer(_client, dt.priority()).putTask(udp2, _tsknum);
            boolean wasTCP = dt._repliedTcp;
            if (!wasTCP) {
                rab.put1(10);
                assert rab.position() == 8;
                dt.write(rab);
            } else {
                rab.put1(11);
            }
            assert sz_check(rab) : "Resend of " + _dt.getClass() + " changes size from " + _size + " to " + rab.size();
            assert dt._repliedTcp == wasTCP;
            rab.close();
            dt._repliedTcp = wasTCP;
            _retry += Math.min(_retry, 60000L);
        }

        /**
         * Time remaining until {@code _started + _retry}, converted to {@code unit}.
         * Negative once that instant has passed.
         */
        @Override
        public final long getDelay(TimeUnit unit) {
            long dueAt = _started + _retry;
            long remainingMs = dueAt - System.currentTimeMillis();
            return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
        }

        /**
         * Orders calls by their next due time ({@code _started + _retry}).
         *
         * @param t the other delayed object; must be an {@code RPCCall}
         * @return -1, 0 or 1
         */
        @Override
        public final int compareTo(Delayed t) {
            RPCCall other = (RPCCall)t;
            long mine = _started + _retry;
            long theirs = other._started + other._retry;
            if (mine == theirs) {
                return 0;
            }
            return mine > theirs ? 1 : -1;
        }

        /**
         * Atomically replaces {@code _dt} with {@code nnn} if it currently equals {@code old}.
         *
         * @return {@code true} if the swap happened
         */
        boolean CAS_DT(DTask old, DTask nnn) {
            boolean swapped = CAS_DT.compareAndSet(this, old, nnn);
            return swapped;
        }

        /**
         * Size consistency check used from assertions: the first call remembers the
         * buffer size, later calls compare against the remembered value.
         *
         * @param ab buffer about to be sent
         * @return {@code true} on the first call or when the size is unchanged
         */
        private boolean sz_check(AutoBuffer ab) {
            int current = ab.size();
            if (_size != 0) {
                return _size == current;
            }
            _size = current;
            return true;
        }
    }

    /**
     * UDP handler entry for task-execution packets. Only the diagnostic printer is
     * functional here; {@code call} always fails.
     */
    static class RemoteHandler
    extends UDP {
        RemoteHandler() {
        }

        /** Not supported on this handler: always throws whatever {@code H2O.fail()} produces. */
        @Override
        AutoBuffer call(AutoBuffer ab) {
            throw H2O.fail();
        }

        /**
         * Renders a short description of a task packet: task number, class name
         * (only for flag 12, which carries a serialized task) and the flag's name.
         *
         * <p>The flag byte comes straight from the packet, so a value outside
         * 10..13 is printed as {@code UNKNOWN_FLAG(n)} instead of failing with an
         * {@code ArrayIndexOutOfBoundsException} inside a diagnostic routine.
         *
         * @param ab buffer positioned at the packet's flag
         * @return human-readable packet summary
         */
        @Override
        String print16(AutoBuffer ab) {
            int flag = ab.getFlag();
            String clazz = flag == 12 ? TypeMap.className(ab.getInt()) : "";
            int cookieIdx = flag - 10;
            String cookie = cookieIdx >= 0 && cookieIdx < COOKIES.length ? COOKIES[cookieIdx] : "UNKNOWN_FLAG(" + flag + ")";
            return "task# " + ab.getTask() + " " + clazz + " " + cookie;
        }
    }
}

