/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.shaded.org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that executes WAL split tasks handed out through a
 * {@link SplitLogWorkerCoordination}.
 *
 * <p>The worker runs on its own thread, created by {@link #start()}. In {@link #run()} it
 * registers a listener with the coordination, polls {@code isReady()} until the coordination
 * is ready or stopped, and then enters the coordination's task loop. The actual work for a
 * single WAL file is done by the {@link TaskExecutor} passed to
 * {@code SplitLogWorkerCoordination.init(...)}; the convenience constructor wires that
 * executor to {@link #splitLog}.
 *
 * <p>The class is marked {@code @Deprecated}; the replacement is not visible from this file.
 */
@Deprecated
@InterfaceAudience.Private
public class SplitLogWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class);

    /** Thread running {@link #run()}; {@code null} until {@link #start()} has been called. */
    Thread worker;
    private final SplitLogWorkerCoordination coordination;
    private final RegionServerServices server;

    /**
     * Creates a worker whose tasks are executed by the supplied executor.
     *
     * @param hserver           server whose coordinated state manager supplies the
     *                          {@link SplitLogWorkerCoordination}
     * @param conf              configuration handed to the coordination
     * @param server            region server services; used for the server name and handed to
     *                          the coordination
     * @param splitTaskExecutor callback that performs the split of one WAL file
     */
    public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
            TaskExecutor splitTaskExecutor) {
        this.server = server;
        this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
        this.coordination.init(server, conf, splitTaskExecutor, this);
    }

    /**
     * Creates a worker whose tasks are executed by {@link #splitLog}.
     *
     * @param conf              configuration used to locate the WAL root directory
     * @param server            region server services (also used as the {@link Server})
     * @param sequenceIdChecker passed through to {@code WALSplitter.splitLogFile}
     * @param factory           passed through to {@code WALSplitter.splitLogFile}
     */
    public SplitLogWorker(Configuration conf, RegionServerServices server,
            LastSequenceId sequenceIdChecker, WALFactory factory) {
        this(server, conf, server,
            (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
    }

    /**
     * Performs the sync-replication specific handling of a WAL file before it is split.
     *
     * <ul>
     *   <li>If the file name carries no sync replication peer id, or the peer is unknown or is
     *       not a sync replication peer, nothing is done.</li>
     *   <li>If the peer state is {@code ACTIVE} with new state {@code NONE}, the WAL is copied
     *       to the peer's remote WAL directory.</li>
     *   <li>If the peer is {@code ACTIVE} transitioning to {@code STANDBY}, or already
     *       {@code STANDBY}, and a file with the same name exists in the peer's remote
     *       snapshot WAL directory, the split is skipped.</li>
     * </ul>
     *
     * @param name   WAL path relative to {@code walDir}
     * @param conf   configuration used to obtain the remote file system
     * @param server used to look up the replication peer
     * @param fs     file system holding the local WAL
     * @param walDir WAL root directory
     * @return {@code true} if the caller should go on and split the file, {@code false} if
     *         the file must not be split
     * @throws IOException if reading the WAL or writing/renaming the remote copy fails
     */
    private static boolean processSyncReplicationWAL(String name, Configuration conf,
            RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
        Path walFile = new Path(walDir, name);
        String filename = walFile.getName();
        Optional<String> optSyncPeerId =
            AbstractWALProvider.getSyncReplicationPeerIdFromWALName(filename);
        if (!optSyncPeerId.isPresent()) {
            return true;
        }
        String peerId = optSyncPeerId.get();
        ReplicationPeerImpl peer =
            server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
        if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
            return true;
        }
        Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
            peer.getSyncReplicationStateAndNewState();
        SyncReplicationState state = stateAndNewState.getFirst();
        SyncReplicationState newState = stateAndNewState.getSecond();
        if (state.equals(SyncReplicationState.ACTIVE)
                && newState.equals(SyncReplicationState.NONE)) {
            String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
            Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
            Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
            FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
            // Copy to a temporary name first; the second argument (true) overwrites a
            // temporary file left over from a previous attempt.
            try (FSDataInputStream in = fs.open(walFile);
                 FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
                     CommonFSUtils.getDefaultBufferSize(remoteFs),
                     remoteFs.getDefaultReplication(tmpRemoteWAL),
                     remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
                IOUtils.copy(in, out);
            }
            // The copy is published in two renames: .tmp -> .ren -> final name.
            // NOTE(review): presumably to cope with file systems whose rename is not
            // atomic - confirm against ReplicationUtils / the standby-side reader.
            Path toCommitRemoteWAL = new Path(remoteWALDirForPeer, filename + ".ren");
            FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
            FSUtils.renameFile(remoteFs, toCommitRemoteWAL,
                new Path(remoteWALDirForPeer, filename));
        } else if (state.equals(SyncReplicationState.ACTIVE)
                && newState.equals(SyncReplicationState.STANDBY)
                || state.equals(SyncReplicationState.STANDBY)) {
            String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
            Path remoteSnapshotDirForPeer =
                ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
            FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
            if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
                LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and the content will be replicated back", filename);
                return false;
            }
        }
        return true;
    }

    /**
     * Splits a single WAL file and maps the outcome to a {@link TaskExecutor.Status}.
     *
     * @param filename          WAL path relative to the WAL root directory
     * @param p                 progress/cancel callback passed to the splitter
     * @param conf              configuration used to resolve the WAL root directory
     * @param server            region server services
     * @param sequenceIdChecker passed through to {@code WALSplitter.splitLogFile}
     * @param factory           passed through to {@code WALSplitter.splitLogFile}
     * @return
     *   <ul>
     *     <li>{@code DONE} - the file was split, must not be split (see
     *         {@link #processSyncReplicationWAL}), or no longer exists;</li>
     *     <li>{@code PREEMPTED} - {@code WALSplitter.splitLogFile} returned {@code false};</li>
     *     <li>{@code RESIGNED} - the WAL root dir/file system could not be resolved, sync
     *         replication processing failed, the split was interrupted, or retries were
     *         exhausted reaching the target region server;</li>
     *     <li>{@code ERR} - any other {@link IOException}.</li>
     *   </ul>
     */
    static TaskExecutor.Status splitLog(String filename, CancelableProgressable p,
            Configuration conf, RegionServerServices server, LastSequenceId sequenceIdChecker,
            WALFactory factory) {
        Path walDir;
        FileSystem fs;
        try {
            walDir = CommonFSUtils.getWALRootDir(conf);
            fs = walDir.getFileSystem(conf);
        } catch (IOException e) {
            LOG.warn("Resigning, could not find root dir or fs", e);
            return TaskExecutor.Status.RESIGNED;
        }
        try {
            if (!processSyncReplicationWAL(filename, conf, server, fs, walDir)) {
                return TaskExecutor.Status.DONE;
            }
        } catch (IOException e) {
            LOG.warn("failed to process sync replication wal {}", filename, e);
            return TaskExecutor.Status.RESIGNED;
        }
        try {
            // The coordinated state manager may be absent; the splitter then gets null.
            SplitLogWorkerCoordination splitLogWorkerCoordination =
                server.getCoordinatedStateManager() == null
                    ? null
                    : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
            if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
                    fs, conf, p, sequenceIdChecker, splitLogWorkerCoordination, factory,
                    server)) {
                return TaskExecutor.Status.PREEMPTED;
            }
        } catch (InterruptedIOException iioe) {
            LOG.warn("Resigning, interrupted splitting WAL {}", filename, iioe);
            return TaskExecutor.Status.RESIGNED;
        } catch (FileNotFoundException fnfe) {
            // The file is gone, so there is nothing left to split.
            LOG.warn("Done, WAL {} does not exist anymore", filename, fnfe);
            return TaskExecutor.Status.DONE;
        } catch (IOException e) {
            Throwable cause = e.getCause();
            if (e instanceof RetriesExhaustedException
                    && (cause instanceof NotServingRegionException
                        || cause instanceof ConnectException
                        || cause instanceof SocketTimeoutException)) {
                LOG.warn("Resigning, can't connect to target regionserver splitting WAL {}", filename, e);
                return TaskExecutor.Status.RESIGNED;
            }
            if (cause instanceof InterruptedException) {
                LOG.warn("Resigning, interrupted splitting WAL {}", filename, e);
                return TaskExecutor.Status.RESIGNED;
            }
            LOG.warn("Error splitting WAL {}", filename, e);
            return TaskExecutor.Status.ERR;
        }
        LOG.debug("Done splitting WAL {}", filename);
        return TaskExecutor.Status.DONE;
    }

    /**
     * Worker thread body: registers the coordination listener, polls {@code isReady()} until
     * the coordination is ready or stopped, then runs the task loop. Any {@link Throwable} is
     * logged rather than propagated, and the listener is always removed on exit.
     */
    @Override
    public void run() {
        try {
            LOG.info("SplitLogWorker " + this.server.getServerName() + " starting");
            this.coordination.registerListener();
            // Keep asking until the coordination is ready, unless we are told to stop.
            boolean ready = false;
            while (!ready && !this.coordination.isStop()) {
                ready = this.coordination.isReady();
            }
            if (!this.coordination.isStop()) {
                this.coordination.taskLoop();
            }
        } catch (Throwable t) {
            if (ExceptionUtil.isInterrupt(t)) {
                LOG.info("SplitLogWorker interrupted. Exiting. " + (this.coordination.isStop() ? "" : " (ERROR: exitWorker is not set, exiting anyway)"));
            } else {
                LOG.error("unexpected error ", t);
            }
        } finally {
            this.coordination.removeListener();
            LOG.info("SplitLogWorker " + this.server.getServerName() + " exiting");
        }
    }

    /**
     * Interrupts the worker thread. Does nothing (apart from logging) if {@link #start()}
     * has not been called yet, instead of failing with a {@link NullPointerException}.
     */
    public void stopTask() {
        Thread workerThread = this.worker;
        if (workerThread == null) {
            LOG.info("Worker thread has not been started, nothing to interrupt");
            return;
        }
        LOG.info("Sending interrupt to stop the worker thread");
        workerThread.interrupt();
    }

    /** Creates the worker thread, named after the server's short name, and starts it. */
    public void start() {
        this.worker = new Thread(null, this,
            "SplitLogWorker-" + this.server.getServerName().toShortString());
        this.worker.start();
    }

    /** Tells the coordination to stop processing tasks, then interrupts the worker thread. */
    public void stop() {
        this.coordination.stopProcessingTasks();
        this.stopTask();
    }

    /**
     * Returns the value of {@code SplitLogWorkerCoordination.getTaskReadySeq()}.
     *
     * @return the coordination's task-ready sequence value
     */
    public int getTaskReadySeq() {
        return this.coordination.getTaskReadySeq();
    }

    /** Callback that performs the split work for one WAL file. */
    @FunctionalInterface
    public interface TaskExecutor {
        /**
         * Executes the split task for the given WAL.
         *
         * @param name WAL path as handed out by the coordination
         * @param p    progress/cancel callback
         * @return the outcome of the task
         */
        Status exec(String name, CancelableProgressable p);

        /** Outcome of a split task; see {@link SplitLogWorker#splitLog} for how each is produced. */
        enum Status {
            /** The task is finished (file split, skipped, or no longer present). */
            DONE,
            /** The task failed with an unexpected I/O error. */
            ERR,
            /** The worker gave the task up (interrupted, setup failure, target unreachable). */
            RESIGNED,
            /** The splitter reported that the split did not complete. */
            PREEMPTED;
        }
    }
}

