/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
import org.apache.hadoop.hdfs.qjournal.server.JNStorage;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background synchronizer for a single journal hosted by a JournalNode.
 *
 * <p>Once started, a daemon thread periodically picks one of the other
 * JournalNodes (rotating through them in order), compares that node's
 * finalized edit log manifest with the local one, and downloads any log
 * segments that are missing locally.
 */
@InterfaceAudience.Private
public class JournalNodeSyncer {
    public static final Logger LOG = LoggerFactory.getLogger(JournalNodeSyncer.class);
    // The JournalNode hosting this syncer; its bound IPC address is excluded from the peer list.
    private final JournalNode jn;
    // The local journal being kept in sync.
    private final Journal journal;
    // Journal identifier, and the same identifier wrapped as a protobuf message for RPC requests.
    private final String jid;
    private final QJournalProtocolProtos.JournalIdProto jidProto;
    // Storage of the local journal (obtained from journal.getStorage() in the constructor).
    private final JNStorage jnStorage;
    private final Configuration conf;
    // The sync thread; stays null until startSyncJournalsDaemon() has run.
    private volatile Daemon syncJournalDaemon;
    // Cleared by stopSync() to make the daemon loop exit.
    private volatile boolean shouldSync = true;
    // One entry per other JournalNode for which an RPC proxy could be created.
    private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList();
    // Size of otherJNProxies, recorded by getOtherJournalNodeProxies().
    private int numOtherJNs;
    // Index into otherJNProxies of the peer to sync with on the next round.
    private int journalNodeIndexForSync = 0;
    // Sleep time between sync rounds, in milliseconds (it is passed to Thread.sleep).
    private final long journalSyncInterval;
    // Timeout handed to Util.doGetUrl when downloading a segment; presumably milliseconds - TODO confirm.
    private final int logSegmentTransferTimeout;
    // Download throttler; null when no positive transfer bandwidth is configured.
    private final DataTransferThrottler throttler;

    /**
     * Builds a syncer for one journal of the given JournalNode.
     *
     * @param journalNode the JournalNode that hosts the journal
     * @param journal the local journal to keep in sync
     * @param jid identifier of the journal
     * @param conf configuration supplying the sync interval, the transfer
     *     timeout and the optional transfer bandwidth limit
     */
    JournalNodeSyncer(JournalNode journalNode, Journal journal, String jid, Configuration conf) {
        this.conf = conf;
        this.jn = journalNode;
        this.journal = journal;
        this.jnStorage = journal.getStorage();

        this.jid = jid;
        this.jidProto = this.convertJournalId(jid);

        // Tunables, each with its built-in default.
        this.journalSyncInterval = conf.getLong("dfs.journalnode.sync.interval", 120000L);
        this.logSegmentTransferTimeout = conf.getInt("dfs.edit.log.transfer.timeout", 30000);
        this.throttler = JournalNodeSyncer.getThrottler(conf);
    }

    /**
     * Stops syncing: clears the run flag, removes the edits sync directory
     * if it exists, and interrupts the daemon thread if one was started.
     *
     * <p>A failure to delete the directory is logged rather than silently
     * ignored; it does not prevent the daemon from being interrupted.
     */
    void stopSync() {
        this.shouldSync = false;
        // NOTE(review): the directory is removed before the daemon is interrupted;
        // confirm that an in-flight download cannot still be writing into it.
        File editsSyncDir = this.journal.getStorage().getEditsSyncDir();
        if (editsSyncDir.exists() && !FileUtil.fullyDelete(editsSyncDir)) {
            LOG.warn("Failed to delete edits sync directory {}", editsSyncDir);
        }
        if (this.syncJournalDaemon != null) {
            this.syncJournalDaemon.interrupt();
        }
    }

    /**
     * Starts the sync daemon for this journal, provided at least one RPC
     * proxy to another JournalNode could be set up; otherwise only logs a
     * warning.
     */
    public void start() {
        LOG.info("Starting SyncJournal daemon for journal " + this.jid);
        boolean proxiesReady = this.getOtherJournalNodeProxies();
        if (!proxiesReady) {
            LOG.warn("Failed to start SyncJournal daemon for journal " + this.jid);
            return;
        }
        this.startSyncJournalsDaemon();
    }

