/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.replication;

import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.WorkAssigner;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link WorkAssigner} that scans the replication table and hands files that still need
 * replication to a {@link DistributedWorkQueue}.
 *
 * <p>This class drives the scan ({@link #createWork()}) and owns the shared state (connector,
 * configuration, work queue, ZooCache, maximum queue size). Subclasses decide <em>whether</em> and
 * <em>how</em> work is queued and tracked by implementing the abstract hooks at the bottom of the
 * class.
 *
 * <p>{@link #configure(AccumuloConfiguration, Connector)} (or the individual setters) must supply a
 * connector and configuration before {@link #assignWork()} is invoked, because both are
 * dereferenced there without null checks.
 */
public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
    private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class);

    /** Connector used to open scanners on the replication table and to locate the instance. */
    protected Connector conn;
    /** Configuration used to build the work queue and to read the maximum queue size. */
    protected AccumuloConfiguration conf;
    /** Lazily created by {@link #assignWork()} when still {@code null}. */
    protected DistributedWorkQueue workQueue;
    /** Refreshed from {@link Property#REPLICATION_MAX_WORK_QUEUE} on every {@link #assignWork()}. */
    protected int maxQueueSize;
    /** Lazily created by {@link #assignWork()} when still {@code null}. */
    protected ZooCache zooCache;

    /**
     * Tells whether the given status still requires replication work.
     *
     * @param status the deserialized status of a work entry
     * @return the result of {@link StatusUtil#isWorkRequired}
     */
    protected boolean isWorkRequired(Replication.Status status) {
        return StatusUtil.isWorkRequired(status);
    }

    /** @return the connector currently in use, possibly {@code null} if not yet configured */
    protected Connector getConnector() {
        return this.conn;
    }

    /** @param conn the connector to use for subsequent scans */
    protected void setConnector(Connector conn) {
        this.conn = conn;
    }

    /** @return the configuration currently in use, possibly {@code null} if not yet configured */
    protected AccumuloConfiguration getConf() {
        return this.conf;
    }

    /** @param conf the configuration to use for subsequent work assignment */
    protected void setConf(AccumuloConfiguration conf) {
        this.conf = conf;
    }

    /** @return the work queue, or {@code null} if it has not been created or set yet */
    protected DistributedWorkQueue getWorkQueue() {
        return this.workQueue;
    }

    /** @param workQueue the work queue to use instead of the lazily created one */
    protected void setWorkQueue(DistributedWorkQueue workQueue) {
        this.workQueue = workQueue;
    }

    /** @return the maximum queue size last read from configuration or set explicitly */
    protected int getMaxQueueSize() {
        return this.maxQueueSize;
    }

    /**
     * Sets the maximum queue size. Note that {@link #assignWork()} overwrites this value from the
     * configuration on every call.
     *
     * @param maxQueueSize the threshold compared against {@link #getQueueSize()}
     */
    protected void setMaxQueueSize(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    /** @return the ZooCache, or {@code null} if it has not been created or set yet */
    protected ZooCache getZooCache() {
        return this.zooCache;
    }

    /** @param zooCache the ZooCache to use instead of the lazily created one */
    protected void setZooCache(ZooCache zooCache) {
        this.zooCache = zooCache;
    }

    /**
     * Creates the work queue under {@code <instance root>/replication/workqueue}.
     *
     * <p>The instance root is resolved through {@link #conn}, so the connector must already be set.
     *
     * @param conf configuration passed to the {@link DistributedWorkQueue} constructor
     */
    protected void initializeWorkQueue(AccumuloConfiguration conf) {
        String queuePath = ZooUtil.getRoot(this.conn.getInstance()) + "/replication/workqueue";
        this.workQueue = new DistributedWorkQueue(queuePath, conf);
    }

    /**
     * Stores the configuration and connector used by all later calls.
     *
     * @param conf the configuration to read settings from
     * @param conn the connector used to reach the replication table
     */
    public void configure(AccumuloConfiguration conf, Connector conn) {
        this.conf = conf;
        this.conn = conn;
    }

    /**
     * Runs one assignment pass: makes sure the work queue and ZooCache exist, lets the subclass
     * initialize its queued-work state, refreshes the maximum queue size from configuration, queues
     * new work, and finally lets the subclass clean up finished work.
     */
    public void assignWork() {
        if (null == this.workQueue) {
            this.initializeWorkQueue(this.conf);
        }

        this.initializeQueuedWork();

        if (null == this.zooCache) {
            this.zooCache = new ZooCache();
        }

        // Re-read on every pass so the current configured value is always the one applied.
        this.maxQueueSize = this.conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);

        this.createWork();
        this.cleanupFinishedWork();
    }

    /**
     * Walks the order section of the replication table and, for each file found there, examines
     * that file's work-section entries (one per replication target).
     *
     * <p>For every work entry:
     * <ul>
     * <li>if {@link #shouldQueueWork(ReplicationTarget)} is {@code false}, nothing is queued, but a
     * tracked key whose status no longer requires work is removed;</li>
     * <li>otherwise, if the status requires work, {@link #queueWork(Path, ReplicationTarget)} is
     * called;</li>
     * <li>otherwise a tracked key for the entry is removed.</li>
     * </ul>
     *
     * <p>The method returns early (without throwing) when the replication table is offline or when
     * {@link #getQueueSize()} exceeds {@link #maxQueueSize}.
     */
    protected void createWork() {
        Scanner orderScanner;
        try {
            orderScanner = ReplicationTable.getScanner(this.conn);
        } catch (ReplicationTableOfflineException e) {
            // The replication table cannot be scanned, so there is nothing to assign on this pass.
            return;
        }
        ReplicationSchema.OrderSection.limit(orderScanner);

        // Scratch Text reused by the schema helpers for every key that is parsed below.
        Text buffer = new Text();
        for (Map.Entry<Key, Value> orderEntry : orderScanner) {
            // Stop adding work while the queue is over the limit. No sleep happens here despite
            // the message: the pass simply ends and the remaining entries are left for later.
            if (this.getQueueSize() > this.maxQueueSize) {
                log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", this.maxQueueSize);
                return;
            }

            Key orderKey = orderEntry.getKey();
            String file = ReplicationSchema.OrderSection.getFile(orderKey, buffer);
            // The table id is read back from the scratch buffer right after this call.
            ReplicationSchema.OrderSection.getTableId(orderKey, buffer);
            String sourceTableId = buffer.toString();
            log.info("Determining if {} from {} needs to be replicated", file, sourceTableId);

            Scanner workScanner;
            try {
                workScanner = ReplicationTable.getScanner(this.conn);
            } catch (ReplicationTableOfflineException e) {
                log.warn("Replication table is offline. Will retry...");
                UtilWaitThread.sleep(5000L);
                return;
            }
            ReplicationSchema.WorkSection.limit(workScanner);
            workScanner.setRange(Range.exact(file));

            int newReplicationTasksSubmitted = 0;
            int workEntriesRead = 0;
            for (Map.Entry<Key, Value> workEntry : workScanner) {
                ++workEntriesRead;

                Replication.Status status;
                try {
                    status = StatusUtil.fromValue(workEntry.getValue());
                } catch (InvalidProtocolBufferException e) {
                    // Skip only this entry; the trailing exception in the array is logged as the cause.
                    log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", new Object[]{file, ReplicationTarget.from(workEntry.getKey().getColumnQualifier()), e});
                    continue;
                }

                ReplicationTarget target = ReplicationSchema.WorkSection.getTarget(workEntry.getKey(), buffer);
                Set<String> keysBeingReplicated = this.getQueuedWork(target);
                Path p = new Path(file);
                String filename = p.getName();
                String key = DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename, target);

                if (!this.shouldQueueWork(target)) {
                    // Nothing new may be queued for this target, but finished work is still released.
                    if (!this.isWorkRequired(status)) {
                        this.removeCompletedWork(target, key, keysBeingReplicated);
                    }
                    continue;
                }

                if (this.isWorkRequired(status)) {
                    if (this.queueWork(p, target)) {
                        ++newReplicationTasksSubmitted;
                    }
                    continue;
                }

                log.debug("Not queueing work for {} to {} because {} doesn't need replication", new Object[]{file, target, ProtobufUtil.toString(status)});
                this.removeCompletedWork(target, key, keysBeingReplicated);
            }

            log.debug("Read {} replication entries from the WorkSection of the replication table", workEntriesRead);
            log.info("Assigned {} replication work entries for {}", newReplicationTasksSubmitted, file);
        }
    }

    /**
     * Removes {@code key} from the subclass's queued-work state when it is currently tracked.
     *
     * @param target the replication target the key belongs to
     * @param key the queue key built from the file name and target
     * @param keysBeingReplicated the keys reported by {@link #getQueuedWork(ReplicationTarget)}
     */
    private void removeCompletedWork(ReplicationTarget target, String key, Set<String> keysBeingReplicated) {
        if (!keysBeingReplicated.contains(key)) {
            return;
        }
        log.debug("Removing {} from replication state to {} because replication is complete", key, target.getPeerName());
        this.removeQueuedWork(target, key);
    }

    /**
     * Decides whether new work may be queued for the given target. When this returns
     * {@code false}, {@link #createWork()} never calls {@link #queueWork(Path, ReplicationTarget)}
     * for that target.
     *
     * @param target the replication target of the work entry being examined
     * @return {@code true} if work for this target may be queued
     */
    protected abstract boolean shouldQueueWork(ReplicationTarget target);

    /**
     * @return the current amount of queued work; {@link #createWork()} stops once this exceeds
     *         {@link #maxQueueSize}
     */
    protected abstract int getQueueSize();

    /**
     * Hook invoked at the start of every {@link #assignWork()} call, after the work queue has been
     * created if necessary. Presumably used to load the subclass's view of already queued work.
     */
    protected abstract void initializeQueuedWork();

    /**
     * Queues replication of the given file to the given target.
     *
     * @param path the file to replicate
     * @param target the replication target
     * @return {@code true} if the work was queued; only then is it counted as newly submitted
     */
    protected abstract boolean queueWork(Path path, ReplicationTarget target);

    /**
     * Returns the queue keys currently tracked for the given target. The result is dereferenced
     * without a null check, so implementations must not return {@code null}.
     *
     * @param target the replication target
     * @return the tracked queue keys for that target
     */
    protected abstract Set<String> getQueuedWork(ReplicationTarget target);

    /**
     * Removes a tracked queue key for the given target once its status no longer requires work.
     *
     * @param target the replication target
     * @param queueKey the queue key to remove
     */
    protected abstract void removeQueuedWork(ReplicationTarget target, String queueKey);

    /** Hook invoked at the end of every {@link #assignWork()} call, after {@link #createWork()}. */
    protected abstract void cleanupFinishedWork();
}

