/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.meta;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.meta.CkpMessage;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
import org.apache.hudi.util.HttpRequestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CkpMetadata} that talks to the timeline server over HTTP.
 *
 * <p>Every instant state change (start / commit / abort) is first applied through the
 * parent implementation and then followed by a refresh request to the server. Reads of
 * checkpoint messages are attempted through the server first and fall back to the
 * parent's implementation when the request fails or yields no usable response.
 */
public class TimelineBasedCkpMetadata extends CkpMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(TimelineBasedCkpMetadata.class);

    /** Name of the request parameter that carries the instant state directory path. */
    private static final String INSTANT_STATE_DIR_PATH_PARAM = "instantstatedirpath";

    private final HttpRequestClient httpRequestClient;

    /**
     * Creates the metadata handle and the HTTP client used for all server requests.
     *
     * @param fs          file system handle, passed through to the parent
     * @param basePath    base path, passed through to the parent
     * @param uniqueId    unique id, passed through to the parent
     * @param writeConfig write config the HTTP client is built from
     */
    public TimelineBasedCkpMetadata(FileSystem fs, String basePath, String uniqueId, HoodieWriteConfig writeConfig) {
        super(fs, basePath, uniqueId);
        this.httpRequestClient = new HttpRequestClient(writeConfig);
        LOG.info("Timeline server based CkpMetadata enabled");
    }

    /** Starts the instant through the parent, then asks the server to refresh. */
    @Override
    public void startInstant(String instant) {
        super.startInstant(instant);
        sendRefreshRequest();
    }

    /** Commits the instant through the parent, then asks the server to refresh. */
    @Override
    public void commitInstant(String instant) {
        super.commitInstant(instant);
        sendRefreshRequest();
    }

    /** Aborts the instant through the parent, then asks the server to refresh. */
    @Override
    public void abortInstant(String instant) {
        super.abortInstant(instant);
        sendRefreshRequest();
    }

    /**
     * Fetches the checkpoint messages for the given directory from the server, falling
     * back to the parent's implementation when the request fails or returns nothing.
     *
     * @param ckpMetaPath the checkpoint metadata directory to query
     * @return a stream of checkpoint messages built from the returned instant states,
     *         or whatever the parent implementation returns on fallback
     * @throws IOException if the fallback read performed by the parent fails
     */
    @Override
    protected Stream<CkpMessage> fetchCkpMessages(Path ckpMetaPath) throws IOException {
        List<InstantStateDTO> instantStates;
        try {
            instantStates = httpRequestClient.executeRequestWithRetry(
                InstantStateHandler.ALL_INSTANT_STATE_URL,
                getRequestParams(ckpMetaPath.toString()),
                new TypeReference<List<InstantStateDTO>>() {},
                HttpRequestClient.RequestMethod.GET);
        } catch (Exception e) {
            LOG.error("Failed to execute scan ckp metadata, fall back to read from file system...", e);
            return super.fetchCkpMessages(ckpMetaPath);
        }
        if (instantStates == null) {
            // Handle a missing response explicitly rather than relying on an NPE being
            // swallowed by the broad catch above; the fallback itself is unchanged.
            LOG.warn("Timeline server returned no instant states, fall back to read from file system...");
            return super.fetchCkpMessages(ckpMetaPath);
        }
        return instantStates.stream().map(state -> new CkpMessage(state.getInstant(), state.getState()));
    }

    /** Builds the single-entry parameter map that identifies the instant state directory. */
    private Map<String, String> getRequestParams(String dirPath) {
        return Collections.singletonMap(INSTANT_STATE_DIR_PATH_PARAM, dirPath);
    }

    /**
     * Asks the server to refresh its view of the instant states under {@code path}.
     *
     * <p>Best effort: a failed or unsuccessful refresh is logged and never propagated
     * to the caller.
     */
    private void sendRefreshRequest() {
        try {
            // Keep the boxed type: a null response must not blow up on auto-unboxing and
            // then be reported as a request failure.
            Boolean refreshed = httpRequestClient.executeRequestWithRetry(
                InstantStateHandler.REFRESH_INSTANT_STATE,
                getRequestParams(this.path.toString()),
                new TypeReference<Boolean>() {},
                HttpRequestClient.RequestMethod.POST);
            if (!Boolean.TRUE.equals(refreshed)) {
                LOG.warn("Timeline server responses with failed refresh");
            }
        } catch (Exception e) {
            LOG.error("Failed to execute refresh", e);
        }
    }
}

