/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.heartbeat;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieHeartbeatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generates and tracks heartbeats for in-flight instants.
 *
 * <p>A heartbeat is an empty file named after the instant time, created (with overwrite) inside
 * the table's heartbeat folder. {@link #start(String)} writes the first heartbeat synchronously
 * and then schedules a daemon {@link Timer} that rewrites the file every
 * {@code heartbeatIntervalInMs}. In-memory bookkeeping for each instant lives in
 * {@link Heartbeat} objects keyed by instant time.
 *
 * <p>The client itself is annotated {@link NotThreadSafe}; the only concurrency it accounts for
 * is the one it creates itself, i.e. the timer threads that run {@link HeartbeatTask}.
 *
 * <p>NOTE(review): the class implements {@link Serializable}, but {@code fs} is transient and
 * {@link Heartbeat} is not serializable, so a deserialized instance has no file system and an
 * instance with registered heartbeats cannot be serialized - confirm how callers rely on this.
 */
@NotThreadSafe
public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieHeartbeatClient.class);
    private final transient FileSystem fs;
    private final String basePath;
    private final String heartbeatFolderPath;
    private final Long heartbeatIntervalInMs;
    private final Long maxAllowableHeartbeatIntervalInMs;
    private final Map<String, Heartbeat> instantToHeartbeatMap;

    /**
     * Creates a heartbeat client for the table rooted at {@code basePath}.
     *
     * @param fs                          file system used to create heartbeat files
     * @param basePath                    base path of the table
     * @param heartbeatIntervalInMs       period between two heartbeats; must be at least 1000 ms
     * @param numTolerableHeartbeatMisses number of intervals that may elapse before a heartbeat
     *                                    is considered expired
     */
    public HoodieHeartbeatClient(FileSystem fs, String basePath, Long heartbeatIntervalInMs, Integer numTolerableHeartbeatMisses) {
        ValidationUtils.checkArgument(heartbeatIntervalInMs >= 1000L, "Cannot set heartbeat lower than 1 second");
        this.fs = fs;
        this.basePath = basePath;
        this.heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
        this.heartbeatIntervalInMs = heartbeatIntervalInMs;
        this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * (long) numTolerableHeartbeatMisses.intValue();
        this.instantToHeartbeatMap = new ConcurrentHashMap<String, Heartbeat>();
    }

    /**
     * Starts heartbeating for {@code instantTime}: writes one heartbeat immediately and then
     * schedules periodic updates. Calling it again for an instant that is already heartbeating
     * is a no-op.
     *
     * <p>If the first heartbeat cannot be written or the timer cannot be scheduled, the
     * registration is rolled back before the exception is rethrown, so a later call can retry.
     *
     * @param instantTime instant to heartbeat for
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument},
     *                                  when the instant's heartbeat was already stopped
     * @throws HoodieHeartbeatException if the first heartbeat file cannot be created
     */
    public void start(String instantTime) {
        LOG.info("Received request to start heartbeat for instant time {}", instantTime);
        Heartbeat existing = this.instantToHeartbeatMap.get(instantTime);
        ValidationUtils.checkArgument(existing == null || !existing.isHeartbeatStopped(), "Cannot restart a stopped heartbeat for " + instantTime);
        if (existing != null && existing.isHeartbeatStarted()) {
            // Already heartbeating for this instant.
            return;
        }
        Heartbeat newHeartbeat = new Heartbeat();
        newHeartbeat.setHeartbeatStarted(true);
        this.instantToHeartbeatMap.put(instantTime, newHeartbeat);
        try {
            this.updateHeartbeat(instantTime);
            long intervalMs = this.heartbeatIntervalInMs;
            newHeartbeat.getTimer().scheduleAtFixedRate(new HeartbeatTask(instantTime), intervalMs, intervalMs);
        } catch (RuntimeException e) {
            // Without this rollback the entry would stay flagged as "started" although nothing
            // is heartbeating, and every subsequent start() for this instant would silently
            // do nothing.
            this.instantToHeartbeatMap.remove(instantTime, newHeartbeat);
            newHeartbeat.cancelTimer();
            throw e;
        }
    }

    /**
     * Stops heartbeating for {@code instantTime} and removes its heartbeat file. Does nothing
     * if no active heartbeat is registered for the instant. The stopped entry stays in the map,
     * which is what makes {@link #start(String)} reject a restart.
     *
     * @param instantTime instant whose heartbeat should be stopped
     * @throws HoodieException declared by the original signature; presumably raised by
     *                         {@code HeartbeatUtils.deleteHeartbeatFile} - verify against it
     */
    public void stop(String instantTime) throws HoodieException {
        Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
        if (this.isHeartbeatStarted(heartbeat)) {
            this.stopHeartbeatTimer(heartbeat);
            HeartbeatUtils.deleteHeartbeatFile(this.fs, this.basePath, instantTime);
            LOG.info("Deleted heartbeat file for instant {}", instantTime);
        }
    }

    /**
     * Cancels the timers of all active heartbeats. Heartbeat files are left in place.
     */
    public void stopHeartbeatTimers() throws HoodieException {
        this.instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatTimer);
    }

    /** Returns true when the heartbeat exists, was started and has not been stopped. */
    private boolean isHeartbeatStarted(Heartbeat heartbeat) {
        return heartbeat != null && heartbeat.isHeartbeatStarted() && !heartbeat.isHeartbeatStopped();
    }

    /** Cancels the heartbeat's timer and flags the heartbeat as stopped. */
    private void stopHeartbeatTimer(Heartbeat heartbeat) {
        LOG.info("Stopping heartbeat for instant {}", heartbeat.getInstantTime());
        heartbeat.cancelTimer();
        heartbeat.setHeartbeatStopped(true);
        LOG.info("Stopped heartbeat for instant {}", heartbeat.getInstantTime());
    }

    /**
     * Checks whether a heartbeat file exists for the given instant.
     *
     * @param fs          file system to query
     * @param basePath    base path of the table
     * @param instantTime instant to look up
     * @return whether {@code <heartbeat folder>/<instantTime>} exists
     * @throws IOException if the existence check fails
     */
    public static Boolean heartbeatExists(FileSystem fs, String basePath, String instantTime) throws IOException {
        Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + "/" + instantTime);
        return fs.exists(heartbeatFilePath);
    }

    /**
     * Tells whether the last heartbeat of {@code instantTime} is older than the maximum
     * allowable interval (heartbeat interval times the tolerable number of misses).
     *
     * <p>The last heartbeat time is taken from the in-memory entry when this client tracks the
     * instant; otherwise it is obtained from {@code HoodieHeartbeatUtils.getLastHeartbeatTime}.
     *
     * @param instantTime instant to check
     * @return true if the heartbeat is expired
     * @throws IOException if the fallback lookup fails
     */
    public boolean isHeartbeatExpired(String instantTime) throws IOException {
        long currentTime = System.currentTimeMillis();
        Heartbeat lastHeartbeatForWriter = this.instantToHeartbeatMap.get(instantTime);
        if (lastHeartbeatForWriter == null) {
            LOG.info("Heartbeat not found in internal map, falling back to reading from DFS");
            long lastHeartbeatForWriterTime = HoodieHeartbeatUtils.getLastHeartbeatTime(this.fs, this.basePath, instantTime);
            // Throw-away holder used only for the comparison and the log line below; it never
            // requests a timer, so no timer thread is created for it.
            lastHeartbeatForWriter = new Heartbeat();
            lastHeartbeatForWriter.setLastHeartbeatTime(lastHeartbeatForWriterTime);
            lastHeartbeatForWriter.setInstantTime(instantTime);
        }
        if (currentTime - lastHeartbeatForWriter.getLastHeartbeatTime() > this.maxAllowableHeartbeatIntervalInMs) {
            LOG.warn("Heartbeat expired, currentTime = {}, last heartbeat = {}, heartbeat interval = {}", currentTime, lastHeartbeatForWriter, this.heartbeatIntervalInMs);
            return true;
        }
        return false;
    }

    /**
     * Writes one heartbeat for {@code instantTime}: (re)creates the heartbeat file and records
     * the new heartbeat time and count on the in-memory entry. Runs on the caller's thread for
     * the first heartbeat and on the heartbeat's timer thread afterwards.
     *
     * @throws HoodieHeartbeatException if the file cannot be created while the heartbeat is
     *                                  still active
     */
    private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
        try {
            Long newHeartbeatTime = System.currentTimeMillis();
            // Create-with-overwrite and close right away: the file carries no content.
            this.fs.create(new Path(this.heartbeatFolderPath + "/" + instantTime), true).close();
            Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
            if (heartbeat == null) {
                // close() (or a rolled-back start()) removed the entry while this update was in
                // flight; Timer.cancel() does not stop a task that is already running.
                LOG.warn("Heartbeat for instant {} is no longer tracked, skipping update", instantTime);
                return;
            }
            if (heartbeat.getLastHeartbeatTime() != null && this.isHeartbeatExpired(instantTime)) {
                LOG.error("Aborting, missed generating heartbeat within allowable interval {}", this.maxAllowableHeartbeatIntervalInMs);
                // Sets the interrupt flag of whichever thread is running this update.
                Thread.currentThread().interrupt();
            }
            heartbeat.setInstantTime(instantTime);
            heartbeat.setLastHeartbeatTime(newHeartbeatTime);
            heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
        } catch (IOException io) {
            Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
            // An entry that is gone is treated like a stopped one: the client no longer
            // heartbeats for this instant, so a failed write is not an error.
            boolean isHeartbeatStopped = heartbeat == null || heartbeat.isHeartbeatStopped();
            if (isHeartbeatStopped) {
                LOG.warn("update heart beat failed, because the instant time {} was stopped ? : {}", instantTime, isHeartbeatStopped);
                return;
            }
            throw new HoodieHeartbeatException("Unable to generate heartbeat for instant " + instantTime, io);
        }
    }

    /** Returns the folder in which heartbeat files are created. */
    public String getHeartbeatFolderPath() {
        return this.heartbeatFolderPath;
    }

    /** Returns the in-memory heartbeat for the instant, or {@code null} if none is tracked. */
    public Heartbeat getHeartbeat(String instantTime) {
        return this.instantToHeartbeatMap.get(instantTime);
    }

    /** Cancels all active heartbeat timers and forgets every tracked heartbeat. */
    @Override
    public void close() {
        this.stopHeartbeatTimers();
        this.instantToHeartbeatMap.clear();
    }

    /** Timer task that writes one heartbeat for a fixed instant on every tick. */
    class HeartbeatTask extends TimerTask {
        private final String instantTime;

        HeartbeatTask(String instantTime) {
            this.instantTime = instantTime;
        }

        @Override
        public void run() {
            HoodieHeartbeatClient.this.updateHeartbeat(this.instantTime);
        }
    }

    /**
     * In-memory state of one instant's heartbeat. The state fields are volatile because they
     * are written and read by both the thread using the client and the heartbeat's timer thread.
     */
    static class Heartbeat {
        private volatile String instantTime;
        private volatile Boolean isHeartbeatStarted = false;
        private volatile Boolean isHeartbeatStopped = false;
        private volatile Long lastHeartbeatTime;
        private volatile Integer numHeartbeats = 0;
        // Created on first use so that holders which never schedule anything do not start a
        // timer thread.
        private Timer timer;

        Heartbeat() {
        }

        public String getInstantTime() {
            return this.instantTime;
        }

        public void setInstantTime(String instantTime) {
            this.instantTime = instantTime;
        }

        public Boolean isHeartbeatStarted() {
            return this.isHeartbeatStarted;
        }

        public void setHeartbeatStarted(Boolean heartbeatStarted) {
            this.isHeartbeatStarted = heartbeatStarted;
        }

        public Boolean isHeartbeatStopped() {
            return this.isHeartbeatStopped;
        }

        public void setHeartbeatStopped(Boolean heartbeatStopped) {
            this.isHeartbeatStopped = heartbeatStopped;
        }

        public Long getLastHeartbeatTime() {
            return this.lastHeartbeatTime;
        }

        public void setLastHeartbeatTime(Long lastHeartbeatTime) {
            this.lastHeartbeatTime = lastHeartbeatTime;
        }

        public Integer getNumHeartbeats() {
            return this.numHeartbeats;
        }

        public void setNumHeartbeats(Integer numHeartbeats) {
            this.numHeartbeats = numHeartbeats;
        }

        /** Returns this heartbeat's timer, creating a daemon timer on first access. */
        public Timer getTimer() {
            if (this.timer == null) {
                this.timer = new Timer(true);
            }
            return this.timer;
        }

        public void setTimer(Timer timer) {
            this.timer = timer;
        }

        /** Cancels the timer if one has been created or set; otherwise does nothing. */
        void cancelTimer() {
            Timer current = this.timer;
            if (current != null) {
                current.cancel();
            }
        }

        @Override
        public String toString() {
            return "Heartbeat{instantTime='" + this.instantTime + '\'' + ", isHeartbeatStarted=" + this.isHeartbeatStarted + ", isHeartbeatStopped=" + this.isHeartbeatStopped + ", lastHeartbeatTime=" + this.lastHeartbeatTime + ", numHeartbeats=" + this.numHeartbeats + ", timer=" + this.timer + '}';
        }
    }
}

