/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.partitioner;

import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StatsAwareStatelessPartitioner<T extends Operator>
implements StatsListener,
Partitioner<T>,
Serializable {
    private static final Logger logger = LoggerFactory.getLogger(StatsAwareStatelessPartitioner.class);
    private static final long serialVersionUID = 201504021522L;
    private long cooldownMillis = 2000L;
    private long nextMillis;
    private long partitionNextMillis;
    private boolean repartition;
    private transient HashMap<Integer, StatsListener.BatchedOperatorStats> partitionedInstanceStatus = new HashMap();
    @Min(value=1L)
    private int initialPartitionCount = 1;

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.partitionedInstanceStatus = new HashMap();
    }

    public StatsAwareStatelessPartitioner() {
    }

    public StatsAwareStatelessPartitioner(String value) {
        this(Integer.parseInt(value));
    }

    public StatsAwareStatelessPartitioner(int initialPartitionCount) {
        this.initialPartitionCount = initialPartitionCount;
    }

    public void setInitialPartitionCount(int initialPartitionCount) {
        this.initialPartitionCount = initialPartitionCount;
    }

    public int getInitialPartitionCount() {
        return this.initialPartitionCount;
    }

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
        StatsListener.Response response = new StatsListener.Response();
        response.repartitionRequired = false;
        if (!this.partitionedInstanceStatus.containsKey(stats.getOperatorId())) {
            return response;
        }
        this.partitionedInstanceStatus.put(stats.getOperatorId(), stats);
        if (this.getLoad(stats) != 0) {
            if (this.repartition && System.currentTimeMillis() > this.nextMillis) {
                this.repartition = false;
                response.repartitionRequired = true;
                logger.debug("setting repartition to true");
            } else if (!this.repartition) {
                this.repartition = true;
                this.nextMillis = System.currentTimeMillis() + this.cooldownMillis;
            }
        } else {
            this.repartition = false;
        }
        return response;
    }

    public Collection<Partitioner.Partition<T>> definePartitions(Collection<Partitioner.Partition<T>> partitions, Partitioner.PartitioningContext context) {
        if (this.partitionedInstanceStatus == null || this.partitionedInstanceStatus.isEmpty()) {
            if (this.partitionedInstanceStatus == null) {
                this.partitionedInstanceStatus = new HashMap();
            }
            this.nextMillis = this.partitionNextMillis = System.currentTimeMillis() + 2L * this.cooldownMillis;
            return new StatelessPartitioner(this.initialPartitionCount).definePartitions(partitions, context);
        }
        logger.debug("repartition call for operator");
        if (System.currentTimeMillis() < this.partitionNextMillis) {
            return partitions;
        }
        ArrayList<Partitioner.Partition<T>> newPartitions = new ArrayList<Partitioner.Partition<T>>();
        HashMap<Integer, Partitioner.Partition<T>> lowLoadPartitions = new HashMap<Integer, Partitioner.Partition<T>>();
        for (Partitioner.Partition<T> p : partitions) {
            int load = this.getLoad(p.getStats());
            if (load < 0) {
                Partitioner.PartitionKeys pks = (Partitioner.PartitionKeys)p.getPartitionKeys().values().iterator().next();
                Iterator i$ = pks.partitions.iterator();
                while (i$.hasNext()) {
                    int partitionKey = (Integer)i$.next();
                    int reducedMask = pks.mask >>> 1;
                    String lookupKey = Integer.valueOf(reducedMask) + "-" + Integer.valueOf(partitionKey & reducedMask);
                    logger.debug("pks {} lookupKey {}", (Object)pks, (Object)lookupKey);
                    Partitioner.Partition siblingPartition = (Partitioner.Partition)lowLoadPartitions.remove(partitionKey & reducedMask);
                    if (siblingPartition == null) {
                        lowLoadPartitions.put(partitionKey & reducedMask, p);
                        continue;
                    }
                    Partitioner.PartitionKeys newPks = new Partitioner.PartitionKeys(reducedMask, (Set)Sets.newHashSet((Object[])new Integer[]{partitionKey & reducedMask}));
                    Operator.InputPort port = (Operator.InputPort)siblingPartition.getPartitionKeys().keySet().iterator().next();
                    siblingPartition.getPartitionKeys().put(port, newPks);
                    newPartitions.add(siblingPartition);
                }
                continue;
            }
            if (load > 0) {
                Set newKeys;
                int newMask;
                Map keys = p.getPartitionKeys();
                Map.Entry e = keys.entrySet().iterator().next();
                if (((Partitioner.PartitionKeys)e.getValue()).partitions.size() == 1) {
                    newMask = ((Partitioner.PartitionKeys)e.getValue()).mask << 1 | 1;
                    int key = (Integer)((Partitioner.PartitionKeys)e.getValue()).partitions.iterator().next();
                    int key2 = newMask ^ ((Partitioner.PartitionKeys)e.getValue()).mask | key;
                    newKeys = Sets.newHashSet((Object[])new Integer[]{key, key2});
                } else {
                    newMask = ((Partitioner.PartitionKeys)e.getValue()).mask;
                    newKeys = ((Partitioner.PartitionKeys)e.getValue()).partitions;
                }
                Iterator i$ = newKeys.iterator();
                while (i$.hasNext()) {
                    int key = (Integer)i$.next();
                    DefaultPartition newPartition = new DefaultPartition(p.getPartitionedInstance());
                    newPartition.getPartitionKeys().put(e.getKey(), new Partitioner.PartitionKeys(newMask, (Set)Sets.newHashSet((Object[])new Integer[]{key})));
                    newPartitions.add((Partitioner.Partition<T>)newPartition);
                }
                continue;
            }
            newPartitions.add(p);
        }
        newPartitions.addAll(lowLoadPartitions.values());
        this.partitionNextMillis = System.currentTimeMillis() + this.cooldownMillis;
        return newPartitions;
    }

    public void partitioned(Map<Integer, Partitioner.Partition<T>> partitions) {
        logger.debug("Partitioned Map: {}", partitions);
        this.partitionedInstanceStatus.clear();
        for (Map.Entry<Integer, Partitioner.Partition<T>> entry : partitions.entrySet()) {
            if (this.partitionedInstanceStatus.containsKey(entry.getKey())) continue;
            this.partitionedInstanceStatus.put(entry.getKey(), null);
        }
    }

    protected abstract int getLoad(StatsListener.BatchedOperatorStats var1);

    public long getCooldownMillis() {
        return this.cooldownMillis;
    }

    public void setCooldownMillis(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }
}

