/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopGroupByClosure;
import cascading.flow.hadoop.util.FalseCollection;
import cascading.provider.FactoryLoader;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.Tuples;
import cascading.tuple.collect.Spillable;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.collect.TupleCollectionFactory;
import cascading.tuple.hadoop.collect.HadoopTupleCollectionFactory;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.util.TupleViews;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopCoGroupClosure
extends HadoopGroupByClosure {
    /** Class-wide SLF4J logger; the spill listener uses it to report spill activity. */
    private static final Logger LOG = LoggerFactory.getLogger(HadoopCoGroupClosure.class);
    /** One tuple collection per join position; allocated in {@link #initLists()} and refilled by {@link #build()}. */
    protected Collection<Tuple>[] collections;
    /** Number of self joins; when non-zero, {@link #initLists()} puts one shared collection in every position. */
    protected final int numSelfJoins;
    /** Scratch array passed to {@link #joinedBuilder}; it has one slot per entry in {@link #collections}. */
    private Tuple[] joinedTuplesArray;
    /** Placeholder used in {@link #getGroupTuple(Tuple)} for positions whose collection is empty; sized from the first grouping fields. */
    private final Tuple emptyTuple;
    /** Builds the tuple returned by {@link #getGroupTuple(Tuple)}; created in {@link #initLists()}. */
    private TupleBuilder joinedBuilder;
    /** Most recent result of {@link #getGroupTuple(Tuple)}; the initial empty instance is replaced on the first call. */
    private Tuple joinedTuple = new Tuple();
    /** Factory used by {@link #createTupleCollection(Fields)} to create the per-position collections. */
    private final TupleCollectionFactory<Configuration> tupleCollectionFactory;

    /**
     * Creates a co-group closure and allocates its per-position tuple collections.
     *
     * @param flowProcess    the current flow process; also used to resolve the tuple collection factory
     * @param numSelfJoins   number of self joins; {@code 0} selects the layout with a separate
     *                       collection per join position
     * @param groupingFields grouping fields, passed to the superclass; the first entry sizes the
     *                       empty placeholder tuple
     * @param valueFields    value fields, passed to the superclass
     */
    public HadoopCoGroupClosure(FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields) {
        super(flowProcess, groupingFields, valueFields);

        this.numSelfJoins = numSelfJoins;

        int keyWidth = groupingFields[0].size();
        this.emptyTuple = Tuple.size(keyWidth);

        // the factory class can be overridden through the named property;
        // HadoopTupleCollectionFactory is the default passed to the loader
        this.tupleCollectionFactory = (TupleCollectionFactory) FactoryLoader.getInstance()
            .loadFactoryFrom(flowProcess, "cascading.factory.tuple.collection.classname", HadoopTupleCollectionFactory.class);

        // must run last: allocation reads numSelfJoins and the factory assigned above
        initLists();
    }

    /**
     * Returns the number of join positions held by this closure: the larger of the number
     * of join fields and {@code numSelfJoins + 1}.
     *
     * @return the number of join positions
     */
    @Override
    public int size() {
        int declaredPositions = joinFields.length;
        int selfJoinPositions = numSelfJoins + 1;
        return declaredPositions >= selfJoinPositions ? declaredPositions : selfJoinPositions;
    }

    /**
     * Returns an iterator over the tuples collected for the given join position.
     *
     * @param pos zero-based join position
     * @return the result of {@code makeIterator} applied to that position's collection iterator
     * @throws IllegalArgumentException if {@code pos} is negative or not below the number of collections
     */
    @Override
    public Iterator<Tuple> getIterator(int pos) {
        boolean inRange = pos >= 0 && pos < collections.length;
        if (!inRange) {
            throw new IllegalArgumentException("invalid group position: " + pos);
        }
        Collection<Tuple> group = collections[pos];
        return makeIterator(pos, group.iterator());
    }

    /**
     * Builds the group tuple for the current grouping. Each join position contributes
     * {@code keysTuple}, or the empty placeholder tuple when that position's collection is empty.
     * The result is also stored as the current joined tuple.
     *
     * @param keysTuple the key tuple to use for every non-empty position
     * @return the tuple produced by the joined-tuple builder
     */
    @Override
    public Tuple getGroupTuple(Tuple keysTuple) {
        // the return value is ignored, as in the original call
        Tuples.asModifiable(joinedTuple);

        int slot = 0;
        for (Collection<Tuple> group : collections) {
            Tuple contribution;
            if (group.isEmpty()) {
                contribution = emptyTuple;
            } else {
                contribution = keysTuple;
            }
            joinedTuplesArray[slot++] = contribution;
        }

        Tuple composite = joinedBuilder.makeResult(joinedTuplesArray);
        joinedTuple = composite;
        return composite;
    }

    /**
     * Reports whether the collection at the given join position holds no tuples.
     *
     * @param pos zero-based join position; not range-checked here, so an invalid value
     *            surfaces as an array index error
     * @return {@code true} if that position's collection is empty
     */
    @Override
    public boolean isEmpty(int pos) {
        Collection<Tuple> group = collections[pos];
        return group.isEmpty();
    }

    /**
     * Resets the closure for a new grouping, then rebuilds the per-position collections
     * from the supplied values.
     *
     * @param grouping the grouping tuple, forwarded to the superclass
     * @param values   the value iterators, forwarded to the superclass
     */
    @Override
    public void reset(Tuple grouping, Iterator<Tuple>[] values) {
        super.reset(grouping, values);

        // the superclass state must be in place before the collections are rebuilt
        build();
    }

    /**
     * Distributes the values of the current grouping into the per-position collections.
     *
     * <p>All collections are cleared first. Values are then read from {@code values[0]}; each is
     * an {@link IndexTuple} whose index selects the target collection. When there are no self
     * joins, the first value with index {@code 0} ends the copying: that value and the rest of
     * the stream are handed to the {@link FalseCollection} at position 0 as an iterator.
     */
    protected void build() {
        clearGroups();

        Collection<Tuple> first = collections[0];
        if (first instanceof FalseCollection) {
            // reset with no iterator; presumably discards the one from the previous grouping
            ((FalseCollection) first).reset(null);
        }

        while (values[0].hasNext()) {
            IndexTuple next = (IndexTuple) values[0].next();
            int index = next.getIndex();

            boolean streamsDirectly = index == 0 && numSelfJoins == 0;
            if (!streamsDirectly) {
                collections[index].add(next.getTuple());
                continue;
            }

            // position 0 is not copied: expose the remaining stream through the FalseCollection
            ((FalseCollection) collections[0]).reset(createIterator(next, values[0]));
            return;
        }
    }

    /**
     * Empties every per-position collection and passes the current grouping to those
     * that implement {@link Spillable}.
     */
    protected void clearGroups() {
        Collection<Tuple>[] groups = collections;
        for (int i = 0; i < groups.length; i++) {
            Collection<Tuple> group = groups[i];
            group.clear();
            if (group instanceof Spillable) {
                ((Spillable) group).setGrouping(grouping);
            }
        }
    }

    /**
     * Allocates the per-position collections, the joined-tuple builder and its scratch array.
     *
     * <p>With self joins, a single collection is created from the first join fields and placed
     * in every position. Otherwise position 0 gets a {@link FalseCollection} and each remaining
     * position gets its own collection.
     */
    protected void initLists() {
        collections = new Collection[size()];

        if (numSelfJoins == 0) {
            collections[0] = new FalseCollection();
            for (int pos = 1; pos < joinFields.length; pos++) {
                collections[pos] = createTupleCollection(joinFields[pos]);
            }
        } else {
            // every position refers to the same collection instance
            Collection<Tuple> shared = createTupleCollection(joinFields[0]);
            Arrays.fill(collections, shared);
        }

        joinedBuilder = makeJoinedBuilder(joinFields);
        joinedTuplesArray = new Tuple[collections.length];
    }

    /**
     * Creates the builder that assembles the per-position tuples into one composite tuple.
     *
     * <p>For a self join, a new array with one entry per position is used, and every entry is
     * the first join fields. Otherwise the given join fields are used as they are. The composite
     * view is created once, and each {@code makeResult} call resets that same view.
     *
     * @param joinFields the join fields, one entry per declared join position
     * @return a builder backed by a single reusable composite tuple
     */
    private TupleBuilder makeJoinedBuilder(Fields[] joinFields) {
        boolean selfJoin = isSelfJoin();
        final Fields[] fields = selfJoin ? new Fields[size()] : joinFields;
        if (selfJoin) {
            Arrays.fill(fields, joinFields[0]);
        }

        return new TupleBuilder() {
            /** Composite view over the captured fields; reused for every result. */
            private final Tuple result = TupleViews.createComposite(fields);

            @Override
            public Tuple makeResult(Tuple[] tuples) {
                // explicit Object[] cast: pass the array itself as the values argument
                return TupleViews.reset(result, (Object[]) tuples);
            }
        };
    }

    /**
     * Creates a tuple collection through the configured factory. If the collection implements
     * {@link Spillable}, a spill listener for the given join fields is attached.
     *
     * @param joinField the join fields the collection is created for; passed to the spill listener
     * @return the newly created collection
     */
    protected Collection<Tuple> createTupleCollection(Fields joinField) {
        Collection created = (Collection) tupleCollectionFactory.create(flowProcess);
        if (!(created instanceof Spillable)) {
            return created;
        }

        Spillable spillable = (Spillable) created;
        spillable.setSpillListener(createListener(joinField));
        return created;
    }

    /**
     * Creates the spill listener attached to spillable collections.
     *
     * @param joinField the join fields the listener reports in its log output
     * @return a new listener bound to this closure's flow process
     */
    private Spillable.SpillListener createListener(Fields joinField) {
        SpillListener listener = new SpillListener(flowProcess, joinField);
        return listener;
    }

    /**
     * Wraps an already-read first element and the rest of its source iterator into a single
     * iterator over the contained tuples.
     *
     * <p>The returned iterator yields {@code current.getTuple()} first, then the tuple of each
     * remaining element of {@code values}. It reads one element ahead, so {@code values} is
     * advanced as soon as an element is handed out.
     *
     * @param current the element already taken from {@code values}; if {@code null} the returned
     *                iterator is empty
     * @param values  the source iterator positioned after {@code current}
     * @return an iterator over the tuples of {@code current} and the remaining elements
     */
    public Iterator<Tuple> createIterator(final IndexTuple current, final Iterator<IndexTuple> values) {
        return new Iterator<Tuple>() {
            /** Element returned by the next call to {@code next()}; {@code null} once exhausted. */
            private IndexTuple value = current;

            @Override
            public boolean hasNext() {
                return value != null;
            }

            @Override
            public Tuple next() {
                // Must agree with hasNext(): the old guard also required the source to be
                // drained, so a null value with elements remaining caused a NullPointerException.
                if (value == null) {
                    throw new NoSuchElementException();
                }
                Tuple result = value.getTuple();
                value = values.hasNext() ? (IndexTuple) values.next() : null;
                return result;
            }

            @Override
            public void remove() {
                // Removal is not supported; kept as a silent no-op to preserve existing behavior.
            }
        };
    }

    /**
     * Strategy for combining the per-position tuples into a single result tuple.
     */
    static interface TupleBuilder {
        /**
         * Combines the given tuples into one result.
         *
         * @param tuples the tuples to combine
         * @return the combined tuple
         */
        Tuple makeResult(Tuple[] tuples);
    }

    /**
     * Spill listener that logs spill activity and records it in the {@link Spill} counters
     * of a flow process.
     */
    private class SpillListener
    implements Spillable.SpillListener {
        private final FlowProcess flowProcess;
        private final Fields joinField;

        /**
         * @param flowProcess the flow process whose counters are incremented
         * @param joinField   the join fields named in the log output
         */
        public SpillListener(FlowProcess flowProcess, Fields joinField) {
            this.joinField = joinField;
            this.flowProcess = flowProcess;
        }

        /**
         * Logs the start of a write spill and increments the written-spill and spilled-tuple counters.
         *
         * @param spillable   the collection being spilled
         * @param spillSize   number of tuples being spilled
         * @param spillReason reason text included in the detailed log line
         */
        public void notifyWriteSpillBegin(Spillable spillable, int spillSize, String spillReason) {
            int spillCount = spillable.spillCount();
            int fileNumber = spillCount + 1;

            // detailed diagnostics only when the reported spill count is a multiple of ten
            boolean detailed = spillCount % 10 == 0;
            if (detailed) {
                Object[] details = {joinField.printVerbose(), spillable.getGrouping().print(), fileNumber, spillReason};
                LOG.info("spilling group: {}, on grouping: {}, num times: {}, with reason: {}", details);
                logMemory();
            }

            LOG.info("spilling {} tuples in list to file number {}", (Object) spillSize, (Object) fileNumber);

            flowProcess.increment(Spill.Num_Spills_Written, 1L);
            flowProcess.increment(Spill.Num_Tuples_Spilled, (long) spillSize);
        }

        /**
         * Adds the reported duration to the write-duration counter.
         *
         * @param spillableTupleList the list that was spilled (unused)
         * @param duration           the duration to add to the counter
         */
        public void notifyWriteSpillEnd(SpillableTupleList spillableTupleList, long duration) {
            flowProcess.increment(Spill.Duration_Millis_Written, duration);
        }

        /**
         * Increments the read-spill counter by one.
         *
         * @param spillable the collection being read (unused)
         */
        public void notifyReadSpillBegin(Spillable spillable) {
            flowProcess.increment(Spill.Num_Spills_Read, 1L);
        }

        /** Logs the JVM's free, total and max memory in whole megabytes. */
        private void logMemory() {
            Runtime runtime = Runtime.getRuntime();
            long freeMem = toMegabytes(runtime.freeMemory());
            long maxMem = toMegabytes(runtime.maxMemory());
            long totalMem = toMegabytes(runtime.totalMemory());
            LOG.info("mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem);
        }

        /** Converts a byte count to whole megabytes, truncating at each division. */
        private long toMegabytes(long bytes) {
            return bytes / 1024L / 1024L;
        }
    }

    /**
     * Counters incremented by the spill listener to report spill activity.
     */
    public static enum Spill {
        /** Incremented by one each time a write spill begins. */
        Num_Spills_Written,
        /** Incremented by one each time a read of a spill begins. */
        Num_Spills_Read,
        /** Incremented by the number of tuples in each write spill. */
        Num_Tuples_Spilled,
        /** Accumulates the duration value reported when a write spill ends. */
        Duration_Millis_Written
    }
}