    /**
     * Makes sure the edits sync directory of the local journal exists.
     *
     * @return {@code true} if the directory already existed or was created,
     *     {@code false} if it could not be created
     */
    private boolean createEditsSyncDir() {
        File syncDir = this.journal.getStorage().getEditsSyncDir();
        if (!syncDir.exists()) {
            return syncDir.mkdir();
        }
        LOG.info(syncDir + " directory already exists.");
        return true;
    }

    /**
     * Creates an RPC proxy for every other JournalNode address and records
     * how many proxies are available. Addresses whose proxy cannot be
     * created are logged and skipped.
     *
     * @return {@code true} if at least one proxy is available, {@code false}
     *     if no peer addresses are known or no proxy could be created
     */
    private boolean getOtherJournalNodeProxies() {
        List<InetSocketAddress> peerAddrs = this.getOtherJournalNodeAddrs();
        boolean noPeersKnown = peerAddrs == null || peerAddrs.isEmpty();
        if (noPeersKnown) {
            LOG.warn("Other JournalNode addresses not available. Journal Syncing cannot be done");
            return false;
        }

        for (InetSocketAddress peer : peerAddrs) {
            try {
                JournalNodeProxy proxy = new JournalNodeProxy(peer);
                this.otherJNProxies.add(proxy);
            } catch (IOException ioe) {
                LOG.warn("Could not add proxy for Journal at addresss " + peer, (Throwable) ioe);
            }
        }

        int proxyCount = this.otherJNProxies.size();
        if (proxyCount == 0) {
            LOG.error("Cannot sync as there is no other JN available for sync.");
            return false;
        }
        this.numOtherJNs = proxyCount;
        return true;
    }

