/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tserver.replication;

import com.google.protobuf.GeneratedMessage;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.ClientExecReturn;
import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.client.impl.ReplicationClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.thrift.KeyValues;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.replication.thrift.WalEdits;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.trace.ProbabilitySampler;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.replication.ReplicaSystem;
import org.apache.accumulo.server.replication.ReplicaSystemHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.htrace.Sampler;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccumuloReplicaSystem
implements ReplicaSystem {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
    // File-name suffix checked by _replicate to route a path to RFile replication.
    private static final String RFILE_SUFFIX = ".rf";
    // Instance name passed to ZooKeeperInstance in getPeerInstance(); set by configure() from the text before the first comma.
    private String instanceName;
    // ZooKeeper string passed to ZooKeeperInstance in getPeerInstance(); set by configure() from the text after the first comma.
    private String zookeepers;
    // Server configuration loaded in configure(); read for the max unit size, replication name and WAL header decoding.
    private AccumuloConfiguration conf;
    // Volume manager obtained in configure(); used to open WAL files in replicateLogs().
    private VolumeManager fs;

    /**
     * Returns the instance name currently held by this replica system.
     *
     * @return the stored instance name, or {@code null} if none has been set
     */
    protected String getInstanceName() {
        return instanceName;
    }

    /**
     * Replaces the stored instance name.
     *
     * @param instanceName the new instance name; stored as given, without validation
     */
    protected void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    /**
     * Returns the ZooKeeper string currently held by this replica system.
     *
     * @return the stored ZooKeeper string, or {@code null} if none has been set
     */
    protected String getZookeepers() {
        return zookeepers;
    }

    /**
     * Replaces the stored ZooKeeper string.
     *
     * @param zookeepers the new ZooKeeper string; stored as given, without validation
     */
    protected void setZookeepers(String zookeepers) {
        this.zookeepers = zookeepers;
    }

    /**
     * Returns the configuration currently held by this replica system.
     *
     * @return the stored configuration, or {@code null} if none has been set
     */
    protected AccumuloConfiguration getConf() {
        return conf;
    }

    /**
     * Replaces the stored configuration.
     *
     * @param conf the new configuration; stored as given, without validation
     */
    protected void setConf(AccumuloConfiguration conf) {
        this.conf = conf;
    }

    /**
     * Returns the volume manager currently held by this replica system.
     *
     * @return the stored volume manager, or {@code null} if none has been set
     */
    protected VolumeManager getFs() {
        return fs;
    }

    /**
     * Replaces the stored volume manager.
     *
     * @param fs the new volume manager; stored as given, without validation
     */
    protected void setFs(VolumeManager fs) {
        this.fs = fs;
    }

    /**
     * Builds the configuration string that {@link #configure(String)} parses:
     * the instance name, a comma, then the ZooKeeper string.
     *
     * @param instanceName text placed before the first comma
     * @param zookeepers text placed after the first comma (may itself contain commas)
     * @return {@code instanceName + "," + zookeepers}
     */
    public static String buildConfiguration(String instanceName, String zookeepers) {
        StringBuilder joined = new StringBuilder();
        joined.append(instanceName);
        joined.append(",");
        joined.append(zookeepers);
        return joined.toString();
    }

    /**
     * Initialises this replica system from a string of the form
     * {@code instanceName,zookeepers} (see {@link #buildConfiguration(String, String)}).
     *
     * <p>The string is split at the first comma only, so the ZooKeeper part may
     * contain further commas. After parsing, the server configuration and the
     * volume manager are loaded.
     *
     * @param configuration the configuration string; must not be {@code null}
     * @throws NullPointerException if {@code configuration} is {@code null}
     * @throws IllegalArgumentException if {@code configuration} contains no comma
     *         (thrown after a one-second pause)
     * @throws RuntimeException if the volume manager cannot be obtained
     */
    public void configure(String configuration) {
        Objects.requireNonNull(configuration);
        final int separator = configuration.indexOf(',');
        if (separator == -1) {
            // NOTE(review): the pause before failing looks deliberate (throttling callers that retry) — confirm.
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalArgumentException("Expected comma in configuration string");
        }
        instanceName = configuration.substring(0, separator);
        zookeepers = configuration.substring(separator + 1);
        conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getConfiguration();
        try {
            fs = VolumeManagerImpl.get(conf);
        } catch (IOException e) {
            log.error("Could not connect to filesystem", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Replicates the file at {@code p} to the peer named by {@code target}.
     *
     * <p>When {@code INSTANCE_RPC_SASL_ENABLED} is off, the peer is contacted with
     * a password token. When it is on, a keytab login is performed and the
     * replication runs inside that login's {@code doAs}; the user that was current
     * before the login is handed to {@code _replicate}.
     *
     * @param p the file to replicate
     * @param status the replication status of {@code p} before this call
     * @param target the peer and table to replicate to
     * @param helper used to record status updates
     * @return the status produced by {@code _replicate}, or {@code status} unchanged
     *         when the keytab is not a regular file, the login fails, or the
     *         Kerberos token cannot be created
     */
    public Replication.Status replicate(final Path p, final Replication.Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
        final AccumuloConfiguration localConf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getConfiguration();
        log.debug("Replication RPC timeout is {}", localConf.get(Property.REPLICATION_RPC_TIMEOUT.getKey()));
        final String principal = getPrincipal(localConf, target);

        // Password authentication: no login switch is needed, so no UGI is passed along.
        if (!localConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
            final String password = getPassword(localConf, target);
            final AuthenticationToken passwordToken = new PasswordToken((CharSequence) password);
            final ClientContext passwordContext = getContextForPeer(localConf, target, principal, passwordToken);
            return _replicate(p, status, target, helper, localConf, passwordContext, null);
        }

        // Kerberos authentication: the keytab must be an existing regular file.
        final String keytabPath = getKeytab(localConf, target);
        final File keytab = new File(keytabPath);
        if (!(keytab.exists() && keytab.isFile())) {
            log.error("{} is not a regular file. Cannot login to replicate", keytabPath);
            return status;
        }

        try {
            // Remember the current user before logging in as the replication principal.
            final UserGroupInformation accumuloUgi = UserGroupInformation.getCurrentUser();
            final UserGroupInformation peerLogin = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab.getAbsolutePath());
            return peerLogin.doAs(new PrivilegedAction<Replication.Status>() {

                @Override
                public Replication.Status run() {
                    final KerberosToken kerberosToken;
                    try {
                        kerberosToken = new KerberosToken(principal, keytab);
                    } catch (IOException e) {
                        log.error("Failed to create KerberosToken", e);
                        return status;
                    }
                    final ClientContext kerberosContext = AccumuloReplicaSystem.this.getContextForPeer(localConf, target, principal, kerberosToken);
                    return AccumuloReplicaSystem.this._replicate(p, status, target, helper, localConf, kerberosContext, accumuloUgi);
                }
            });
        } catch (IOException e) {
            log.error("Failed to perform local login", e);
            return status;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the replication attempt loop for one file against one peer.
     *
     * <p>Tracing is switched on for the duration of the call and off again in a
     * {@code finally}. Each attempt first asks the peer's coordinator for a
     * servicer address and then replicates the file to it. A file whose name
     * ends in {@code .rf} is replicated as an RFile; every other file is
     * replicated as a WAL. The two paths are mutually exclusive.
     *
     * @param p the file to replicate
     * @param status the replication status of {@code p} before this call
     * @param target the peer and table to replicate to
     * @param helper used to record status updates
     * @param localConf configuration supplying the trace fraction, attempt count and RPC timeout
     * @param peerContext client context for the peer
     * @param accumuloUgi user under which status updates are recorded, or {@code null}
     * @return the status returned by the first attempt that reaches the peer
     *         servicer, or {@code status} unchanged when every attempt fails
     */
    private Replication.Status _replicate(Path p, Replication.Status status, ReplicationTarget target, ReplicaSystemHelper helper, AccumuloConfiguration localConf, final ClientContext peerContext, UserGroupInformation accumuloUgi) {
        try {
            double tracePercent = localConf.getFraction(Property.REPLICATION_TRACE_PERCENT);
            ProbabilitySampler sampler = new ProbabilitySampler(tracePercent);
            Trace.on((String)"AccumuloReplicaSystem", (Sampler)sampler);
            final String remoteTableId = target.getRemoteIdentifier();
            int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
            for (int i = 0; i < numAttempts; ++i) {
                String peerTserverStr;
                log.debug("Attempt {}", (Object)i);
                log.debug("Fetching peer tserver address");
                Span span = Trace.start("Fetch peer tserver");
                try {
                    peerTserverStr = ReplicationClient.executeCoordinatorWithReturn(peerContext, new ClientExecReturn<String, ReplicationCoordinator.Client>() {

                        @Override
                        public String execute(ReplicationCoordinator.Client client) throws Exception {
                            return client.getServicerAddress(remoteTableId, peerContext.rpcCreds());
                        }
                    });
                } catch (AccumuloException | AccumuloSecurityException e) {
                    // No progress was made; use up this attempt and try again.
                    log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", (Object)target, (Object)e);
                    continue;
                } finally {
                    span.stop();
                }
                if (null == peerTserverStr) {
                    log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", (Object)target);
                    continue;
                }
                HostAndPort peerTserver = HostAndPort.fromString(peerTserverStr);
                long timeout = localConf.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT);
                long sizeLimit = this.conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
                try {
                    Replication.Status finalStatus;
                    if (p.getName().endsWith(RFILE_SUFFIX)) {
                        span = Trace.start("RFile replication");
                        try {
                            finalStatus = this.replicateRFiles(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, timeout);
                        } finally {
                            span.stop();
                        }
                    } else {
                        // Only non-RFile paths are WALs. Without this else, an RFile would be
                        // replicated a second time as a WAL and its result overwritten.
                        span = Trace.start("WAL replication");
                        try {
                            finalStatus = this.replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi, timeout);
                        } finally {
                            span.stop();
                        }
                    }
                    log.debug("New status for {} after replicating to {} is {}", new Object[]{p, peerContext.getInstance(), ProtobufUtil.toString((GeneratedMessage)finalStatus)});
                    return finalStatus;
                } catch (AccumuloException | AccumuloSecurityException | TTransportException e) {
                    log.warn("Could not connect to remote server {}, will retry", (Object)peerTserverStr, (Object)e);
                    UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                }
            }
            log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", (Object)numAttempts, (Object)p);
            return status;
        } finally {
            Trace.off();
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Sends the RFile at {@code p} to the peer servicer in batches.
     *
     * <p>After each batch the status' {@code begin} is advanced by the number of
     * entries consumed, clamped to {@code Long.MAX_VALUE} on overflow. The loop
     * ends when a batch leaves the status unchanged (the original {@code status}
     * is returned) or when the new status needs no more work (that status is
     * returned).
     *
     * @return the advanced status, or {@code status} unchanged when no new data
     *         was replicated or the input stream could not be created
     * @throws TTransportException propagated from the servicer call
     * @throws AccumuloException propagated from the servicer call
     * @throws AccumuloSecurityException propagated from the servicer call
     */
    protected Replication.Status replicateRFiles(ClientContext peerContext, HostAndPort peerTserver, ReplicationTarget target, Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds, ReplicaSystemHelper helper, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException {
        try (DataInputStream rfileStream = getRFileInputStream(p)) {
            Replication.Status previous = status;
            Replication.Status current = status;
            for (;;) {
                ReplicationStats batch = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, rfileStream, p, current, sizeLimit, remoteTableId, tcreds), timeout);

                // A negative sum means the addition overflowed; clamp it.
                long advancedBegin = current.getBegin() + batch.entriesConsumed;
                if (advancedBegin < 0L) {
                    advancedBegin = Long.MAX_VALUE;
                }
                current = Replication.Status.newBuilder(current).setBegin(advancedBegin).build();
                log.debug("Sent batch for replication of {} to {}, with new Status {}", new Object[]{p, target, ProtobufUtil.toString(current)});

                if (current.equals(previous)) {
                    // Nothing moved, so report the caller's original status.
                    log.debug("Did not replicate any new data for {} to {}, (state was {})", new Object[]{p, target, ProtobufUtil.toString(previous)});
                    return status;
                }
                if (!StatusUtil.isWorkRequired(current)) {
                    return current;
                }
                previous = current;
            }
        } catch (IOException e) {
            log.error("Could not create input stream from RFile, will retry", e);
            return status;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Sends the write-ahead log at {@code p} to the peer servicer in batches.
     *
     * <p>The WAL is opened and its header read. The entries before
     * {@code status.getBegin()} are then consumed to collect the tablet ids that
     * belong to the source table. After that, batches are sent in a loop. After
     * each batch the status' {@code begin} is advanced by the number of entries
     * consumed, clamped to {@code Long.MAX_VALUE} on overflow. A changed status
     * is recorded through {@code helper}, under {@code accumuloUgi} when one is
     * supplied. Both streams are closed when the method exits.
     *
     * <p>When the WAL header is incomplete, the file is treated as having no
     * data: {@code begin} is moved to the end of the status (or
     * {@code Long.MAX_VALUE} for an infinite end), recorded, and returned.
     *
     * @param accumuloUgi user under which status updates are recorded, or
     *        {@code null} to record them as the current user
     * @return the advanced status, or {@code status} unchanged when no new data
     *         was replicated or the WAL could not be opened or read
     * @throws TTransportException propagated from the servicer call
     * @throws AccumuloException propagated from the servicer call or the status update
     * @throws AccumuloSecurityException propagated from the servicer call or the status update
     * @throws RuntimeException if the replication table does not exist, or the
     *         status update fails with an unexpected exception type
     */
    protected Replication.Status replicateLogs(ClientContext peerContext, HostAndPort peerTserver, final ReplicationTarget target, final Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds, final ReplicaSystemHelper helper, UserGroupInformation accumuloUgi, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException {
        log.debug("Replication WAL to peer tserver");
        try (FSDataInputStream fsinput = this.fs.open(p);
                DataInputStream input = this.getWalStream(p, fsinput)) {
            log.debug("Skipping unwanted data in WAL");
            Set<Integer> tids;
            Span span = Trace.start("Consume WAL prefix");
            span.data("file", p.toString());
            try {
                tids = this.consumeWalPrefix(target, input, p, status, sizeLimit);
            } catch (IOException e) {
                // Keep the cause in the log; the streams are closed by try-with-resources.
                log.warn("Unexpected error consuming file.", e);
                return status;
            } finally {
                span.stop();
            }

            log.debug("Sending batches of data to peer tserver");
            Replication.Status lastStatus = status;
            Replication.Status currentStatus = status;
            final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
            while (true) {
                span = Trace.start("Replicate WAL batch");
                span.data("Batch size (bytes)", Long.toString(sizeLimit));
                span.data("File", p.toString());
                span.data("Peer instance name", peerContext.getInstance().getInstanceName());
                span.data("Peer tserver", peerTserver.toString());
                span.data("Remote table ID", remoteTableId);
                ReplicationStats replResult;
                try {
                    replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids), timeout);
                } catch (Exception e) {
                    log.error("Caught exception replicating data to {} at {}", new Object[]{peerContext.getInstance().getInstanceName(), peerTserver, e});
                    throw e;
                } finally {
                    span.stop();
                }

                // A negative sum means the addition overflowed; clamp it.
                long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
                if (newBegin < 0L) {
                    newBegin = Long.MAX_VALUE;
                }
                currentStatus = Replication.Status.newBuilder(currentStatus).setBegin(newBegin).build();
                log.debug("Sent batch for replication of {} to {}, with new Status {}", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)currentStatus)});

                if (currentStatus.equals(lastStatus)) {
                    // Nothing moved, so no update is recorded and the caller's original status is returned.
                    log.debug("Did not replicate any new data for {} to {}, (state was {})", new Object[]{p, target, ProtobufUtil.toString((GeneratedMessage)lastStatus)});
                    return status;
                }

                span = Trace.start("Update replication table");
                try {
                    if (null != accumuloUgi) {
                        final Replication.Status copy = currentStatus;
                        // PrivilegedAction cannot throw checked exceptions, so any failure is
                        // carried out through exceptionRef and rethrown below.
                        accumuloUgi.doAs(new PrivilegedAction<Void>() {

                            @Override
                            public Void run() {
                                try {
                                    helper.recordNewStatus(p, copy, target);
                                } catch (Exception e) {
                                    exceptionRef.set(e);
                                }
                                return null;
                            }
                        });
                        Exception failure = exceptionRef.get();
                        if (null != failure) {
                            if (failure instanceof TableNotFoundException) {
                                throw (TableNotFoundException)failure;
                            }
                            if (failure instanceof AccumuloSecurityException) {
                                throw (AccumuloSecurityException)failure;
                            }
                            if (failure instanceof AccumuloException) {
                                throw (AccumuloException)failure;
                            }
                            throw new RuntimeException("Received unexpected exception", failure);
                        }
                    } else {
                        helper.recordNewStatus(p, currentStatus, target);
                    }
                } catch (TableNotFoundException e) {
                    log.error("Tried to update status in replication table for {} as {}, but the table did not exist", new Object[]{p, ProtobufUtil.toString((GeneratedMessage)currentStatus), e});
                    throw new RuntimeException("Replication table did not exist, will retry", e);
                } finally {
                    span.stop();
                }
                log.debug("Recorded updated status for {}: {}", (Object)p, (Object)ProtobufUtil.toString((GeneratedMessage)currentStatus));

                if (!StatusUtil.isWorkRequired(currentStatus)) {
                    return currentStatus;
                }
                lastStatus = currentStatus;
            }
        } catch (DfsLogger.LogHeaderIncompleteException e) {
            log.warn("Could not read header from {}, assuming that there is no data present in the WAL, therefore replication is complete", (Object)p);
            // Move begin to the end of the file so the status shows the file as consumed.
            Replication.Status newStatus = status.getInfiniteEnd() ? Replication.Status.newBuilder(status).setBegin(Long.MAX_VALUE).build() : Replication.Status.newBuilder(status).setBegin(status.getEnd()).build();
            Span span = Trace.start("Update replication table");
            try {
                helper.recordNewStatus(p, newStatus, target);
                return newStatus;
            } catch (TableNotFoundException tnfe) {
                // Log and chain the missing-table failure itself, not the header exception.
                log.error("Tried to update status in replication table for {} as {}, but the table did not exist", new Object[]{p, ProtobufUtil.toString((GeneratedMessage)newStatus), tnfe});
                throw new RuntimeException("Replication table did not exist, will retry", tnfe);
            } finally {
                span.stop();
            }
        } catch (IOException e) {
            log.error("Could not create stream for WAL", e);
            return status;
        }
    }

    /**
     * Looks up the password configured for the target's peer.
     *
     * @param localConf configuration holding the {@code REPLICATION_PEER_PASSWORD} properties; must not be {@code null}
     * @param target the replication target whose peer name selects the property; must not be {@code null}
     * @return the configured password
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalArgumentException if no password is configured for the peer
     */
    protected String getPassword(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        final Map<String, String> passwordsByKey = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
        final String lookupKey = Property.REPLICATION_PEER_PASSWORD.getKey() + target.getPeerName();
        final String found = passwordsByKey.get(lookupKey);
        if (found != null) {
            return found;
        }
        throw new IllegalArgumentException("Cannot get password for " + target.getPeerName());
    }

    /**
     * Looks up the keytab path configured for the target's peer.
     *
     * @param localConf configuration holding the {@code REPLICATION_PEER_KEYTAB} properties; must not be {@code null}
     * @param target the replication target whose peer name selects the property; must not be {@code null}
     * @return the configured keytab value
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalArgumentException if no keytab is configured for the peer
     */
    protected String getKeytab(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        final Map<String, String> keytabsByKey = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_KEYTAB);
        final String lookupKey = Property.REPLICATION_PEER_KEYTAB.getKey() + target.getPeerName();
        final String found = keytabsByKey.get(lookupKey);
        if (found != null) {
            return found;
        }
        throw new IllegalArgumentException("Cannot get keytab for " + target.getPeerName());
    }

    /**
     * Looks up the user (principal) configured for the target's peer.
     *
     * @param localConf configuration holding the {@code REPLICATION_PEER_USER} properties; must not be {@code null}
     * @param target the replication target whose peer name selects the property; must not be {@code null}
     * @return the configured user
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalArgumentException if no user is configured for the peer
     */
    protected String getPrincipal(AccumuloConfiguration localConf, ReplicationTarget target) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        final String lookupKey = Property.REPLICATION_PEER_USER.getKey() + target.getPeerName();
        final Map<String, String> usersByKey = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
        final String found = usersByKey.get(lookupKey);
        if (found != null) {
            return found;
        }
        throw new IllegalArgumentException("Cannot get user for " + target.getPeerName());
    }

    /**
     * Builds the client context used to talk to the peer.
     *
     * @param localConf configuration handed to the new context; must not be {@code null}
     * @param target passed to {@link #getPeerInstance(ReplicationTarget)}; must not be {@code null}
     * @param principal user name for the credentials; must not be {@code null}
     * @param token authentication token for the credentials; must not be {@code null}
     * @return a new context combining the peer instance, the credentials and {@code localConf}
     * @throws NullPointerException if any argument is {@code null}
     */
    protected ClientContext getContextForPeer(AccumuloConfiguration localConf, ReplicationTarget target, String principal, AuthenticationToken token) {
        Objects.requireNonNull(localConf);
        Objects.requireNonNull(target);
        Objects.requireNonNull(principal);
        Objects.requireNonNull(token);
        final Instance peerInstance = getPeerInstance(target);
        final Credentials peerCredentials = new Credentials(principal, token);
        return new ClientContext(peerInstance, peerCredentials, localConf);
    }

    /**
     * Creates the instance handle for the peer from the stored instance name and
     * ZooKeeper string.
     *
     * @param target not used by this implementation
     * @return a new {@code ZooKeeperInstance}
     */
    protected Instance getPeerInstance(ReplicationTarget target) {
        return new ZooKeeperInstance(instanceName, zookeepers);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStream input, Path p, Replication.Status status, long sizeLimit) {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads past the first {@code status.getBegin()} entries of the WAL and
     * collects the tablet ids defined in that prefix for the source table.
     *
     * <p>Each {@code DEFINE_TABLET} entry whose table id equals
     * {@code target.getSourceTableId()} adds its tablet id to the result. All
     * other entries are read and discarded.
     *
     * @param target supplies the source table id to match
     * @param wal stream positioned at the first WAL entry
     * @param p not used by this implementation
     * @param status supplies the number of entries to skip
     * @param sizeLimit not used by this implementation
     * @return a mutable set of the matching tablet ids; {@code getWalEdits} adds
     *         further ids to the set it is given
     * @throws IOException if an entry cannot be read
     */
    protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Path p, Replication.Status status, long sizeLimit) throws IOException {
        LogFileKey key = new LogFileKey();
        LogFileValue value = new LogFileValue();
        Set<Integer> desiredTids = new HashSet<Integer>();
        final long entriesToSkip = status.getBegin();
        for (long i = 0L; i < entriesToSkip; ++i) {
            key.readFields(wal);
            value.readFields(wal);
            switch (key.event) {
                case DEFINE_TABLET:
                    if (target.getSourceTableId().equals(key.tablet.getTableId())) {
                        desiredTids.add(key.tabletId);
                    }
                    break;
                default:
                    break;
            }
        }
        // Return the populated set. The original returned a separate, always-empty set.
        return desiredTids;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the WAL header from {@code input} and returns the decrypting stream
     * for the rest of the file. The read is wrapped in a trace span that is
     * stopped whether or not it succeeds.
     *
     * @param p path of the WAL, recorded on the trace span
     * @param input raw stream positioned at the start of the WAL
     * @return the decrypting input stream produced by the header read
     * @throws IOException if the header cannot be read
     */
    public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
        final Span headerSpan = Trace.start("Read WAL header");
        headerSpan.data("file", p.toString());
        try {
            return DfsLogger.readHeaderAndReturnStream(input, conf).getDecryptingInputStream();
        } finally {
            headerSpan.stop();
        }
    }

    /**
     * Reads WAL entries until the serialized mutations reach {@code sizeLimit}
     * bytes or the stream ends, and collects the ones to replicate.
     *
     * <p>{@code DEFINE_TABLET} entries for the source table add their tablet id
     * to {@code desiredTids}. {@code MUTATION} and {@code MANY_MUTATIONS} entries
     * are serialized into the result only when their tablet id is in
     * {@code desiredTids}. Every entry read counts towards the consumed total. On
     * end of file, the consumed total becomes {@code Long.MAX_VALUE} when the
     * status is both closed and of infinite end.
     *
     * @param desiredTids tablet ids to replicate; this method adds to the set
     * @return the collected edits with their byte size, consumed-entry count and
     *         mutation count
     * @throws IOException if reading or serializing fails for a reason other
     *         than end of file
     */
    protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Replication.Status status, long sizeLimit, Set<Integer> desiredTids) throws IOException {
        final WalEdits collected = new WalEdits();
        collected.edits = new ArrayList<ByteBuffer>();
        long bytesCollected = 0L;
        long entriesRead = 0L;
        long mutationCount = 0L;
        final LogFileKey key = new LogFileKey();
        final LogFileValue value = new LogFileValue();

        while (bytesCollected < sizeLimit) {
            try {
                key.readFields(wal);
                value.readFields(wal);
            } catch (EOFException e) {
                log.debug("Caught EOFException reading {}", p);
                if (status.getInfiniteEnd() && status.getClosed()) {
                    log.debug("{} is closed and has unknown length, assuming entire file has been consumed", p);
                    entriesRead = Long.MAX_VALUE;
                }
                break;
            }
            entriesRead++;

            switch (key.event) {
                case DEFINE_TABLET:
                    // Tablets defined mid-file also select later mutations.
                    if (target.getSourceTableId().equals(key.tablet.getTableId())) {
                        desiredTids.add(key.tabletId);
                    }
                    break;
                case MUTATION:
                case MANY_MUTATIONS:
                    if (desiredTids.contains(key.tabletId)) {
                        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                        final DataOutputStream out = new DataOutputStream(buffer);
                        key.write(out);
                        mutationCount += writeValueAvoidingReplicationCycles(out, value, target);
                        out.flush();
                        final byte[] serialized = buffer.toByteArray();
                        bytesCollected += (long) serialized.length;
                        collected.addToEdits(ByteBuffer.wrap(serialized));
                    }
                    break;
                default:
                    log.trace("Ignorning WAL entry which doesn't contain mutations, should not have received such entries");
                    break;
            }
        }
        return new WalReplication(collected, bytesCollected, entriesRead, mutationCount);
    }

    /**
     * Writes the mutations of {@code value} to {@code out}, leaving out any
     * mutation whose replication sources already contain the target's peer name.
     *
     * <p>The output is the number of mutations written, as an int, followed by
     * each mutation. Before a mutation is written, the local replication name is
     * added to its replication sources.
     *
     * @param out destination stream
     * @param value WAL value whose mutations are filtered and written
     * @param target supplies the peer name used for filtering
     * @return the number of mutations written
     * @throws IllegalArgumentException if the local replication name is blank
     * @throws IOException if writing fails
     */
    protected long writeValueAvoidingReplicationCycles(DataOutputStream out, LogFileValue value, ReplicationTarget target) throws IOException {
        // First pass: count the mutations to send, because the count is written before them.
        int mutationsToSend = 0;
        for (Mutation candidate : value.mutations) {
            if (!candidate.getReplicationSources().contains(target.getPeerName())) {
                mutationsToSend++;
            }
        }

        final int alreadyReplicated = value.mutations.size() - mutationsToSend;
        if (alreadyReplicated > 0) {
            log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", (Object) alreadyReplicated, (Object) target.getPeerName());
        }

        final String localName = conf.get(Property.REPLICATION_NAME);
        if (StringUtils.isBlank(localName)) {
            throw new IllegalArgumentException("Local system has no replication name configured");
        }

        // Second pass: tag each mutation with the local name and write it.
        out.writeInt(mutationsToSend);
        for (Mutation candidate : value.mutations) {
            if (!candidate.getReplicationSources().contains(target.getPeerName())) {
                candidate.addReplicationSource(localName);
                candidate.write((DataOutput) out);
            }
        }
        return mutationsToSend;
    }

    /**
     * Not implemented; RFile input is unavailable in this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    protected DataInputStream getRFileInputStream(Path p) throws IOException {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Result of reading one batch from a WAL: the edits to send, plus the size
     * and entry counts held by {@link ReplicationStats}.
     */
    public static class WalReplication
    extends ReplicationStats {
        /** The serialized edits collected for this batch. */
        public WalEdits walEdits;
        /** Number of mutations contained in {@link #walEdits}. */
        public long numUpdates;

        /**
         * @param edits the collected edits; its edit count becomes the record count
         * @param size total serialized size in bytes
         * @param entriesConsumed number of WAL entries read to produce this batch
         * @param numMutations number of mutations contained in {@code edits}
         */
        public WalReplication(WalEdits edits, long size, long entriesConsumed, long numMutations) {
            super(size, edits.getEditsSize(), entriesConsumed);
            this.walEdits = edits;
            this.numUpdates = numMutations;
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Objects.hashCode(this.walEdits) + Objects.hashCode(this.numUpdates);
        }

        /**
         * Two instances are equal when their statistics, edits and update counts
         * are all equal. The edits are compared null-safely, as in
         * {@link #hashCode()}, because {@link #walEdits} is a public mutable field.
         */
        @Override
        public boolean equals(Object o) {
            if (o instanceof WalReplication) {
                WalReplication other = (WalReplication)o;
                return super.equals(other) && Objects.equals(this.walEdits, other.walEdits) && this.numUpdates == other.numUpdates;
            }
            return false;
        }
    }

    /**
     * Result of reading one batch from an RFile: the key/values to send, plus
     * the size and entry counts held by {@link ReplicationStats}.
     */
    public static class RFileReplication
    extends ReplicationStats {
        // The key/values collected for this batch.
        public KeyValues keyValues;

        /**
         * @param kvs the collected key/values
         * @param size total size in bytes
         */
        public RFileReplication(KeyValues kvs, long size) {
            // The key/value count is used for both the record count and the entries-consumed count.
            super(size, kvs.keyValues.size(), kvs.keyValues.size());
            this.keyValues = kvs;
        }
    }

    /**
     * Plain holder for three counters describing a unit of replication work: a byte size, a
     * record count and a count of consumed entries.
     */
    public static class ReplicationStats {
        public long sizeInBytes;
        public long sizeInRecords;
        public long entriesConsumed;

        /**
         * @param sizeInBytes stored in {@link #sizeInBytes}
         * @param sizeInRecords stored in {@link #sizeInRecords}
         * @param entriesConsumed stored in {@link #entriesConsumed}
         */
        public ReplicationStats(long sizeInBytes, long sizeInRecords, long entriesConsumed) {
            this.sizeInBytes = sizeInBytes;
            this.sizeInRecords = sizeInRecords;
            this.entriesConsumed = entriesConsumed;
        }

        /** Hash of the sum of the three counters. */
        @Override
        public int hashCode() {
            long total = this.sizeInBytes + this.sizeInRecords + this.entriesConsumed;
            return Long.valueOf(total).hashCode();
        }

        /**
         * Equal to any {@code ReplicationStats} (subclass instances included) whose three
         * counters match this object's.
         */
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ReplicationStats)) {
                return false;
            }
            ReplicationStats that = (ReplicationStats)o;
            if (this.sizeInBytes != that.sizeInBytes) {
                return false;
            }
            if (this.sizeInRecords != that.sizeInRecords) {
                return false;
            }
            return this.entriesConsumed == that.entriesConsumed;
        }
    }

    /**
     * Client callback that reads a batch of key/value entries through the enclosing instance's
     * {@code getKeyValues} and, when the batch is non-empty, sends it to the peer with
     * {@code replicateKeyValues}.
     */
    protected class RFileClientExecReturn
    implements ClientExecReturn<ReplicationStats, ReplicationServicer.Client> {
        private ReplicationTarget target;
        private DataInputStream input;
        private Path p;
        private Replication.Status status;
        private long sizeLimit;
        private String remoteTableId;
        private TCredentials tcreds;

        /**
         * Captures the arguments later handed to {@code getKeyValues} and
         * {@code replicateKeyValues}; nothing is read or sent at construction time.
         */
        public RFileClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds) {
            this.target = target;
            this.input = input;
            this.p = p;
            this.status = status;
            this.sizeLimit = sizeLimit;
            this.remoteTableId = remoteTableId;
            this.tcreds = tcreds;
        }

        /**
         * Reads the next batch and replicates it if it contains anything.
         *
         * @param client client the batch is sent through
         * @return the batch that was sent, or all-zero statistics when the batch was empty
         * @throws Exception whatever reading the batch or the remote call throws
         */
        public ReplicationStats execute(ReplicationServicer.Client client) throws Exception {
            RFileReplication batch = AccumuloReplicaSystem.this.getKeyValues(this.target, this.input, this.p, this.status, this.sizeLimit);
            boolean nothingToSend = batch.keyValues.getKeyValuesSize() <= 0;
            if (nothingToSend) {
                return new ReplicationStats(0L, 0L, 0L);
            }

            long acknowledged = client.replicateKeyValues(this.remoteTableId, batch.keyValues, this.tcreds);
            int sent = batch.keyValues.getKeyValuesSize();
            // A mismatch is only reported; the batch is still returned to the caller.
            if (acknowledged != (long)sent) {
                log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", (Object)Integer.valueOf(sent), (Object)Long.valueOf(acknowledged));
            }
            return batch;
        }
    }

    /**
     * Client callback that reads a batch of WAL edits through the enclosing instance's
     * {@code getWalEdits} and, when the batch is non-empty, sends it to the peer with
     * {@code replicateLog}.
     */
    protected class WalClientExecReturn
    implements ClientExecReturn<ReplicationStats, ReplicationServicer.Client> {
        private ReplicationTarget target;
        private DataInputStream input;
        private Path p;
        private Replication.Status status;
        private long sizeLimit;
        private String remoteTableId;
        private TCredentials tcreds;
        private Set<Integer> tids;

        /**
         * Captures the arguments later handed to {@code getWalEdits} and {@code replicateLog};
         * nothing is read or sent at construction time.
         */
        public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Replication.Status status, long sizeLimit, String remoteTableId, TCredentials tcreds, Set<Integer> tids) {
            this.target = target;
            this.input = input;
            this.p = p;
            this.status = status;
            this.sizeLimit = sizeLimit;
            this.remoteTableId = remoteTableId;
            this.tcreds = tcreds;
            this.tids = tids;
        }

        /**
         * Reads the next batch of edits and replicates it if it contains anything.
         *
         * @param client client the edits are sent through
         * @return the batch when edits were sent or when entries were consumed without producing
         *         edits; all-zero statistics otherwise
         * @throws Exception whatever reading the edits or the remote call throws
         */
        public ReplicationStats execute(ReplicationServicer.Client client) throws Exception {
            WalReplication edits = AccumuloReplicaSystem.this.getWalEdits(this.target, this.input, this.p, this.status, this.sizeLimit, this.tids);
            // Long.MAX_VALUE is rendered as text instead of a number in the log line.
            Object consumed = Long.MAX_VALUE == edits.entriesConsumed ? "all remaining" : Long.valueOf(edits.entriesConsumed);
            // The message names the peer, so log the peer name; the file path is kept as its own
            // placeholder instead of being printed in the peer's position.
            log.debug("Read {} WAL entries and retained {} bytes of WAL entries from {} for replication to peer '{}'", new Object[]{consumed, edits.sizeInBytes, this.p, this.target.getPeerName()});
            if (0 < edits.walEdits.getEditsSize()) {
                log.debug("Sending {} edits", (Object)edits.walEdits.getEditsSize());
                long entriesReplicated = client.replicateLog(this.remoteTableId, edits.walEdits, this.tcreds);
                // A mismatch is only reported; the batch is still returned to the caller.
                if (entriesReplicated != edits.numUpdates) {
                    log.warn("Sent {} WAL entries for replication but {} were reported as replicated", (Object)edits.numUpdates, (Object)entriesReplicated);
                } else {
                    log.debug("Replicated {} edits", (Object)entriesReplicated);
                }
                return edits;
            }
            // Nothing to send, but entries were still read: return the batch so the caller sees
            // the consumed count.
            if (edits.entriesConsumed > 0L) {
                return edits;
            }
            return new ReplicationStats(0L, 0L, 0L);
        }
    }
}

