/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.bufferserver.internal;

import com.datatorrent.bufferserver.internal.DataList;
import com.datatorrent.bufferserver.internal.DataListener;
import com.datatorrent.bufferserver.internal.PhysicalNode;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.policy.GiveAll;
import com.datatorrent.bufferserver.policy.Policy;
import com.datatorrent.bufferserver.util.BitVector;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.bufferserver.util.SerializedData;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import java.util.Collection;
import java.util.HashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogicalNode
implements DataListener {
    private final String identifier;
    private final String upstream;
    private final String group;
    private final HashSet<PhysicalNode> physicalNodes;
    private final HashSet<BitVector> partitions;
    private final Policy policy = GiveAll.getInstance();
    private final DataList.DataListIterator iterator;
    private final long skipWindowId;
    private long baseSeconds;
    private boolean caughtup;
    boolean ready = true;
    private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class);

    public LogicalNode(String identifier, String upstream, String group, DataList.DataListIterator iterator, long skipWindowId) {
        this.identifier = identifier;
        this.upstream = upstream;
        this.group = group;
        this.physicalNodes = new HashSet();
        this.partitions = new HashSet();
        this.iterator = iterator;
        this.skipWindowId = skipWindowId;
    }

    public String getGroup() {
        return this.group;
    }

    public DataList.DataListIterator getIterator() {
        return this.iterator;
    }

    public void addConnection(AbstractLengthPrependerClient connection) {
        PhysicalNode pn = new PhysicalNode(connection);
        if (!this.physicalNodes.contains(pn)) {
            this.physicalNodes.add(pn);
        }
    }

    public void removeChannel(AbstractLengthPrependerClient client) {
        for (PhysicalNode pn : this.physicalNodes) {
            if (pn.getClient() != client) continue;
            this.physicalNodes.remove(pn);
            break;
        }
    }

    public void addPartition(int partition, int mask) {
        this.partitions.add(new BitVector(partition, mask));
    }

    public boolean isReady() {
        if (!this.ready) {
            this.ready = true;
            for (PhysicalNode pn : this.physicalNodes) {
                if (!pn.isBlocked()) continue;
                this.ready = pn.unblock() & this.ready;
            }
        }
        return this.ready;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void catchUp() {
        this.caughtup = false;
        if (this.isReady()) {
            block13: {
                logger.debug("catching up {}->{}", (Object)this.upstream, (Object)this.group);
                long lBaseSeconds = (long)this.iterator.getBaseSeconds() << 32;
                logger.debug("BaseSeconds = {} and lBaseSeconds = {}", (Object)Codec.getStringWindowId(this.baseSeconds), (Object)Codec.getStringWindowId(lBaseSeconds));
                if (lBaseSeconds > this.baseSeconds) {
                    this.baseSeconds = lBaseSeconds;
                }
                logger.debug("Set the base seconds to {}", (Object)Codec.getStringWindowId(this.baseSeconds));
                int skippedPayloadTuples = 0;
                try {
                    block8: while (this.ready && this.iterator.hasNext()) {
                        SerializedData data = this.iterator.next();
                        switch (data.buffer[data.dataOffset]) {
                            case 1: {
                                ++skippedPayloadTuples;
                                break;
                            }
                            case 2: {
                                Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
                                this.baseSeconds = (long)tuple.getBaseSeconds() << 32;
                                int intervalMillis = tuple.getWindowWidth();
                                if (intervalMillis <= 0) {
                                    logger.warn("Interval value set to non positive value = {}", (Object)intervalMillis);
                                }
                                this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                                break;
                            }
                            case 3: {
                                Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
                                logger.debug("{}->{} condition {} =? {}", new Object[]{this.upstream, this.group, Codec.getStringWindowId(this.baseSeconds | (long)tuple.getWindowId()), Codec.getStringWindowId(this.skipWindowId)});
                                if ((this.baseSeconds | (long)tuple.getWindowId()) <= this.skipWindowId) continue block8;
                                logger.debug("caught up {}->{} skipping {} payload tuples", new Object[]{this.upstream, this.group, skippedPayloadTuples});
                                this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                                this.caughtup = true;
                                break block13;
                            }
                            case 5: 
                            case 10: 
                            case 11: {
                                this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                                logger.debug("Message {} was distributed to {}", (Object)MessageType.valueOf(data.buffer[data.dataOffset]), this.physicalNodes);
                                break;
                            }
                            default: {
                                logger.debug("Message {} was not distributed to {}", (Object)MessageType.valueOf(data.buffer[data.dataOffset]), this.physicalNodes);
                                break;
                            }
                        }
                    }
                }
                catch (InterruptedException ie) {
                    throw new RuntimeException(ie);
                }
            }
            if (this.iterator.hasNext()) {
                this.addedData();
            }
        }
        logger.debug("Exiting catch up because caughtup = {}", (Object)this.caughtup);
    }

    @Override
    public boolean addedData() {
        if (this.isReady()) {
            if (this.caughtup) {
                try {
                    if (this.partitions.isEmpty()) {
                        block12: while (this.ready && this.iterator.hasNext()) {
                            SerializedData data = this.iterator.next();
                            switch (data.buffer[data.dataOffset]) {
                                case 1: {
                                    this.ready = this.policy.distribute(this.physicalNodes, data);
                                    continue block12;
                                }
                                case 0: 
                                case 127: {
                                    continue block12;
                                }
                                case 2: {
                                    int length = data.length - data.dataOffset + data.offset;
                                    Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, length);
                                    this.baseSeconds = (long)resetWindow.getBaseSeconds() << 32;
                                    this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                                    continue block12;
                                }
                            }
                            this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                        }
                    }
                    block13: while (this.ready && this.iterator.hasNext()) {
                        SerializedData data = this.iterator.next();
                        int length = data.length - data.dataOffset + data.offset;
                        block6 : switch (data.buffer[data.dataOffset]) {
                            case 1: {
                                Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
                                int value = tuple.getPartition();
                                for (BitVector bv : this.partitions) {
                                    if (!bv.matches(value)) continue;
                                    this.ready = this.policy.distribute(this.physicalNodes, data);
                                    break block6;
                                }
                                continue block13;
                            }
                            case 0: 
                            case 127: {
                                break;
                            }
                            case 2: {
                                Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
                                this.baseSeconds = (long)tuple.getBaseSeconds() << 32;
                                this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                                break;
                            }
                            default: {
                                this.ready = GiveAll.getInstance().distribute(this.physicalNodes, data);
                            }
                        }
                    }
                }
                catch (InterruptedException ie) {
                    throw new RuntimeException(ie);
                }
            } else {
                this.catchUp();
            }
        }
        return !this.ready;
    }

    @Override
    public int getPartitions(Collection<BitVector> partitions) {
        partitions.addAll(this.partitions);
        return partitions.size();
    }

    public final int getPhysicalNodeCount() {
        return this.physicalNodes.size();
    }

    public String getUpstream() {
        return this.upstream;
    }

    public String getIdentifier() {
        return this.identifier;
    }

    public void boot(EventLoop eventloop) {
        for (PhysicalNode pn : this.physicalNodes) {
            eventloop.disconnect((Listener.ClientListener)pn.getClient());
        }
        this.physicalNodes.clear();
    }

    public String toString() {
        return "LogicalNode@" + Integer.toHexString(this.hashCode()) + "identifier=" + this.identifier + ", upstream=" + this.upstream + ", group=" + this.group + ", partitions=" + this.partitions + ", iterator=" + this.iterator + '}';
    }
}

