/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.replication;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Work assigner that keeps at most one queued work entry per (peer name, source table id)
 * pair. A file is only queued for a replication target when no entry is currently tracked
 * for that target, so files for the same target are handed out one at a time.
 *
 * <p>The tracked state lives in an in-memory map keyed by peer name, then by source table
 * id, whose values are the queue keys that were added to the distributed work queue.
 */
public class SequentialWorkAssigner
extends DistributedWorkQueueWorkAssigner {
    private static final Logger log = LoggerFactory.getLogger(SequentialWorkAssigner.class);
    private static final String NAME = "Sequential Work Assigner";

    /** peer name -> (source table id -> queue key); {@code null} until initialized or set. */
    private Map<String, Map<TableId, String>> queuedWorkByPeerName;

    /** Creates an assigner without invoking {@code configure}. */
    public SequentialWorkAssigner() {
    }

    /**
     * Creates an assigner and immediately passes the arguments to {@code configure}.
     *
     * @param conf configuration handed to {@code configure}
     * @param client client handed to {@code configure}
     */
    public SequentialWorkAssigner(AccumuloConfiguration conf, AccumuloClient client) {
        this.configure(conf, client);
    }

    /**
     * @return the fixed display name of this assigner
     */
    public String getName() {
        return NAME;
    }

    /**
     * Replaces the in-memory queued-work state. The map is stored by reference, not copied.
     *
     * @param queuedWork peer name -> (source table id -> queue key)
     */
    protected void setQueuedWork(Map<String, Map<TableId, String>> queuedWork) {
        this.queuedWorkByPeerName = queuedWork;
    }

    /**
     * Populates the in-memory state from the entries currently present in the work queue.
     * Does nothing when the state has already been initialized or set.
     *
     * <p>The state is built in a local map and only published once the queue has been read
     * and every key parsed. A failed attempt therefore leaves the state uninitialized, so
     * the next call retries instead of silently continuing with an empty view of the queue.
     *
     * @throws RuntimeException if the queued work cannot be read from the work queue
     */
    @Override
    protected void initializeQueuedWork() {
        if (this.queuedWorkByPeerName != null) {
            return;
        }
        List<String> existingWork;
        try {
            existingWork = this.workQueue.getWorkQueued();
        } catch (InterruptedException | KeeperException e) {
            throw new RuntimeException("Error reading existing queued replication work", e);
        }
        log.info("Restoring replication work queue state from zookeeper");
        Map<String, Map<TableId, String>> restored = new HashMap<>();
        for (String work : existingWork) {
            Map.Entry<String, ReplicationTarget> entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work);
            String filename = entry.getKey();
            ReplicationTarget target = entry.getValue();
            String peerName = target.getPeerName();
            TableId sourceTableId = target.getSourceTableId();
            log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName);
            getOrCreateWorkForPeer(restored, peerName).put(sourceTableId, work);
        }
        this.queuedWorkByPeerName = restored;
    }

    /**
     * Drops every tracked queue key whose node is no longer present under the work queue
     * path (as reported by the zoo cache), and logs how many entries were dropped.
     * Per-peer maps are kept even when they become empty.
     */
    @Override
    protected void cleanupFinishedWork() {
        // The path prefix is identical for every entry, so build it once up front.
        String workQueuePrefix = ZooUtil.getRoot(this.client.instanceOperations().getInstanceID()) + "/replication/workqueue" + "/";
        int elementsRemoved = 0;
        for (Map<TableId, String> queuedReplication : this.queuedWorkByPeerName.values()) {
            Iterator<String> queueKeys = queuedReplication.values().iterator();
            while (queueKeys.hasNext()) {
                String queueKey = queueKeys.next();
                if (this.zooCache.get(workQueuePrefix + queueKey) != null) {
                    // Node still exists: the work has not been completed yet.
                    continue;
                }
                log.debug("Removing {} from work assignment state", queueKey);
                // Removing through the values() iterator removes the backing map entry.
                queueKeys.remove();
                ++elementsRemoved;
            }
        }
        log.info("Removed {} elements from internal workqueue state because the work was complete", elementsRemoved);
    }

    /**
     * Returns the number of peer names present in the in-memory state.
     *
     * <p>NOTE(review): this counts peers, not individual queued entries, and per-peer maps
     * are never removed by this class once created. Confirm this is the intended measure.
     *
     * @return the size of the peer-name map
     */
    @Override
    protected int getQueueSize() {
        return this.queuedWorkByPeerName.size();
    }

    /**
     * @param target the replication target being considered
     * @return {@code true} when no queue key is tracked for the target's peer name and
     *         source table id
     */
    @Override
    protected boolean shouldQueueWork(ReplicationTarget target) {
        Map<TableId, String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
        if (queuedWorkForPeer == null) {
            return true;
        }
        return queuedWorkForPeer.get(target.getSourceTableId()) == null;
    }

    /**
     * Adds work for {@code path} to the work queue if nothing is tracked for the target.
     *
     * @param path file to replicate; its name is used to build the queue key
     * @param target the replication target
     * @return {@code true} only when the work was added to the queue by this call;
     *         {@code false} when adding failed, when this file is already queued, or when
     *         a different entry is still tracked for the target
     */
    @Override
    protected boolean queueWork(Path path, ReplicationTarget target) {
        String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
        Map<TableId, String> workForPeer = getOrCreateWorkForPeer(this.queuedWorkByPeerName, target.getPeerName());
        String queuedWork = workForPeer.get(target.getSourceTableId());
        if (queuedWork == null) {
            try {
                this.workQueue.addWork(queueKey, path.toString());
                // Only record the key once the queue accepted it.
                workForPeer.put(target.getSourceTableId(), queueKey);
            } catch (InterruptedException | KeeperException e) {
                log.warn("Could not queue work for {} to {}", path, target, e);
                return false;
            }
            return true;
        }
        if (queuedWork.startsWith(path.getName())) {
            log.debug("Not re-queueing work for {} as it has already been queued for replication to {}", path, target);
            return false;
        }
        log.debug("Not queueing {} for work as {} must be replicated to {} first", path, queuedWork, target.getPeerName());
        return false;
    }

    /**
     * @param target the replication target to look up
     * @return a singleton set holding the tracked queue key for the target, or an empty
     *         set when nothing is tracked
     */
    @Override
    protected Set<String> getQueuedWork(ReplicationTarget target) {
        Map<TableId, String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
        if (queuedWorkForPeer == null) {
            return Collections.emptySet();
        }
        String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
        if (queuedWork == null) {
            return Collections.emptySet();
        }
        return Collections.singleton(queuedWork);
    }

    /**
     * Stops tracking {@code queueKey} for the target, provided it matches the key that is
     * currently tracked. A missing peer, a missing entry, or a mismatching key is logged
     * as a warning and leaves the state untouched.
     *
     * @param target the replication target the key belongs to
     * @param queueKey the queue key expected to be tracked for the target
     */
    @Override
    protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
        Map<TableId, String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
        if (queuedWorkForPeer == null) {
            log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName());
            return;
        }
        String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
        // The peer may have a map but no entry for this table; guard against the null
        // instead of dereferencing it.
        if (queuedWork == null || !queuedWork.equals(queueKey)) {
            log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork);
            return;
        }
        queuedWorkForPeer.remove(target.getSourceTableId());
    }

    /** Returns the per-peer map for {@code peerName} in {@code workByPeer}, creating and registering it if absent. */
    private static Map<TableId, String> getOrCreateWorkForPeer(Map<String, Map<TableId, String>> workByPeer, String peerName) {
        Map<TableId, String> workForPeer = workByPeer.get(peerName);
        if (workForPeer == null) {
            workForPeer = new HashMap<>();
            workByPeer.put(peerName, workForPeer);
        }
        return workForPeer;
    }
}