    /**
     * Creates and starts the daemon thread that performs the periodic sync.
     *
     * <p>The thread first waits, sleeping {@code journalSyncInterval}
     * between checks, until the local journal is formatted. It then makes
     * sure the edits sync directory exists and loops while
     * {@code shouldSync} is set: one sync round, then one sleep. An
     * interrupt at any point re-asserts the interrupt flag and ends the
     * thread.
     */
    private void startSyncJournalsDaemon() {
        this.syncJournalDaemon = new Daemon(() -> {
            // Phase 1: nothing can be synced until the local journal is formatted.
            while (!this.journal.isFormatted()) {
                try {
                    Thread.sleep(this.journalSyncInterval);
                } catch (InterruptedException e) {
                    LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // Phase 2: the download directory must exist before any sync round.
            if (!this.createEditsSyncDir()) {
                // SLF4J substitutes "{}" placeholders; a printf-style "%s" would be printed literally.
                LOG.error("Failed to create directory for downloading log segments: {}. Stopping Journal Node Sync.",
                        this.journal.getStorage().getEditsSyncDir());
                return;
            }
            // Phase 3: sync rounds until stopSync() clears the flag or the thread is interrupted.
            while (this.shouldSync) {
                try {
                    if (!this.journal.isFormatted()) {
                        LOG.warn("Journal cannot sync. Not formatted.");
                    } else {
                        this.syncJournals();
                    }
                } catch (Throwable t) {
                    if (!this.shouldSync) {
                        // A failure during shutdown is expected; leave quietly.
                        if (t instanceof InterruptedException) {
                            LOG.info("Stopping JournalNode Sync.");
                            Thread.currentThread().interrupt();
                            return;
                        }
                        LOG.warn("JournalNodeSyncer received an exception while shutting down.", t);
                        break;
                    }
                    if (t instanceof InterruptedException) {
                        LOG.warn("JournalNodeSyncer interrupted", t);
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Any other failure is logged and the loop carries on with the next round.
                    LOG.error("JournalNodeSyncer daemon received Runtime exception. ", t);
                }
                try {
                    Thread.sleep(this.journalSyncInterval);
                } catch (InterruptedException e) {
                    if (!this.shouldSync) {
                        LOG.info("Stopping JournalNode Sync.");
                    } else {
                        LOG.warn("JournalNodeSyncer interrupted", e);
                    }
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        this.syncJournalDaemon.start();
    }

    /**
     * Runs one sync round against the peer whose turn it is, then advances
     * the turn to the next peer, wrapping around after the last one.
     */
    private void syncJournals() {
        final int current = this.journalNodeIndexForSync;
        this.syncWithJournalAtIndex(current);
        int next = current + 1;
        this.journalNodeIndexForSync = next % this.numOtherJNs;
    }

    /**
     * Syncs the local journal with the peer at the given position in
     * {@code otherJNProxies}: fetches the local and the remote manifest of
     * finalized edit logs (from txid 0, in-progress segments excluded) and
     * hands both to {@link #getMissingLogSegments}.
     *
     * <p>A failure to obtain either manifest is logged and ends this
     * attempt without throwing.
     *
     * @param index position of the peer in {@code otherJNProxies}
     */
    private void syncWithJournalAtIndex(int index) {
        // Resolve the peer once from the parameter so that every message and call
        // below refers to the same JournalNode.
        JournalNodeProxy remoteJN = this.otherJNProxies.get(index);
        InetSocketAddress localAddr = this.jn.getBoundIpcAddress();
        LOG.info("Syncing Journal " + localAddr.getAddress() + ":" + localAddr.getPort()
                + " with " + remoteJN + ", journal id: " + this.jid);

        QJournalProtocolPB jnProxy = remoteJN.jnProxy;
        if (jnProxy == null) {
            LOG.error("JournalNode Proxy not found.");
            return;
        }

        List<RemoteEditLog> thisJournalEditLogs;
        try {
            thisJournalEditLogs = this.journal.getEditLogManifest(0L, false).getLogs();
        } catch (IOException e) {
            LOG.error("Exception in getting local edit log manifest", e);
            return;
        }

        QJournalProtocolProtos.GetEditLogManifestResponseProto editLogManifest;
        try {
            QJournalProtocolProtos.GetEditLogManifestRequestProto request =
                    QJournalProtocolProtos.GetEditLogManifestRequestProto.newBuilder()
                            .setJid(this.jidProto)
                            .setSinceTxId(0L)
                            .setInProgressOk(false)
                            .build();
            editLogManifest = jnProxy.getEditLogManifest(null, request);
        } catch (ServiceException e) {
            LOG.error("Could not sync with Journal at " + remoteJN, e);
            return;
        }

        this.getMissingLogSegments(thisJournalEditLogs, editLogManifest, remoteJN);
    }

    /**
     * Resolves the addresses of the other JournalNodes from the
     * {@code dfs.namenode.shared.edits.dir} setting, excluding this node's
     * own bound IPC address.
     *
     * @return the peer addresses, or {@code null} if the setting is missing
     *     or empty, is not a valid URI, or the addresses cannot be resolved
     */
    private List<InetSocketAddress> getOtherJournalNodeAddrs() {
        String uriStr = this.conf.get("dfs.namenode.shared.edits.dir");
        if (uriStr == null || uriStr.isEmpty()) {
            LOG.warn("Could not construct Shared Edits Uri");
            return null;
        }
        try {
            URI uri = new URI(uriStr);
            return Util.getLoggerAddresses(uri, Sets.newHashSet(this.jn.getBoundIpcAddress()));
        } catch (URISyntaxException e) {
            // Keep the cause so the offending part of the URI shows up in the log.
            LOG.error("The conf property dfs.namenode.shared.edits.dir not set properly.", e);
        } catch (IOException e) {
            LOG.error("Could not parse JournalNode addresses: " + uriStr, e);
        }
        return null;
    }

    /**
     * Wraps a journal identifier string in its protobuf message form.
     *
     * @param journalId the journal identifier
     * @return a {@code JournalIdProto} carrying {@code journalId}
     */
    private QJournalProtocolProtos.JournalIdProto convertJournalId(String journalId) {
        QJournalProtocolProtos.JournalIdProto.Builder idBuilder =
                QJournalProtocolProtos.JournalIdProto.newBuilder();
        idBuilder.setIdentifier(journalId);
        return idBuilder.build();
    }

    /**
     * Downloads every edit log segment that the remote JournalNode lists
     * but the local journal does not. The first segment that fails to
     * download aborts the rest of this sync attempt.
     *
     * @param thisJournalEditLogs finalized edit logs of the local journal
     * @param response manifest response from the remote JournalNode; also
     *     supplies the remote HTTP port
     * @param remoteJNproxy the remote JournalNode; its HTTP base URL is
     *     built on first use and cached on the proxy
     */
    private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs, QJournalProtocolProtos.GetEditLogManifestResponseProto response, JournalNodeProxy remoteJNproxy) {
        List<RemoteEditLog> remoteLogs = PBHelper.convert(response.getManifest()).getLogs();
        if (remoteLogs == null || remoteLogs.isEmpty()) {
            LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs");
            return;
        }

        List<RemoteEditLog> toFetch = this.getMissingLogList(thisJournalEditLogs, remoteLogs);
        if (toFetch.isEmpty()) {
            return;
        }

        NamespaceInfo nsInfo = this.jnStorage.getNamespaceInfo();
        for (RemoteEditLog segment : toFetch) {
            URL segmentUrl = null;
            boolean downloaded = false;
            try {
                // Build the remote HTTP base URL lazily and keep it on the proxy.
                if (remoteJNproxy.httpServerUrl == null) {
                    String remoteHost = remoteJNproxy.jnAddr.getHostName();
                    remoteJNproxy.httpServerUrl = this.getHttpServerURI("http", remoteHost, response.getHttpPort());
                }
                String servletPath = GetJournalEditServlet.buildPath(this.jid, segment.getStartTxId(), nsInfo, false);
                segmentUrl = new URL(remoteJNproxy.httpServerUrl, servletPath);
                downloaded = this.downloadMissingLogSegment(segmentUrl, segment);
            } catch (MalformedURLException e) {
                LOG.error("MalformedURL when download missing log segment", (Throwable) e);
            } catch (Exception e) {
                LOG.error("Exception in downloading missing log segment from url " + segmentUrl, (Throwable) e);
            }
            if (!downloaded) {
                LOG.error("Aborting current sync attempt.");
                break;
            }
        }
    }

    /**
     * Computes which remote edit logs are absent locally by walking both
     * lists in step and comparing start transaction ids.
     *
     * <p>The walk only ever moves forward in each list, so it presumably
     * relies on both lists being ordered by ascending start txid - TODO
     * confirm against the manifest contract.
     *
     * @param thisJournalEditLogs edit logs of the local journal
     * @param otherJournalEditLogs edit logs of the remote journal
     * @return the remote logs whose start txid has no local match; when the
     *     local list is empty this is {@code otherJournalEditLogs} itself
     */
    private List<RemoteEditLog> getMissingLogList(List<RemoteEditLog> thisJournalEditLogs, List<RemoteEditLog> otherJournalEditLogs) {
        if (thisJournalEditLogs.isEmpty()) {
            return otherJournalEditLogs;
        }
        List<RemoteEditLog> missingEditLogs = new ArrayList<>();
        int localJnIndex = 0;
        int remoteJnIndex = 0;
        int localJnNumLogs = thisJournalEditLogs.size();
        int remoteJnNumLogs = otherJournalEditLogs.size();
        while (localJnIndex < localJnNumLogs && remoteJnIndex < remoteJnNumLogs) {
            long localJNstartTxId = thisJournalEditLogs.get(localJnIndex).getStartTxId();
            long remoteJNstartTxId = otherJournalEditLogs.get(remoteJnIndex).getStartTxId();
            if (localJNstartTxId == remoteJNstartTxId) {
                // Present on both sides.
                ++localJnIndex;
                ++remoteJnIndex;
            } else if (localJNstartTxId > remoteJNstartTxId) {
                // The remote segment starts before the current local one: missing locally.
                missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex));
                ++remoteJnIndex;
            } else {
                // Local-only segment; nothing to fetch for it.
                ++localJnIndex;
            }
        }
        // Remote segments left after the local list is exhausted are all missing.
        while (remoteJnIndex < remoteJnNumLogs) {
            missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex));
            ++remoteJnIndex;
        }
        return missingEditLogs;
    }

    /**
     * Builds the base URL of a JournalNode HTTP server, with an empty file
     * part.
     *
     * @param scheme URL scheme, e.g. {@code "http"}
     * @param hostname host of the HTTP server
     * @param port port of the HTTP server
     * @return the base URL
     * @throws MalformedURLException if the parts do not form a valid URL
     */
    private URL getHttpServerURI(String scheme, String hostname, int port) throws MalformedURLException {
        final String noFilePart = "";
        URL baseUrl = new URL(scheme, hostname, port, noFilePart);
        return baseUrl;
    }

    /**
     * Downloads one finalized edit log segment from a remote JournalNode
     * into a temporary file and then moves it into the local journal.
     *
     * <p>If the finalized file already exists locally and is readable, the
     * download is skipped. On a failed download or a failed move the
     * temporary file is deleted.
     *
     * @param url URL of the segment on the remote JournalNode
     * @param log the remote segment to fetch
     * @return {@code true} if the segment is available locally afterwards,
     *     {@code false} if the download or the move failed
     * @throws IOException declared by the signature; the visible download
     *     failure is caught here, so this presumably comes from
     *     {@code moveTmpSegmentToCurrent} - TODO confirm
     */
    private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws IOException {
        LOG.info("Downloading missing Edit Log from " + url + " to " + this.jnStorage.getRoot());
        assert (log.getStartTxId() > 0L && log.getEndTxId() > 0L) : "bad log: " + log;

        File finalEditsFile = this.jnStorage.getFinalizedEditsFile(log.getStartTxId(), log.getEndTxId());
        if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) {
            LOG.info("Skipping download of remote edit log " + log + " since it's already stored locally at " + finalEditsFile);
            return true;
        }

        File tmpEditsFile = this.jnStorage.getTemporaryEditsFile(log.getStartTxId(), log.getEndTxId());
        try {
            Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), this.jnStorage, false, this.logSegmentTransferTimeout, this.throttler);
        } catch (IOException e) {
            // Log the cause as well, otherwise the reason for the failed transfer is lost.
            LOG.error("Download of Edit Log file for Syncing failed. Deleting temp file: " + tmpEditsFile, e);
            this.deleteTmpEditsFile(tmpEditsFile);
            return false;
        }
        LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " + tmpEditsFile.length() + " bytes.");

