/*
 * Decompiled with CFR 0.152.
 */
package datafu.pig.sets;

import datafu.pig.sets.SetOperationsBase;
import java.io.IOException;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * Emits the tuples of the first input bag that do not appear in any of the
 * other input bags.
 *
 * <p>The implementation performs a k-way merge over the bags' iterators using a
 * priority queue, so every bag must iterate in non-decreasing tuple order;
 * {@link Pair#next()} throws a {@link RuntimeException} ("Out of order!") when
 * a bag violates this. On the merge path, consecutive equal tuples from the
 * first bag are emitted only once.
 */
public class SetDifference
extends SetOperationsBase {
    private static final BagFactory bagFactory = BagFactory.getInstance();

    /**
     * Wraps every non-null, non-empty bag of {@code input} in a {@link Pair}
     * and collects the pairs in a priority queue.
     *
     * <p>The queue is ordered by {@link Pair#compareTo(Pair)}: first by the
     * tuple each pair is currently positioned on, then by the index of the bag
     * within {@code input}.
     *
     * @param input tuple whose fields are bags (or {@code null})
     * @return queue holding one pair per non-null, non-empty bag
     * @throws IOException if a field cannot be read from {@code input}
     */
    private PriorityQueue<Pair> loadBags(Tuple input) throws IOException {
        PriorityQueue<Pair> pq = new PriorityQueue<Pair>(input.size());
        for (int i = 0; i < input.size(); ++i) {
            // Read the field once instead of once for the null check and once for the cast.
            Object field = input.get(i);
            if (field == null) {
                continue;
            }
            Iterator<Tuple> inputIterator = ((DataBag)field).iterator();
            // Pair's constructor immediately consumes the first element, so
            // empty bags must be skipped here.
            if (inputIterator.hasNext()) {
                pq.add(new Pair(inputIterator, i));
            }
        }
        return pq;
    }

    /**
     * Counts how many other pairs in the queue are currently positioned on a
     * tuple equal to the one at the head of the queue.
     *
     * @param pq queue of pairs; its head must belong to the bag at index 0
     * @return number of pairs, excluding the head itself, whose current tuple
     *         equals the head's current tuple
     * @throws RuntimeException if the head of the queue does not have index 0
     */
    public int countMatches(PriorityQueue<Pair> pq) {
        Pair nextPair = pq.peek();
        Tuple data = nextPair.data;
        // Sanity check: callers are expected to invoke this only for the first bag.
        if (!nextPair.index.equals(0)) {
            throw new RuntimeException("Expected next bag to have index 0");
        }
        int matches = 0;
        for (Pair p : pq) {
            if (data.equals(p.data)) {
                ++matches;
            }
        }
        // The head pair is part of the iteration and always matches itself.
        return matches - 1;
    }

    /**
     * Computes the difference between the first bag and all remaining bags.
     *
     * @param input tuple with at least two fields, each a {@link DataBag} or {@code null}
     * @return a new empty bag when the first bag is {@code null} or empty; the
     *         first bag itself when exactly two inputs are given and the second
     *         is {@code null} or empty; otherwise a new bag with the tuples of
     *         the first bag that match no tuple of any other bag
     * @throws IOException if a field cannot be read from {@code input}
     * @throws RuntimeException if fewer than two inputs are given, if an input
     *         is neither {@code null} nor a bag, or if a bag is out of order
     */
    public DataBag exec(Tuple input) throws IOException {
        if (input.size() < 2) {
            throw new RuntimeException("Expected at least two inputs, but found " + input.size());
        }
        for (Object o : input) {
            if (o == null || o instanceof DataBag) continue;
            throw new RuntimeException("Inputs must be bags");
        }
        DataBag outputBag = bagFactory.newDefaultBag();
        DataBag bag1 = (DataBag)input.get(0);
        DataBag bag2 = (DataBag)input.get(1);
        if (bag1 == null || bag1.size() == 0L) {
            return outputBag;
        }
        // Shortcut: nothing to subtract, so hand back the first bag unchanged.
        // NOTE(review): this path returns bag1 as-is, i.e. without the
        // de-duplication and ordering check that the merge loop below applies.
        // Confirm that callers are fine with that difference.
        if (input.size() == 2 && (bag2 == null || bag2.size() == 0L)) {
            return bag1;
        }
        PriorityQueue<Pair> pq = this.loadBags(input);
        // Last tuple added to the output; null until the first tuple is emitted.
        Tuple lastData = null;
        while (true) {
            // The queue is never empty here: the first bag is non-empty and the
            // loop ends as soon as the first bag's pair is exhausted.
            Pair nextPair = pq.peek();
            // Explicit null guard so compareTo is never handed a null argument
            // before anything has been emitted.
            boolean differsFromLast = lastData == null || nextPair.data.compareTo(lastData) != 0;
            // Emit only tuples that come from the first bag, were not just
            // emitted, and are not the current tuple of any other bag. The
            // short-circuit order matters: countMatches throws unless the head
            // belongs to bag 0.
            if (differsFromLast && nextPair.index.equals(0) && this.countMatches(pq) == 0) {
                outputBag.add(nextPair.data);
                lastData = nextPair.data;
            }
            Pair p = pq.poll();
            if (p.hasNext()) {
                // Advance the bag and re-insert it so the queue re-orders it
                // by its new current tuple.
                p.next();
                pq.offer(p);
            } else if (p.index.equals(0)) {
                // The first bag is exhausted; nothing more can be emitted.
                break;
            }
        }
        return outputBag;
    }

    /**
     * A bag iterator together with the tuple it is currently positioned on and
     * the index of the bag within the input tuple.
     *
     * <p>Pairs order by current tuple first and by bag index second, so for
     * equal tuples the pair with the lower bag index sorts first.
     */
    private static class Pair
    implements Comparable<Pair> {
        private final Iterator<Tuple> it;
        private final Integer index;
        private Tuple data;

        /**
         * Creates a pair positioned on the first element of {@code it}.
         *
         * @param it iterator with at least one remaining element
         * @param index position of the originating bag within the input tuple
         */
        public Pair(Iterator<Tuple> it, int index) {
            this.index = index;
            this.it = it;
            this.data = it.next();
        }

        @Override
        public int compareTo(Pair o) {
            int r = this.data.compareTo((Object)o.data);
            if (r == 0) {
                return this.index.compareTo(o.index);
            }
            return r;
        }

        /** Returns whether the underlying iterator has further tuples. */
        public boolean hasNext() {
            return this.it.hasNext();
        }

        /**
         * Advances to the next tuple of the underlying iterator.
         *
         * @return the new current tuple
         * @throws RuntimeException if the new tuple sorts before the current one
         */
        public Tuple next() {
            Tuple nextData = this.it.next();
            if (this.data.compareTo((Object)nextData) > 0) {
                throw new RuntimeException("Out of order!");
            }
            this.data = nextData;
            return this.data;
        }

        @Override
        public String toString() {
            return String.format("[%s within %d]", this.data, this.index);
        }
    }
}

