/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.recon.spi.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.server.http.HttpConfig;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class OzoneManagerServiceProviderImpl
implements OzoneManagerServiceProvider {
    // Class-wide SLF4J logger; also handed out by getLogger().
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
    // Connection factory passed to ReconUtils.makeHttpCall for the checkpoint download; destroyed in stop().
    private URLConnectionFactory connectionFactory;
    // Directory into which the OM checkpoint tarball is downloaded and untarred.
    private File omSnapshotDBParentDir = null;
    // Checkpoint endpoint URL assembled in the constructor from the configured OM HTTP(S) address.
    private String omDBSnapshotUrl;
    // OM client used for getServiceList() and getDBUpdates().
    private OzoneManagerProtocol ozoneManagerClient;
    // Configuration captured at construction time; read again by start() and the sync scheduling code.
    private final OzoneConfiguration configuration;
    // Single-threaded scheduler that runs syncDataFromOM(); created in start() and re-created on an immediate trigger.
    private ScheduledExecutorService scheduler;
    // Recon's local copy of the OM metadata, updated by delta batches or replaced by a full snapshot.
    private ReconOMMetadataManager omMetadataManager;
    // Receives OM update event batches and re-initialisation requests after a sync.
    private ReconTaskController reconTaskController;
    // DAO (obtained from the task controller) where the delta/snapshot task status rows are stored.
    private ReconTaskStatusDao reconTaskStatusDao;
    // Helper used for DB directory lookup, HTTP calls and untarring checkpoints.
    private ReconUtils reconUtils;
    // Sync metrics; created in the constructor and unregistered in stop().
    private OzoneManagerSyncMetrics metrics;
    // Limit count sent with each DBUpdatesRequest; also the threshold that keeps the delta loop going.
    private long deltaUpdateLimit;
    // Upper bound on the number of delta requests issued by one getAndApplyDeltaUpdatesFromOM call.
    private int deltaUpdateLoopLimit;
    // Flag set while syncDataFromOM() is executing so that overlapping runs are rejected.
    private AtomicBoolean isSyncDataFromOMRunning;
    // Prefix applied to the names of the sync threads.
    private final String threadNamePrefix;
    // Factory producing threads named "<prefix>SyncOM-%d" for the scheduler.
    private ThreadFactory threadFactory;

    /**
     * Wires the provider to its collaborators and derives all settings that
     * are read from the configuration: HTTP timeouts, the OM checkpoint URL,
     * the snapshot directory and the delta-update limits.
     *
     * @param configuration       source of all settings read here and later
     * @param omMetadataManager   Recon's local OM metadata manager
     * @param reconTaskController task controller; also supplies the task status DAO
     * @param reconUtils          helper for directory lookup, HTTP calls and node details
     * @param ozoneManagerClient  client used to talk to the Ozone Manager
     */
    @Inject
    public OzoneManagerServiceProviderImpl(OzoneConfiguration configuration, ReconOMMetadataManager omMetadataManager, ReconTaskController reconTaskController, ReconUtils reconUtils, OzoneManagerProtocol ozoneManagerClient) {
        // Each "ozone.recon.*" key falls back to the value of its legacy "recon.*" key.
        String legacyConnectTimeout = configuration.get("recon.om.connection.timeout", "5s");
        int connectTimeoutMs = (int)configuration.getTimeDuration("ozone.recon.om.connection.timeout", legacyConnectTimeout, TimeUnit.MILLISECONDS);
        String legacyRequestTimeout = configuration.get("recon.om.connection.request.timeout", "5s");
        int requestTimeoutMs = (int)configuration.getTimeDuration("ozone.recon.om.connection.request.timeout", legacyRequestTimeout, TimeUnit.MILLISECONDS);
        this.connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(connectTimeoutMs, requestTimeoutMs, (Configuration)configuration);

        String httpAddress = configuration.get("ozone.om.http-address");
        String httpsAddress = configuration.get("ozone.om.https-address");
        this.deltaUpdateLimit = configuration.getLong("recon.om.delta.update.limit", 2000L);
        this.deltaUpdateLoopLimit = configuration.getInt("recon.om.delta.update.loop.limit", 10);
        this.omSnapshotDBParentDir = reconUtils.getReconDbDir((ConfigurationSource)configuration, "ozone.recon.om.db.dir");

        // Pick the scheme and address according to the HTTP policy, then
        // optionally ask OM to flush before it takes the checkpoint.
        HttpConfig.Policy httpPolicy = HttpConfig.getHttpPolicy((MutableConfigurationSource)configuration);
        String snapshotUrl = httpPolicy.isHttpsEnabled()
                ? "https://" + httpsAddress + "/dbCheckpoint"
                : "http://" + httpAddress + "/dbCheckpoint";
        boolean legacyFlush = configuration.getBoolean("recon.om.snapshot.task.flush.param", false);
        boolean flushBeforeCheckpoint = configuration.getBoolean("ozone.recon.om.snapshot.task.flush.param", legacyFlush);
        if (flushBeforeCheckpoint) {
            snapshotUrl = snapshotUrl + "?flushBeforeCheckpoint=true";
        }
        this.omDBSnapshotUrl = snapshotUrl;

        this.reconUtils = reconUtils;
        this.omMetadataManager = omMetadataManager;
        this.reconTaskController = reconTaskController;
        this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao();
        this.ozoneManagerClient = ozoneManagerClient;
        this.configuration = configuration;
        this.metrics = OzoneManagerSyncMetrics.create();
        this.isSyncDataFromOMRunning = new AtomicBoolean();
        this.threadNamePrefix = reconUtils.getReconNodeDetails(configuration).threadNamePrefix();
        this.threadFactory = new ThreadFactoryBuilder().setNameFormat(this.threadNamePrefix + "SyncOM-%d").build();
    }

    /**
     * Ensures a task status row exists for the delta-request task and for the
     * snapshot-request task, in that order. A row is inserted only when none
     * with that task name exists yet.
     */
    public void registerOMDBTasks() {
        OmSnapshotTaskName[] registrationOrder = {OmSnapshotTaskName.OmDeltaRequest, OmSnapshotTaskName.OmSnapshotRequest};
        for (OmSnapshotTaskName task : registrationOrder) {
            // The candidate row is built before the existence check, exactly
            // as before, so the sequence number is read once per task.
            ReconTaskStatus initialStatus = new ReconTaskStatus(task.name(), System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
            if (reconTaskStatusDao.existsById(task.name())) {
                continue;
            }
            reconTaskStatusDao.insert(initialStatus);
            LOG.info("Registered {} task ", (Object)task.name());
        }
    }

    /**
     * @return the Recon OM metadata manager this provider was constructed with
     */
    @Override
    public OMMetadataManager getOMMetadataManagerInstance() {
        return omMetadataManager;
    }

    /**
     * Starts the provider: creates the single-threaded sync scheduler,
     * registers the OM DB task rows, starts the metadata manager and the task
     * controller, and schedules the periodic sync with the configured initial
     * delay. A failure to start the metadata manager is logged and startup
     * continues.
     */
    @Override
    public void start() {
        LOG.info("Starting Ozone Manager Service Provider.");
        scheduler = Executors.newScheduledThreadPool(1, threadFactory);
        registerOMDBTasks();
        try {
            omMetadataManager.start(configuration);
        }
        catch (IOException startupFailure) {
            LOG.error("Error staring Recon OM Metadata Manager.", (Throwable)startupFailure);
        }
        reconTaskController.start();
        // The legacy key's value (default "1m") is the fallback for the new key.
        String legacyInitialDelay = configuration.get("recon.om.snapshot.task.initial.delay", "1m");
        long firstRunDelayMs = configuration.getTimeDuration("ozone.recon.om.snapshot.task.initial.delay", legacyInitialDelay, TimeUnit.MILLISECONDS);
        startSyncDataFromOM(firstRunDelayMs);
    }

    /**
     * Schedules the OM sync on the current scheduler with a fixed delay
     * between runs.
     *
     * @param initialDelay delay before the first run, in milliseconds
     */
    private void startSyncDataFromOM(long initialDelay) {
        String legacyInterval = configuration.get("recon.om.snapshot.task.interval.delay", "10m");
        long intervalMs = configuration.getTimeDuration("ozone.recon.om.snapshot.task.interval.delay", legacyInterval, TimeUnit.MILLISECONDS);
        // Everything is caught inside the task so that no throwable escapes
        // to the scheduler.
        Runnable syncTask = () -> {
            try {
                if (!syncDataFromOM()) {
                    LOG.debug("OM DB sync is already running.");
                }
            }
            catch (Throwable unexpected) {
                LOG.error("Unexpected exception while syncing data from OM.", unexpected);
            }
        };
        LOG.debug("Started the OM DB sync scheduler.");
        scheduler.scheduleWithFixedDelay(syncTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts the sync scheduler down immediately via {@code shutdownNow()}.
     */
    private void stopSyncDataFromOMThread() {
        scheduler.shutdownNow();
        LOG.debug("Shutdown the OM DB sync scheduler.");
    }

    /**
     * Requests an immediate sync by replacing the scheduler with a fresh one
     * whose first run has no initial delay.
     *
     * @return {@code true} if the sync was rescheduled, {@code false} if a
     *         sync was already in progress and nothing was changed
     */
    @Override
    public boolean triggerSyncDataFromOMImmediately() {
        if (isSyncDataFromOMRunning.get()) {
            LOG.debug("OM DB sync is already running.");
            return false;
        }
        stopSyncDataFromOMThread();
        scheduler = Executors.newScheduledThreadPool(1, threadFactory);
        startSyncDataFromOM(0L);
        return true;
    }

    /**
     * Stops the provider. The collaborators are shut down in a fixed order:
     * task controller, metadata manager, sync scheduler, metrics, and finally
     * the connection factory.
     *
     * @throws Exception if one of the shutdown calls fails; later steps are
     *                   then not executed
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stopping Ozone Manager Service Provider.");
        reconTaskController.stop();
        omMetadataManager.stop();
        scheduler.shutdownNow();
        metrics.unRegister();
        connectionFactory.destroy();
    }

    /**
     * Resolves the URL from which the OM DB checkpoint should be downloaded.
     *
     * <p>When {@code ozone.om.ratis.enable} is false, the URL computed in the
     * constructor is returned as is. Otherwise the OM service list is scanned
     * for an OM node whose server role is {@code LEADER}, and the checkpoint
     * URL is built from that node's HTTP or HTTPS address according to the
     * HTTP policy. If no leader is listed, the constructor URL is returned.
     *
     * @return the checkpoint URL to use
     * @throws IOException if the service list cannot be fetched from OM
     */
    @VisibleForTesting
    public String getOzoneManagerSnapshotUrl() throws IOException {
        if (!this.configuration.getBoolean("ozone.om.ratis.enable", false)) {
            return this.omDBSnapshotUrl;
        }
        String omLeaderUrl = this.omDBSnapshotUrl;
        // Typed list: iterating a raw List as ServiceInfo does not compile.
        List<ServiceInfo> serviceList = this.ozoneManagerClient.getServiceList();
        HttpConfig.Policy policy = HttpConfig.getHttpPolicy((MutableConfigurationSource)this.configuration);
        for (ServiceInfo info : serviceList) {
            boolean isLeaderOm = info.getNodeType().equals(HddsProtos.NodeType.OM)
                    && info.getOmRoleInfo().hasServerRole()
                    && info.getOmRoleInfo().getServerRole().equals(RaftProtos.RaftPeerRole.LEADER.name());
            if (!isLeaderOm) {
                continue;
            }
            // NOTE(review): the leader URL is rebuilt without the
            // "?flushBeforeCheckpoint=true" suffix the constructor may have
            // appended - confirm whether that is intended.
            // No break: if several entries claim LEADER, the last one wins.
            omLeaderUrl = (policy.isHttpsEnabled()
                    ? "https://" + info.getServiceAddress(OzoneManagerProtocolProtos.ServicePort.Type.HTTPS)
                    : "http://" + info.getServiceAddress(OzoneManagerProtocolProtos.ServicePort.Type.HTTP)) + "/dbCheckpoint";
        }
        return omLeaderUrl;
    }

    /**
     * @return {@code true} when {@code ozone.om.http.auth.type} (default
     *         {@code "simple"}) is set to {@code "kerberos"}
     */
    private boolean isOmSpnegoEnabled() {
        String omHttpAuthType = configuration.get("ozone.om.http.auth.type", "simple");
        return omHttpAuthType.equals("kerberos");
    }

    /**
     * Downloads a checkpoint tarball of the OM DB and unpacks it under the
     * snapshot parent directory.
     *
     * <p>The tarball is written to {@code om.snapshot.db_<millis>.tar} and
     * untarred into a sibling directory {@code om.snapshot.db_<millis>}. The
     * tarball is always removed afterwards, including when the download or
     * the untar step fails, so partial downloads do not accumulate on disk.
     *
     * @return a checkpoint pointing at the untarred directory, or
     *         {@code null} if an {@link IOException} occurred (it is logged)
     */
    @VisibleForTesting
    DBCheckpoint getOzoneManagerDBSnapshot() {
        String snapshotFileName = "om.snapshot.db_" + System.currentTimeMillis();
        File targetFile = new File(this.omSnapshotDBParentDir, snapshotFileName + ".tar");
        try {
            // The HTTP call is made as the login user.
            SecurityUtil.doAsLoginUser(() -> {
                try (InputStream inputStream = this.reconUtils.makeHttpCall(this.connectionFactory, this.getOzoneManagerSnapshotUrl(), this.isOmSpnegoEnabled()).getInputStream()) {
                    FileUtils.copyInputStreamToFile(inputStream, targetFile);
                }
                return null;
            });
            Path untarredDbDir = Paths.get(this.omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName);
            this.reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
            return new RocksDBCheckpoint(untarredDbDir);
        }
        catch (IOException e) {
            LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", (Throwable)e);
            return null;
        }
        finally {
            // deleteQuietly never throws and is a no-op for a missing file,
            // so it is safe on both the success and the failure path.
            FileUtils.deleteQuietly(targetFile);
        }
    }

    /**
     * Fetches a full OM DB snapshot and swaps Recon's OM DB over to it. The
     * time taken by the snapshot request is recorded in the metrics whether
     * or not a snapshot was obtained.
     *
     * @return {@code true} if the local OM DB was updated; {@code false} if
     *         no snapshot location was available or the update failed with
     *         an {@link IOException} (both cases are logged)
     * @throws IOException declared by the signature
     */
    @VisibleForTesting
    boolean updateReconOmDBWithNewSnapshot() throws IOException {
        long requestStartedAt = Time.monotonicNow();
        DBCheckpoint checkpoint = getOzoneManagerDBSnapshot();
        metrics.updateSnapshotRequestLatency(Time.monotonicNow() - requestStartedAt);

        if (checkpoint == null || checkpoint.getCheckpointLocation() == null) {
            LOG.error("Null snapshot location got from OM.");
            return false;
        }

        LOG.info("Got new checkpoint from OM : " + checkpoint.getCheckpointLocation());
        try {
            omMetadataManager.updateOmDB(checkpoint.getCheckpointLocation().toFile());
        }
        catch (IOException refreshFailure) {
            LOG.error("Unable to refresh Recon OM DB Snapshot. ", (Throwable)refreshFailure);
            return false;
        }
        return true;
    }

    /**
     * Repeatedly pulls delta updates from OM and applies them, starting at
     * the given sequence number. Another round is made only while the
     * previous round advanced the local sequence number by at least
     * {@code deltaUpdateLimit} and fewer than {@code deltaUpdateLoopLimit}
     * rounds have run.
     *
     * @param fromSequenceNumber sequence number to start fetching from
     * @param omdbUpdatesHandler handler that receives the fetched write batches
     * @throws IOException      if the request to OM fails
     * @throws RocksDBException if a round reports failure, or applying a
     *                          batch fails
     */
    @VisibleForTesting
    void getAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException {
        LOG.info("OriginalFromSequenceNumber : {} ", (Object)fromSequenceNumber);
        int completedRounds = 0;
        // Seeded with MAX_VALUE so the first round always runs when the
        // round limit allows it.
        long advancedInLastRound = Long.MAX_VALUE;
        long cursor = fromSequenceNumber;
        while (completedRounds < deltaUpdateLoopLimit && advancedInLastRound >= deltaUpdateLimit) {
            boolean roundSucceeded = innerGetAndApplyDeltaUpdatesFromOM(cursor, omdbUpdatesHandler);
            if (!roundSucceeded) {
                LOG.error("Retrieve OM DB delta update failed for sequence number : {}, so falling back to full snapshot.", (Object)cursor);
                throw new RocksDBException("Unable to get delta updates since sequenceNumber - " + cursor);
            }
            long latestLocalSequenceNumber = getCurrentOMDBSequenceNumber();
            advancedInLastRound = latestLocalSequenceNumber - cursor;
            cursor = latestLocalSequenceNumber;
            completedRounds++;
        }
        long totalAdvanced = getCurrentOMDBSequenceNumber() - fromSequenceNumber;
        LOG.info("Delta updates received from OM : {} loops, {} records", (Object)completedRounds, (Object)totalAdvanced);
    }

    /**
     * Issues a single delta-update request to OM and applies every returned
     * write batch to Recon's OM DB.
     *
     * <p>Each batch is first replayed through {@code omdbUpdatesHandler} and
     * then committed to the local RocksDB. The write batch, the batch
     * operation and the write options are closed via try-with-resources, so
     * an exception raised while applying a batch propagates to the caller
     * instead of being dropped. After the batches are applied, the sequence
     * number lag against OM is published to the metrics.
     *
     * @param fromSequenceNumber sequence number to request updates from
     * @param omdbUpdatesHandler handler each write batch is iterated through
     * @return {@code true} only if OM returned a response and flagged the DB
     *         update as successful
     * @throws IOException      if the request to OM or the commit fails
     * @throws RocksDBException if iterating a write batch fails
     */
    @VisibleForTesting
    boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException {
        OzoneManagerProtocolProtos.DBUpdatesRequest dbUpdatesRequest = OzoneManagerProtocolProtos.DBUpdatesRequest.newBuilder().setSequenceNumber(fromSequenceNumber).setLimitCount(this.deltaUpdateLimit).build();
        DBUpdates dbUpdates = this.ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
        int numUpdates = 0;
        long latestSequenceNumberOfOM = -1L;
        // A current sequence number of -1 means there is nothing to apply.
        if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1L) {
            latestSequenceNumberOfOM = dbUpdates.getLatestSequenceNumber();
            RDBStore rocksDBStore = (RDBStore)this.omMetadataManager.getStore();
            RocksDatabase rocksDB = rocksDBStore.getDb();
            numUpdates = dbUpdates.getData().size();
            if (numUpdates > 0) {
                this.metrics.incrNumUpdatesInDeltaTotal(numUpdates);
            }
            for (byte[] data : dbUpdates.getData()) {
                try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
                    // Replay through the handler before committing locally.
                    writeBatch.iterate((WriteBatch.Handler)omdbUpdatesHandler);
                    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(writeBatch);
                         ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
                        rdbBatchOperation.commit(rocksDB, wOpts);
                    }
                }
            }
        }
        long currentLocalSequenceNumber = this.getCurrentOMDBSequenceNumber();
        long lag = latestSequenceNumberOfOM == -1L ? 0L : latestSequenceNumberOfOM - currentLocalSequenceNumber;
        this.metrics.setSequenceNumberLag(lag);
        boolean dbUpdateSuccess = null != dbUpdates && dbUpdates.isDBUpdateSuccess();
        LOG.info("Number of updates received from OM : {}, SequenceNumber diff: {}, SequenceNumber Lag from OM {}, isDBUpdateSuccess: {}", new Object[]{numUpdates, currentLocalSequenceNumber - fromSequenceNumber, lag, dbUpdateSuccess});
        return dbUpdateSuccess;
    }

    /**
     * Runs one synchronisation pass against the Ozone Manager, unless a pass
     * is already in progress.
     *
     * <p>If Recon's OM DB has a positive sequence number, delta updates are
     * fetched and applied, the delta task status row is updated and the
     * collected events are handed to the task controller. If the local
     * sequence number is not positive, or the delta path fails with anything
     * other than an interrupt, a full snapshot is fetched instead; on success
     * the snapshot task status row is updated and all tasks are
     * re-initialised. Failures are logged and counted in the metrics rather
     * than thrown.
     *
     * @return {@code false} if another sync was already running and nothing
     *         was done; {@code true} otherwise, regardless of whether the
     *         pass itself succeeded
     */
    @VisibleForTesting
    public boolean syncDataFromOM() {
        if (!this.isSyncDataFromOMRunning.compareAndSet(false, true)) {
            LOG.debug("OM DB sync is already running.");
            return false;
        }
        try {
            LOG.info("Syncing data from Ozone Manager.");
            long currentSequenceNumber = this.getCurrentOMDBSequenceNumber();
            LOG.debug("Seq number of Recon's OM DB : {}", (Object)currentSequenceNumber);
            boolean fullSnapshot = false;
            if (currentSequenceNumber <= 0L) {
                fullSnapshot = true;
            } else {
                try (OMDBUpdatesHandler omdbUpdatesHandler = new OMDBUpdatesHandler(this.omMetadataManager)) {
                    LOG.info("Obtaining delta updates from Ozone Manager");
                    this.getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler);
                    ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(OmSnapshotTaskName.OmDeltaRequest.name(), System.currentTimeMillis(), this.getCurrentOMDBSequenceNumber());
                    this.reconTaskStatusDao.update(reconTaskStatusRecord);
                    this.reconTaskController.consumeOMEvents(new OMUpdateEventBatch(omdbUpdatesHandler.getEvents()), this.omMetadataManager);
                }
                catch (InterruptedException intEx) {
                    // Restore the interrupt flag; no snapshot fallback.
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    this.metrics.incrNumDeltaRequestsFailed();
                    // The "{}" placeholder is required, otherwise SLF4J
                    // drops the message argument.
                    LOG.warn("Unable to get and apply delta updates from OM: {}", (Object)e.getMessage());
                    fullSnapshot = true;
                }
            }
            if (fullSnapshot) {
                try {
                    this.metrics.incrNumSnapshotRequests();
                    LOG.info("Obtaining full snapshot from Ozone Manager");
                    if (this.updateReconOmDBWithNewSnapshot()) {
                        ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(OmSnapshotTaskName.OmSnapshotRequest.name(), System.currentTimeMillis(), this.getCurrentOMDBSequenceNumber());
                        this.reconTaskStatusDao.update(reconTaskStatusRecord);
                        LOG.info("Calling reprocess on Recon tasks.");
                        this.reconTaskController.reInitializeTasks(this.omMetadataManager);
                    }
                }
                catch (InterruptedException intEx) {
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    this.metrics.incrNumSnapshotRequestsFailed();
                    LOG.error("Unable to update Recon's metadata with new OM DB. ", (Throwable)e);
                }
            }
            return true;
        }
        finally {
            // Always clear the flag so that later syncs are not blocked.
            this.isSyncDataFromOMRunning.set(false);
        }
    }

    /**
     * @return the last sequence number reported by Recon's OM metadata manager
     */
    private long getCurrentOMDBSequenceNumber() {
        long lastSequenceNumber = omMetadataManager.getLastSequenceNumberFromDB();
        return lastSequenceNumber;
    }

    /**
     * @return the sync metrics instance created by the constructor
     */
    public OzoneManagerSyncMetrics getMetrics() {
        return metrics;
    }

    /**
     * @return the logger used by this class
     */
    public static Logger getLogger() {
        return OzoneManagerServiceProviderImpl.LOG;
    }

    /**
     * Names under which the OM sync task status rows are stored: one for
     * full snapshot requests and one for delta update requests.
     */
    public enum OmSnapshotTaskName {
        /** Status row name for the full snapshot request. */
        OmSnapshotRequest,
        /** Status row name for the delta update request. */
        OmDeltaRequest
    }
}

