/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.timeline;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.LSMTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieArchivedTimeline
extends HoodieDefaultTimeline {
    public static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
    public static final String COMPLETION_TIME_ARCHIVED_META_FIELD = "completionTime";
    private static final String ACTION_ARCHIVED_META_FIELD = "action";
    private static final String METADATA_ARCHIVED_META_FIELD = "metadata";
    private static final String PLAN_ARCHIVED_META_FIELD = "plan";
    private HoodieTableMetaClient metaClient;
    private final Map<String, byte[]> readCommits = new ConcurrentHashMap<String, byte[]>();
    private static final Logger LOG = LoggerFactory.getLogger(HoodieArchivedTimeline.class);

    public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
        this.setInstants(this.loadInstants());
        this.details = this::getInstantDetails;
    }

    public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
        this.metaClient = metaClient;
        this.setInstants(this.loadInstants(new StartTsFilter(startTs), LoadMode.METADATA));
        this.details = this::getInstantDetails;
    }

    public HoodieArchivedTimeline() {
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
    }

    public void loadInstantDetailsInMemory(String startTs, String endTs) {
        this.loadInstants(startTs, endTs);
    }

    public void loadCompletedInstantDetailsInMemory() {
        this.loadInstants(null, LoadMode.METADATA);
    }

    public void loadCompactionDetailsInMemory(String compactionInstantTime) {
        this.loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime);
    }

    public void loadCompactionDetailsInMemory(String startTs, String endTs) {
        this.loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.PLAN, record -> record.get(ACTION_ARCHIVED_META_FIELD).toString().equals("commit") && record.get(PLAN_ARCHIVED_META_FIELD) != null);
    }

    public void clearInstantDetailsFromMemory(String instantTime) {
        this.readCommits.remove(instantTime);
    }

    public void clearInstantDetailsFromMemory(String startTs, String endTs) {
        this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant -> this.readCommits.remove(instant.getTimestamp()));
    }

    @Override
    public Option<byte[]> getInstantDetails(HoodieInstant instant) {
        return Option.ofNullable(this.readCommits.get(instant.getTimestamp()));
    }

    public HoodieArchivedTimeline reload() {
        return new HoodieArchivedTimeline(this.metaClient);
    }

    private HoodieInstant readCommit(String instantTime, GenericRecord record, Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
        String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
        String completionTime = record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
        instantDetailsConsumer.ifPresent(consumer -> consumer.accept(instantTime, record));
        return new HoodieInstant(HoodieInstant.State.COMPLETED, action, instantTime, completionTime);
    }

    @Nullable
    private BiConsumer<String, GenericRecord> getInstantDetailsFunc(LoadMode loadMode) {
        switch (loadMode) {
            case METADATA: {
                return (instant, record) -> {
                    ByteBuffer commitMeta = (ByteBuffer)record.get(METADATA_ARCHIVED_META_FIELD);
                    if (commitMeta != null) {
                        this.readCommits.put((String)instant, commitMeta.array());
                    }
                };
            }
            case PLAN: {
                return (instant, record) -> {
                    ByteBuffer plan = (ByteBuffer)record.get(PLAN_ARCHIVED_META_FIELD);
                    if (plan != null) {
                        this.readCommits.put((String)instant, plan.array());
                    }
                };
            }
        }
        return null;
    }

    private List<HoodieInstant> loadInstants() {
        return this.loadInstants(null, LoadMode.SLIM);
    }

    private List<HoodieInstant> loadInstants(String startTs, String endTs) {
        return this.loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.METADATA);
    }

    private List<HoodieInstant> loadInstants(TimeRangeFilter filter, LoadMode loadMode) {
        return this.loadInstants(filter, loadMode, r -> true);
    }

    private List<HoodieInstant> loadInstants(@Nullable TimeRangeFilter filter, LoadMode loadMode, Function<GenericRecord, Boolean> commitsFilter) {
        ConcurrentHashMap instantsInRange = new ConcurrentHashMap();
        Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer = Option.ofNullable(this.getInstantDetailsFunc(loadMode));
        HoodieArchivedTimeline.loadInstants(this.metaClient, filter, loadMode, commitsFilter, (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime, this.readCommit((String)instantTime, (GenericRecord)avroRecord, instantDetailsConsumer)));
        ArrayList<HoodieInstant> result = new ArrayList<HoodieInstant>(instantsInRange.values());
        Collections.sort(result);
        return result;
    }

    public static void loadInstants(HoodieTableMetaClient metaClient, @Nullable TimeRangeFilter filter, LoadMode loadMode, Function<GenericRecord, Boolean> commitsFilter, BiConsumer<String, GenericRecord> recordConsumer) {
        try {
            List<String> fileNames = LSMTimeline.latestSnapshotManifest(metaClient).getFileNames();
            Schema readSchema = LSMTimeline.getReadSchema(loadMode);
            ((Stream)fileNames.stream().filter((? super T fileName) -> filter == null || LSMTimeline.isFileInRange(filter, fileName)).parallel()).forEach(fileName -> {
                try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader)HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName));
                     ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema);){
                    while (iterator.hasNext()) {
                        GenericRecord record = (GenericRecord)iterator.next();
                        String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
                        if (filter != null && !filter.isInRange(instantTime) || !((Boolean)commitsFilter.apply(record)).booleanValue()) continue;
                        recordConsumer.accept(instantTime, record);
                    }
                }
                catch (IOException ioException) {
                    throw new HoodieIOException("Error open file reader for path: " + new Path(metaClient.getArchivePath(), fileName));
                }
            });
        }
        catch (IOException e) {
            throw new HoodieIOException("Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
        }
    }

    @Override
    public HoodieDefaultTimeline getWriteTimeline() {
        Set<String> validActions = CollectionUtils.createSet("commit", "deltacommit", "compaction", "logcompaction", "replacecommit");
        return new HoodieDefaultTimeline(this.getInstantsAsStream().filter((? super T i) -> this.readCommits.containsKey(i.getTimestamp())).filter((? super T s) -> validActions.contains(s.getAction())), this.details);
    }

    public static class StartTsFilter
    extends TimeRangeFilter {
        public StartTsFilter(String startTs) {
            super(startTs, null);
        }

        @Override
        public boolean isInRange(String instantTime) {
            return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, this.startTs);
        }
    }

    public static class ClosedOpenTimeRangeFilter
    extends TimeRangeFilter {
        public ClosedOpenTimeRangeFilter(String startTs, String endTs) {
            super(startTs, endTs);
        }

        @Override
        public boolean isInRange(String instantTime) {
            return HoodieTimeline.isInClosedOpenRange(instantTime, this.startTs, this.endTs);
        }
    }

    public static class TimeRangeFilter {
        protected final String startTs;
        protected final String endTs;

        public TimeRangeFilter(String startTs, String endTs) {
            this.startTs = startTs;
            this.endTs = endTs;
        }

        public boolean isInRange(String instantTime) {
            return HoodieTimeline.isInRange(instantTime, this.startTs, this.endTs);
        }
    }

    public static enum LoadMode {
        SLIM,
        METADATA,
        PLAN;

    }
}

