/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.shuffle;

import java.io.DataInput;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.hadoop.GridHadoopJob;
import org.gridgain.grid.hadoop.GridHadoopJobInfo;
import org.gridgain.grid.hadoop.GridHadoopJobProperty;
import org.gridgain.grid.hadoop.GridHadoopPartitioner;
import org.gridgain.grid.hadoop.GridHadoopTaskInfo;
import org.gridgain.grid.hadoop.GridHadoopTaskInput;
import org.gridgain.grid.hadoop.GridHadoopTaskOutput;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleAck;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopConcurrentHashMultimap;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopSkipList;
import org.gridgain.grid.lang.GridBiTuple;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.util.future.GridCompoundFuture;
import org.gridgain.grid.util.future.GridFinishedFutureEx;
import org.gridgain.grid.util.future.GridFutureAdapterEx;
import org.gridgain.grid.util.io.GridUnsafeDataInput;
import org.gridgain.grid.util.lang.GridClosureException;
import org.gridgain.grid.util.lang.GridInClosure2X;
import org.gridgain.grid.util.offheap.unsafe.GridUnsafeMemory;
import org.gridgain.grid.util.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;

public class GridHadoopShuffleJob<T>
implements AutoCloseable {
    /** Default size argument for newly created shuffle messages (presumably bytes, i.e. 32 KiB - TODO confirm unit). */
    private static final int MSG_BUF_SIZE = 32768;
    /** Job whose map output is shuffled; supplies id, info, partitioner and comparators. */
    private final GridHadoopJob job;
    /** Memory handle passed to every multimap created by this class. */
    private final GridUnsafeMemory mem;
    /** Partitioner taken from the job; {@code null} when the job has at most one reducer. */
    private final GridHadoopPartitioner partitioner;
    /** Map collecting mapper output for the combiner; only created when the job has a combiner and local mappers, otherwise {@code null}. */
    private GridHadoopMultimap combinerMap;
    /** Reducer addresses indexed by reducer number; {@code null} until {@code initializeReduceAddresses} is called. */
    private T[] reduceAddrs;
    /** Address of the local node; partitions whose reducer address equals it are never sent. */
    private T locReduceAddr;
    /** Outgoing message currently being filled for each reducer (indexed by reducer number); slots may be {@code null}. */
    private GridHadoopShuffleMessage[] msgs;
    /** Per-reducer multimaps, created lazily on first write or first incoming message. */
    private final AtomicReferenceArray<GridHadoopMultimap> maps;
    /** Closure used to deliver a message to a reducer address; assigned in {@code startSending}. */
    private volatile GridInClosure2X<T, GridHadoopShuffleMessage> io;
    /** Sent messages keyed by message id, each paired with the future that is completed when the ack arrives. */
    protected ConcurrentMap<Long, GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs = new ConcurrentHashMap();
    /** Background worker that periodically sends collected updates; stays {@code null} if the job was flushed before sending started. */
    private volatile GridWorker sender;
    /** Released at the end of {@code startSending}; {@code flush} waits on it before sending. */
    private final CountDownLatch ioInitLatch = new CountDownLatch(1);
    /** Set by {@code flush}; once set, {@code startSending} no longer starts the background worker. */
    private volatile boolean flushed;
    /** Logger used by this job and handed to the sender worker. */
    private final GridLogger log;

    /**
     * Creates the shuffle state of one job.
     *
     * @param locReduceAddr Address of the local node.
     * @param log Logger.
     * @param job Job being shuffled.
     * @param mem Memory handle passed to the multimaps.
     * @param reducers Number of reducers; sizes the per-reducer map and message arrays.
     * @param hasLocMappers Whether mappers run locally; a combiner map is only created when this is
     *      {@code true} and the job has a combiner.
     * @throws GridException If preparing the job or creating the combiner map fails.
     */
    public GridHadoopShuffleJob(T locReduceAddr, GridLogger log, GridHadoopJob job, GridUnsafeMemory mem, int reducers, boolean hasLocMappers) throws GridException {
        this.locReduceAddr = locReduceAddr;
        this.job = job;
        this.mem = mem;
        this.log = log;

        job.beforeTaskRun(null);

        // A partitioner is only needed when there is more than one target partition.
        if (reducers > 1) {
            this.partitioner = job.partitioner();
        } else {
            this.partitioner = null;
        }

        this.maps = new AtomicReferenceArray<>(reducers);
        this.msgs = new GridHadoopShuffleMessage[reducers];

        if (job.info().hasCombiner() && hasLocMappers) {
            boolean noSorting = GridHadoopJobProperty.get(job.info(), GridHadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING, false);

            if (noSorting) {
                int hashMapSize = GridHadoopJobProperty.get(job.info(), GridHadoopJobProperty.COMBINER_HASHMAP_SIZE, 8192);

                this.combinerMap = new GridHadoopConcurrentHashMultimap(job, mem, hashMapSize);
            } else {
                this.combinerMap = new GridHadoopSkipList(job, mem, job.sortComparator());
            }
        }
    }

    /**
     * Stores the reducer addresses if none have been stored yet.
     *
     * @param reduceAddresses Addresses indexed by reducer number.
     * @return {@code true} if the addresses were stored by this call, {@code false} if addresses
     *      were already present (the argument is ignored in that case).
     */
    public boolean initializeReduceAddresses(T[] reduceAddresses) {
        boolean notYetSet = (reduceAddrs == null);

        if (notYetSet) {
            reduceAddrs = reduceAddresses;
        }

        return notYetSet;
    }

    /**
     * @return {@code true} once reducer addresses have been stored.
     */
    public boolean reducersInitialized() {
        T[] addrs = reduceAddrs;

        return addrs != null;
    }

    /**
     * Installs the IO closure and, unless the job has already been flushed, starts a background
     * worker that sends collected updates roughly every 5 ms until it is cancelled.
     * Always releases the latch that {@code flush()} waits on.
     *
     * @param gridName Grid name passed to the worker.
     * @param io Closure that delivers a message to a reducer address; must not be {@code null}.
     */
    public void startSending(String gridName, GridInClosure2X<T, GridHadoopShuffleMessage> io) {
        assert (this.sender == null);
        assert (io != null);

        this.io = io;

        if (this.flushed) {
            // Flush was requested first: no worker is started, flush() sends everything itself.
            this.ioInitLatch.countDown();

            return;
        }

        GridWorker worker = new GridWorker(gridName, "hadoop-shuffle-" + this.job.id(), this.log){

            protected void body() throws InterruptedException {
                try {
                    GridHadoopShuffleJob.this.job.beforeTaskRun(null);

                    while (true) {
                        if (isCancelled()) {
                            break;
                        }

                        Thread.sleep(5L);

                        GridHadoopShuffleJob.this.collectUpdatesAndSend(false);
                    }
                }
                catch (GridException e) {
                    throw new IllegalStateException(e);
                }
            }
        };

        // Publish the worker before its thread starts so flush()/close() can cancel it.
        this.sender = worker;

        new GridThread(worker).start();

        this.ioInitLatch.countDown();
    }

    /**
     * Returns the multimap stored at the given index, creating and publishing one if the slot is empty.
     * If another thread publishes a map first, the map created here is closed and the published one
     * is returned.
     *
     * @param maps Array of per-reducer maps.
     * @param idx Reducer index.
     * @return Multimap for the index.
     */
    private GridHadoopMultimap getOrCreateMap(AtomicReferenceArray<GridHadoopMultimap> maps, int idx) {
        GridHadoopMultimap existing = maps.get(idx);

        if (existing != null) {
            return existing;
        }

        GridHadoopMultimap created;

        if (GridHadoopJobProperty.get(this.job.info(), GridHadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING, false)) {
            int hashMapSize = GridHadoopJobProperty.get(this.job.info(), GridHadoopJobProperty.PARTITION_HASHMAP_SIZE, 8192);

            created = new GridHadoopConcurrentHashMultimap(this.job, this.mem, hashMapSize);
        } else {
            created = new GridHadoopSkipList(this.job, this.mem, this.job.sortComparator());
        }

        if (maps.compareAndSet(idx, null, created)) {
            return created;
        }

        // Lost the race: discard our instance and use the one that was published.
        created.close();

        return maps.get(idx);
    }

    /**
     * Adds every key and value carried by an incoming shuffle message to the multimap of the
     * reducer the message is addressed to.
     *
     * @param msg Incoming message; its buffer must be non-null and its offset positive.
     * @throws GridException If adding a key or closing the adder fails.
     */
    public void onShuffleMessage(GridHadoopShuffleMessage msg) throws GridException {
        assert (msg.buffer() != null);
        assert (msg.offset() > 0);

        GridHadoopMultimap target = this.getOrCreateMap(this.maps, msg.reducer());

        try (final GridHadoopMultimap.Adder adder = target.startAdding()) {
            // Both objects are reused for every entry of the message; only their window changes.
            final GridUnsafeDataInput keyInput = new GridUnsafeDataInput();
            final UnsafeValue valueView = new UnsafeValue(msg.buffer());

            msg.visit(new GridHadoopShuffleMessage.Visitor(){
                /** Key most recently added; values reported next are attached to it. */
                private GridHadoopMultimap.Key lastKey;

                @Override
                public void onKey(byte[] buf, int off, int len) throws GridException {
                    keyInput.bytes(buf, off, off + len);

                    this.lastKey = adder.addKey((DataInput)keyInput, this.lastKey);
                }

                @Override
                public void onValue(byte[] buf, int off, int len) {
                    valueView.off = off;
                    valueView.size = len;

                    this.lastKey.add(valueView);
                }
            });
        }
    }

    /**
     * Completes the future of the sent message that the acknowledgement refers to.
     * Logs a warning when no message is registered under the acknowledged id.
     *
     * @param ack Acknowledgement.
     */
    public void onShuffleAck(GridHadoopShuffleAck ack) {
        GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> sent = this.sentMsgs.get(ack.id());

        if (sent == null) {
            this.log.warning("Received shuffle ack for not registered shuffle id: " + ack);

            return;
        }

        sent.get2().onDone();
    }

    /**
     * Visits every per-reducer multimap whose reducer address differs from the local address,
     * packs the visited keys and values into that reducer's outgoing message, and sends a message
     * whenever the next entry does not fit.
     *
     * @param flush If {@code true}, a partially filled message (offset != 0) is sent as well and
     *      the reducer's message slot is cleared afterwards.
     * @throws GridException If visiting a multimap fails.
     */
    private void collectUpdatesAndSend(boolean flush) throws GridException {
        for (int i = 0; i < this.maps.length(); ++i) {
            GridHadoopMultimap map = this.maps.get(i);

            // Skip reducers without data and reducers located at the local address.
            if (map == null || this.locReduceAddr.equals(this.reduceAddrs[i])) {
                continue;
            }

            if (this.msgs[i] == null) {
                this.msgs[i] = new GridHadoopShuffleMessage(this.job.id(), i, MSG_BUF_SIZE);
            }

            final int idx = i;

            // NOTE(review): meaning of the first visit() argument is not visible here - confirm.
            map.visit(false, new GridHadoopMultimap.Visitor(){
                /** Pointer to the key currently being visited. */
                private long keyPtr;

                /** Size of the key currently being visited. */
                private int keySize;

                /** Whether the current key has already been written into the current message. */
                private boolean keyAdded;

                @Override
                public void onKey(long keyPtr, int keySize) {
                    this.keyPtr = keyPtr;
                    this.keySize = keySize;
                    this.keyAdded = false;
                }

                /**
                 * Tries to append the value (preceded by the current key if not yet written)
                 * to the current message.
                 *
                 * @return {@code false} if the message reports that there is not enough room.
                 */
                private boolean tryAdd(long valPtr, int valSize) {
                    GridHadoopShuffleMessage msg = GridHadoopShuffleJob.this.msgs[idx];

                    if (!this.keyAdded) {
                        if (!msg.available(this.keySize + valSize, false)) {
                            return false;
                        }

                        msg.addKey(this.keyPtr, this.keySize);
                        msg.addValue(valPtr, valSize);

                        this.keyAdded = true;

                        return true;
                    }

                    if (!msg.available(valSize, true)) {
                        return false;
                    }

                    msg.addValue(valPtr, valSize);

                    return true;
                }

                @Override
                public void onValue(long valPtr, int valSize) {
                    if (this.tryAdd(valPtr, valSize)) {
                        return;
                    }

                    // Current message is full: ship it and continue with a buffer large enough for this entry.
                    GridHadoopShuffleJob.this.send(idx, this.keySize + valSize);

                    // send() leaves the slot null when the requested minimum size is 0 (empty key and
                    // empty value); recreate the buffer so the retry below does not dereference null.
                    if (GridHadoopShuffleJob.this.msgs[idx] == null) {
                        GridHadoopShuffleJob.this.msgs[idx] = new GridHadoopShuffleMessage(GridHadoopShuffleJob.this.job.id(), idx, GridHadoopShuffleJob.MSG_BUF_SIZE);
                    }

                    // The key has to be repeated at the start of the new message.
                    this.keyAdded = false;

                    if (!this.tryAdd(valPtr, valSize)) {
                        throw new IllegalStateException("Entry does not fit into a freshly allocated shuffle message [keySize=" + this.keySize + ", valSize=" + valSize + ']');
                    }
                }
            });

            if (flush && this.msgs[i].offset() != 0) {
                this.send(i, 0);
            }
        }
    }

    /**
     * Sends the current message of the given reducer and replaces the reducer's message slot.
     * The message is registered in {@code sentMsgs} together with a future. If the IO closure
     * throws, the future is completed with the unwrapped cause. When the future completes
     * successfully the registration is removed; on failure the error is logged and the
     * registration stays in place.
     *
     * @param idx Reducer index.
     * @param newBufMinSize Minimum size of the replacement message; {@code 0} clears the slot
     *      instead of allocating a new message.
     */
    private void send(int idx, int newBufMinSize) {
        GridFutureAdapterEx<?> ackFut = new GridFutureAdapterEx<Object>();

        GridHadoopShuffleMessage outMsg = this.msgs[idx];

        final long msgId = outMsg.id();

        GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> prev =
            this.sentMsgs.putIfAbsent(msgId, new GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>(outMsg, ackFut));

        assert (prev == null);

        try {
            this.io.apply(this.reduceAddrs[idx], outMsg);
        }
        catch (GridClosureException e) {
            ackFut.onDone((Throwable)U.unwrap((Throwable)e));
        }

        ackFut.listenAsync(new GridInClosure<GridFuture<?>>(){

            public void apply(GridFuture<?> f) {
                try {
                    f.get();

                    GridHadoopShuffleJob.this.sentMsgs.remove(msgId);
                }
                catch (GridException e) {
                    GridHadoopShuffleJob.this.log.error("Failed to send message.", (Throwable)e);
                }
            }
        });

        if (newBufMinSize == 0) {
            this.msgs[idx] = null;
        } else {
            this.msgs[idx] = new GridHadoopShuffleMessage(this.job.id(), idx, Math.max(MSG_BUF_SIZE, newBufMinSize));
        }
    }

    /**
     * Cancels the background sender (if one was started), waits for it to finish and then closes
     * all per-reducer multimaps.
     * <p>
     * If the wait is interrupted, the thread's interrupt status is restored and a
     * {@link GridInterruptedException} is thrown. The multimaps are not closed in that case;
     * presumably the worker may still be reading them.
     *
     * @throws GridException If waiting for the sender was interrupted.
     */
    @Override
    public void close() throws GridException {
        // Read the volatile field once so the null check and the calls see the same worker.
        GridWorker sender0 = this.sender;

        if (sender0 != null) {
            sender0.cancel();

            try {
                sender0.join();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();

                throw new GridInterruptedException(e);
            }
        }

        this.close(this.maps);
    }

    /**
     * Closes every multimap present in the array; empty slots are skipped.
     *
     * @param maps Per-reducer multimaps.
     */
    private void close(AtomicReferenceArray<GridHadoopMultimap> maps) {
        for (int i = 0, cnt = maps.length(); i < cnt; i++) {
            GridHadoopMultimap m = maps.get(i);

            if (m != null) {
                m.close();
            }
        }
    }

    /**
     * Marks the job as flushed, stops the background sender, sends everything still collected
     * (including partially filled messages) and returns a future over all messages that are
     * registered as sent at that moment.
     * <p>
     * Blocks until {@code startSending} has been called, unless the job has no reducers, in which
     * case a finished future is returned immediately.
     *
     * @return Future compounding the futures of all registered sent messages.
     * @throws GridException If sending fails or the calling thread is interrupted while waiting
     *      (the interrupt status is restored when waiting for the sender is interrupted).
     */
    public GridFuture<?> flush() throws GridException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Flushing job " + this.job.id() + " on address " + this.locReduceAddr);
        }

        // Must be set before waiting on the latch so a later startSending() does not start a worker.
        this.flushed = true;

        if (this.maps.length() == 0) {
            return new GridFinishedFutureEx();
        }

        U.await(this.ioInitLatch);

        GridWorker sender0 = this.sender;

        if (sender0 != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Cancelling sender thread.");
            }

            sender0.cancel();

            try {
                sender0.join();

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + this.job.id());
                }
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();

                throw new GridInterruptedException(e);
            }
        }

        // The worker is stopped (or was never started), so this thread is the one sending now.
        this.collectUpdatesAndSend(true);

        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished sending collected updates to remote reducers: " + this.job.id());
        }

        GridCompoundFuture fut = new GridCompoundFuture();

        for (GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup : this.sentMsgs.values()) {
            fut.add((GridFuture)tup.get2());
        }

        fut.markInitialized();

        if (this.log.isDebugEnabled()) {
            this.log.debug("Collected futures to compound futures for flush: " + this.sentMsgs.size());
        }

        return fut;
    }

    /**
     * Creates the output a task of the given type writes to.
     * <ul>
     * <li>MAP: an adder of the combiner map when one exists, otherwise a partitioned output.</li>
     * <li>COMBINE: a partitioned output.</li>
     * </ul>
     *
     * @param taskInfo Task info.
     * @return Task output.
     * @throws GridException If the combiner map adder cannot be created.
     * @throws IllegalStateException For any other task type.
     */
    public GridHadoopTaskOutput output(GridHadoopTaskInfo taskInfo) throws GridException {
        switch (taskInfo.type()) {
            case MAP: {
                if (this.combinerMap == null) {
                    // No combiner map: mapper output goes straight to the reducer partitions.
                    return new PartitionedOutput();
                }

                return this.combinerMap.startAdding();
            }
            case COMBINE: {
                return new PartitionedOutput();
            }
        }

        throw new IllegalStateException("Illegal type: " + taskInfo.type());
    }

    /**
     * Creates the input a task of the given type reads from.
     * <ul>
     * <li>COMBINE: input over the combiner map, grouped by the job's combine group comparator
     *     (the combiner map is dereferenced without a null check).</li>
     * <li>REDUCE: input over the multimap of the task's reducer number, grouped by the job's reduce
     *     group comparator; an input without entries when no multimap exists for that reducer.</li>
     * </ul>
     *
     * @param taskInfo Task info.
     * @return Task input.
     * @throws GridException If the input cannot be created.
     * @throws IllegalStateException For any other task type.
     */
    public GridHadoopTaskInput input(GridHadoopTaskInfo taskInfo) throws GridException {
        switch (taskInfo.type()) {
            case COMBINE: {
                return this.combinerMap.input(this.job.combineGroupComparator());
            }
            case REDUCE: {
                GridHadoopMultimap partMap = this.maps.get(taskInfo.taskNumber());

                if (partMap == null) {
                    // Nothing was shuffled to this reducer: hand out an input that has no entries.
                    return new GridHadoopTaskInput(){

                        public boolean next() {
                            return false;
                        }

                        public Object key() {
                            throw new IllegalStateException();
                        }

                        public Iterator<?> values() {
                            throw new IllegalStateException();
                        }

                        public void close() {
                        }
                    };
                }

                return partMap.input(this.job.reduceGroupComparator());
            }
        }

        throw new IllegalStateException("Illegal type: " + taskInfo.type());
    }

    /**
     * Task output that routes every key/value pair to the multimap of the partition chosen by the
     * job's partitioner (partition 0 when there is no partitioner). Adders are opened lazily, one
     * per partition that is actually written to.
     */
    private class PartitionedOutput
    implements GridHadoopTaskOutput {
        /** Lazily opened adders, indexed by partition. */
        private final GridHadoopTaskOutput[] adders;

        private PartitionedOutput() {
            this.adders = new GridHadoopTaskOutput[GridHadoopShuffleJob.this.maps.length()];
        }

        /**
         * Writes the pair to the adder of its partition.
         *
         * @param key Key.
         * @param val Value.
         * @throws GridException If the partitioner returns a partition outside
         *      {@code [0, adders.length)} or the write fails.
         */
        public void write(Object key, Object val) throws GridException {
            int part = 0;

            GridHadoopPartitioner partitioner0 = GridHadoopShuffleJob.this.partitioner;

            if (partitioner0 != null) {
                part = partitioner0.partition(key, val, this.adders.length);

                if (part < 0 || part >= this.adders.length) {
                    throw new GridException("Invalid partition: " + part);
                }
            }

            GridHadoopTaskOutput out = this.adders[part];

            if (out == null) {
                out = GridHadoopShuffleJob.this.getOrCreateMap(GridHadoopShuffleJob.this.maps, part).startAdding();

                this.adders[part] = out;
            }

            out.write(key, val);
        }

        /**
         * Closes every adder that was opened. A failure of one adder does not prevent the
         * remaining adders from being closed: the first failure is rethrown after the loop and
         * later failures are attached to it as suppressed exceptions.
         *
         * @throws GridException If closing an adder failed with a {@code GridException}.
         */
        public void close() throws GridException {
            Exception firstFailure = null;

            for (GridHadoopTaskOutput adder : this.adders) {
                if (adder == null) {
                    continue;
                }

                try {
                    adder.close();
                }
                catch (GridException | RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }

            if (firstFailure instanceof GridException) {
                throw (GridException)firstFailure;
            }

            if (firstFailure != null) {
                // Only GridException and RuntimeException are caught above.
                throw (RuntimeException)firstFailure;
            }
        }
    }

    /**
     * Multimap value backed by a window ({@code off}, {@code size}) of a byte array.
     * The window is repositioned by the code that owns the instance before each use.
     */
    private static class UnsafeValue
    implements GridHadoopMultimap.Value {
        /** Backing array. */
        private final byte[] buf;

        /** Start of the current window within {@code buf}. */
        private int off;

        /** Length of the current window. */
        private int size;

        /**
         * @param buf Backing array; must not be {@code null}.
         */
        private UnsafeValue(byte[] buf) {
            assert (buf != null);

            this.buf = buf;
        }

        /** @return Length of the current window. */
        @Override
        public int size() {
            return size;
        }

        /**
         * Copies the current window of the backing array to the given address.
         *
         * @param ptr Destination address.
         */
        @Override
        public void copyTo(long ptr) {
            long srcOff = GridUnsafeMemory.BYTE_ARR_OFF + (long)off;

            GridUnsafeMemory.UNSAFE.copyMemory(buf, srcOff, null, ptr, size);
        }
    }
}

