/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.replication;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkMaker {
    private static final Logger log = LoggerFactory.getLogger(WorkMaker.class);
    private final AccumuloServerContext context;
    private Connector conn;
    private BatchWriter writer;

    public WorkMaker(AccumuloServerContext context, Connector conn) {
        this.context = context;
        this.conn = conn;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        if (!ReplicationTable.isOnline((Connector)this.conn)) {
            log.debug("Replication table is not yet online");
            return;
        }
        Span span = Trace.start((String)"replicationWorkMaker");
        try {
            Scanner s;
            try {
                s = ReplicationTable.getScanner((Connector)this.conn);
                if (null == this.writer) {
                    this.setBatchWriter(ReplicationTable.getBatchWriter((Connector)this.conn));
                }
            }
            catch (ReplicationTableOfflineException e) {
                log.warn("Replication table was online, but not anymore");
                this.writer = null;
                span.stop();
                return;
            }
            ReplicationSchema.StatusSection.limit((ScannerBase)s);
            Text file = new Text();
            for (Map.Entry entry : s) {
                Replication.Status status;
                ReplicationSchema.StatusSection.getFile((Key)((Key)entry.getKey()), (Text)file);
                String tableId = ReplicationSchema.StatusSection.getTableId((Key)((Key)entry.getKey()));
                log.debug("Processing replication status record for " + file + " on table " + tableId);
                try {
                    status = Replication.Status.parseFrom((byte[])((Value)entry.getValue()).get());
                }
                catch (InvalidProtocolBufferException e) {
                    log.error("Could not parse protobuf for {} from table {}", (Object)file, (Object)tableId);
                    continue;
                }
                if (!this.shouldCreateWork(status)) {
                    log.debug("Not creating work: " + status.toString());
                    continue;
                }
                TableConfiguration tableConf = this.context.getServerConfigurationFactory().getTableConfiguration(tableId);
                if (null == tableConf) continue;
                Map<String, String> replicationTargets = this.getReplicationTargets(tableConf);
                if (!replicationTargets.isEmpty()) {
                    Span workSpan = Trace.start((String)"createWorkMutations");
                    try {
                        this.addWorkRecord(file, (Value)entry.getValue(), replicationTargets, tableId);
                        continue;
                    }
                    finally {
                        workSpan.stop();
                        continue;
                    }
                }
                log.warn("No configured targets for table with ID {}", (Object)tableId);
            }
        }
        finally {
            span.stop();
        }
    }

    protected void setBatchWriter(BatchWriter bw) {
        this.writer = bw;
    }

    protected Map<String, String> getReplicationTargets(TableConfiguration tableConf) {
        Map props = tableConf.getAllPropertiesWithPrefix(Property.TABLE_REPLICATION_TARGET);
        HashMap<String, String> targets = new HashMap<String, String>();
        int propKeyLength = Property.TABLE_REPLICATION_TARGET.getKey().length();
        for (Map.Entry prop : props.entrySet()) {
            targets.put(((String)prop.getKey()).substring(propKeyLength), (String)prop.getValue());
        }
        return targets;
    }

    protected boolean shouldCreateWork(Replication.Status status) {
        return StatusUtil.isWorkRequired((Replication.Status)status);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addWorkRecord(Text file, Value v, Map<String, String> targets, String sourceTableId) {
        log.info("Adding work records for " + file + " to targets " + targets);
        try {
            Mutation m = new Mutation(file);
            ReplicationTarget target = new ReplicationTarget();
            DataOutputBuffer buffer = new DataOutputBuffer();
            Text t = new Text();
            for (Map.Entry<String, String> entry : targets.entrySet()) {
                buffer.reset();
                target.setPeerName(entry.getKey());
                target.setRemoteIdentifier(entry.getValue());
                target.setSourceTableId(sourceTableId);
                target.write((DataOutput)buffer);
                t.set(buffer.getData(), 0, buffer.getLength());
                ReplicationSchema.WorkSection.add((Mutation)m, (Text)t, (Value)v);
            }
            try {
                this.writer.addMutation(m);
            }
            catch (MutationsRejectedException e) {
                log.warn("Failed to write work mutations for replication, will retry", (Throwable)e);
            }
        }
        catch (IOException e) {
            log.warn("Failed to serialize data to Text, will retry", (Throwable)e);
        }
        finally {
            try {
                this.writer.flush();
            }
            catch (MutationsRejectedException e) {
                log.warn("Failed to write work mutations for replication, will retry", (Throwable)e);
            }
        }
    }
}