        boolean moveSuccess = this.journal.moveTmpSegmentToCurrent(tmpEditsFile, finalEditsFile, log.getEndTxId());
        if (!moveSuccess) {
            LOG.debug("Move to current directory unsuccessful. Deleting temporary file: " + tmpEditsFile);
            this.deleteTmpEditsFile(tmpEditsFile);
            return false;
        }
        return true;
    }

    /** Deletes a temporary edits file, logging a warning if the deletion fails. */
    private void deleteTmpEditsFile(File tmpEditsFile) {
        if (!tmpEditsFile.delete()) {
            LOG.warn("Deleting " + tmpEditsFile + " has failed");
        }
    }

    /**
     * Creates the throttler used for edit log downloads.
     *
     * @param conf configuration holding
     *     {@code dfs.edit.log.transfer.bandwidthPerSec}
     * @return a throttler for the configured bandwidth, or {@code null}
     *     when the configured value is not positive (the default is 0)
     */
    private static DataTransferThrottler getThrottler(Configuration conf) {
        long bandwidthPerSec = conf.getLong("dfs.edit.log.transfer.bandwidthPerSec", 0L);
        return bandwidthPerSec > 0L ? new DataTransferThrottler(bandwidthPerSec) : null;
    }

    /**
     * Handle on one remote JournalNode: its address, the RPC proxy used to
     * talk to it, and the base URL of its HTTP server once that has been
     * determined.
     */
    private class JournalNodeProxy {
        private final InetSocketAddress jnAddr;
        private final QJournalProtocolPB jnProxy;
        // Filled in by the enclosing class the first time a segment is downloaded from this node.
        private URL httpServerUrl;

        /**
         * Creates the RPC proxy for the JournalNode at the given address,
         * using the enclosing syncer's configuration.
         *
         * @param jnAddr address of the remote JournalNode
         * @throws IOException if the proxy cannot be created
         */
        JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
            this.jnAddr = jnAddr;
            long protocolVersion = RPC.getProtocolVersion(QJournalProtocolPB.class);
            Configuration rpcConf = JournalNodeSyncer.this.conf;
            this.jnProxy = (QJournalProtocolPB) RPC.getProxy(QJournalProtocolPB.class, protocolVersion, jnAddr, rpcConf);
        }

        /** Returns the string form of the remote address. */
        @Override
        public String toString() {
            return this.jnAddr.toString();
        }
    }
}

