/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
    private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiver.class);
    private final Path archiveFilePath;
    private final HoodieWriteConfig config;
    private HoodieLogFormat.Writer writer;
    private final int maxInstantsToKeep;
    private final int minInstantsToKeep;
    private final HoodieTable<T, I, K, O> table;
    private final HoodieTableMetaClient metaClient;
    private final TransactionManager txnManager;

    public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
        this.config = config;
        this.table = table;
        this.metaClient = table.getMetaClient();
        this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath((String)this.metaClient.getArchivePath());
        this.maxInstantsToKeep = config.getMaxCommitsToKeep();
        this.minInstantsToKeep = config.getMinCommitsToKeep();
        this.txnManager = new TransactionManager(config, (FileSystem)table.getMetaClient().getFs());
    }

    private HoodieLogFormat.Writer openWriter() {
        try {
            if (this.writer == null) {
                return HoodieLogFormat.newWriterBuilder().onParentPath(this.archiveFilePath.getParent()).withFileId(this.archiveFilePath.getName()).withFileExtension(".archive").withFs((FileSystem)this.metaClient.getFs()).overBaseCommit("").build();
            }
            return this.writer;
        }
        catch (IOException e) {
            throw new HoodieException("Unable to initialize HoodieLogFormat writer", (Throwable)e);
        }
    }

    public HoodieLogFormat.Writer reOpenWriter() {
        try {
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
            }
            this.writer = this.openWriter();
            return this.writer;
        }
        catch (IOException e) {
            throw new HoodieException("Unable to initialize HoodieLogFormat writer", (Throwable)e);
        }
    }

    private void close() {
        try {
            if (this.writer != null) {
                this.writer.close();
            }
        }
        catch (IOException e) {
            throw new HoodieException("Unable to close HoodieLogFormat writer", (Throwable)e);
        }
    }

    public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
        return this.archiveIfRequired(context, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
        try {
            if (acquireLock) {
                this.txnManager.beginTransaction((Option<HoodieInstant>)Option.empty(), (Option<HoodieInstant>)Option.empty());
            }
            List<HoodieInstant> instantsToArchive = this.getInstantsToArchive().collect(Collectors.toList());
            this.verifyLastMergeArchiveFilesIfNecessary(context);
            boolean success = true;
            if (!instantsToArchive.isEmpty()) {
                this.writer = this.openWriter();
                LOG.info((Object)("Archiving instants " + instantsToArchive));
                this.archive(context, instantsToArchive);
                LOG.info((Object)("Deleting archived instants " + instantsToArchive));
                success = this.deleteArchivedInstants(instantsToArchive, context);
            } else {
                LOG.info((Object)"No Instants to archive");
            }
            if (this.shouldMergeSmallArchiveFies()) {
                this.mergeArchiveFilesIfNecessary(context);
            }
            boolean bl = success;
            return bl;
        }
        finally {
            this.close();
            if (acquireLock) {
                this.txnManager.endTransaction((Option<HoodieInstant>)Option.empty());
            }
        }
    }

    public boolean shouldMergeSmallArchiveFies() {
        return this.config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported((String)this.metaClient.getFs().getScheme());
    }

    private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
        Path planPath = new Path(this.metaClient.getArchivePath(), "mergeArchivePlan");
        this.reOpenWriter();
        FileStatus[] fsStatuses = this.metaClient.getFs().globStatus(new Path(this.metaClient.getArchivePath() + "/.commits_.archive*"));
        Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
        int archiveMergeFilesBatchSize = this.config.getArchiveMergeFilesBatchSize();
        long smallFileLimitBytes = this.config.getArchiveMergeSmallFileLimitBytes();
        List<FileStatus> mergeCandidate = this.getMergeCandidates(smallFileLimitBytes, fsStatuses);
        if (mergeCandidate.size() >= archiveMergeFilesBatchSize) {
            List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
            String logFileName = this.computeLogFileName();
            this.buildArchiveMergePlan(candidateFiles, planPath, logFileName);
            this.mergeArchiveFiles(mergeCandidate);
            this.deleteFilesParallelize(this.metaClient, candidateFiles, context, true);
            LOG.info((Object)"Success to delete replaced small archive files.");
            this.metaClient.getFs().delete(planPath, false);
            LOG.info((Object)"Success to merge small archive files.");
        }
    }

    private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
        int index;
        for (index = 0; index < fsStatuses.length && fsStatuses[index].getLen() <= smallFileLimitBytes; ++index) {
        }
        return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
    }

    private String computeLogFileName() throws IOException {
        String logWriteToken = this.writer.getLogFile().getLogWriteToken();
        HoodieLogFile hoodieLogFile = this.writer.getLogFile().rollOver((FileSystem)this.metaClient.getFs(), logWriteToken);
        return hoodieLogFile.getFileName();
    }

    private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
        if (this.shouldMergeSmallArchiveFies()) {
            Path planPath = new Path(this.metaClient.getArchivePath(), "mergeArchivePlan");
            HoodieWrapperFileSystem fs = this.metaClient.getFs();
            if (fs.exists(planPath)) {
                HoodieMergeArchiveFilePlan plan = null;
                try {
                    plan = (HoodieMergeArchiveFilePlan)TimelineMetadataUtils.deserializeAvroMetadata((byte[])((byte[])FileIOUtils.readDataFromPath((FileSystem)fs, (Path)planPath).get()), HoodieMergeArchiveFilePlan.class);
                }
                catch (IOException e) {
                    LOG.warn((Object)"Parsing merge archive plan failed.", (Throwable)e);
                    fs.delete(planPath);
                    return;
                }
                Path mergedArchiveFile = new Path(this.metaClient.getArchivePath(), plan.getMergedArchiveFileName());
                List<Path> candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
                if (this.candidateAllExists(candidates)) {
                    if (fs.exists(mergedArchiveFile)) {
                        fs.delete(mergedArchiveFile, false);
                    }
                } else if (fs.exists(mergedArchiveFile)) {
                    this.deleteFilesParallelize(this.metaClient, plan.getCandidate(), context, true);
                }
                fs.delete(planPath);
            }
        }
    }

    private boolean candidateAllExists(List<Path> candidates) throws IOException {
        for (Path archiveFile : candidates) {
            if (this.metaClient.getFs().exists(archiveFile)) continue;
            return false;
        }
        return true;
    }

    public void buildArchiveMergePlan(List<String> compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
        LOG.info((Object)"Start to build archive merge plan.");
        HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder().setCandidate(compactCandidate).setMergedArchiveFileName(compactedArchiveFileName).build();
        Option content = TimelineMetadataUtils.serializeAvroMetadata((SpecificRecordBase)plan, HoodieMergeArchiveFilePlan.class);
        FileIOUtils.createFileInPath((FileSystem)this.metaClient.getFs(), (Path)planPath, (Option)content);
        LOG.info((Object)"Success to build archive merge plan");
    }

    public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOException {
        LOG.info((Object)"Starting to merge small archive files.");
        Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
        try {
            ArrayList<IndexedRecord> records = new ArrayList<IndexedRecord>();
            for (FileStatus fs : compactCandidate) {
                HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((FileSystem)this.metaClient.getFs(), (HoodieLogFile)new HoodieLogFile(fs.getPath()), (Schema)HoodieArchivedMetaEntry.getClassSchema());
                Throwable throwable = null;
                try {
                    while (reader.hasNext()) {
                        HoodieAvroDataBlock blk = (HoodieAvroDataBlock)reader.next();
                        blk.getRecordIterator().forEachRemaining(records::add);
                        if (records.size() < this.config.getCommitArchivalBatchSize()) continue;
                        this.writeToFile(wrapperSchema, records);
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (reader == null) continue;
                    if (throwable != null) {
                        try {
                            reader.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    reader.close();
                }
            }
            this.writeToFile(wrapperSchema, records);
        }
        catch (Exception e) {
            throw new HoodieCommitException("Failed to merge small archive files", e);
        }
        finally {
            this.writer.close();
        }
        LOG.info((Object)"Success to merge small archive files.");
    }

    private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaClient, List<String> paths, HoodieEngineContext context, boolean ignoreFailed) {
        return FSUtils.parallelizeFilesProcess((HoodieEngineContext)context, (FileSystem)metaClient.getFs(), (int)this.config.getArchiveDeleteParallelism(), (FSUtils.SerializableFunction & Serializable)pairOfSubPathAndConf -> {
            Path file = new Path((String)pairOfSubPathAndConf.getKey());
            try {
                HoodieWrapperFileSystem fs = metaClient.getFs();
                if (fs.exists(file)) {
                    return fs.delete(file, false);
                }
                return true;
            }
            catch (IOException e) {
                if (!ignoreFailed) {
                    throw new HoodieIOException("Failed to delete : " + file, e);
                }
                LOG.warn((Object)("Ignore failed deleting : " + file));
                return true;
            }
        }, paths);
    }

    private Stream<HoodieInstant> getCleanInstantsToArchive() {
        HoodieTimeline cleanAndRollbackTimeline = this.table.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"clean", "rollback"})).filterCompletedInstants();
        return cleanAndRollbackTimeline.getInstants().collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
            if (hoodieInstants.size() > this.maxInstantsToKeep) {
                return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
            }
            return new ArrayList();
        }).flatMap(Collection::stream);
    }

    private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
        HoodieTimeline commitTimeline = this.table.getCompletedCommitsTimeline();
        Option oldestPendingCompactionAndReplaceInstant = this.table.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"compaction", "replacecommit"})).filter(s -> !s.isCompleted()).firstInstant();
        Option oldestInflightCommitInstant = this.table.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"commit", "deltacommit"})).filterInflights().firstInstant();
        Option firstSavepoint = this.table.getCompletedSavepointTimeline().firstInstant();
        Set<String> savepointTimestamps = this.table.getSavepointTimestamps();
        if (!commitTimeline.empty() && commitTimeline.countInstants() > this.maxInstantsToKeep) {
            Option oldestInstantToRetainForCompaction = this.metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && (this.config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS || this.config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME) ? CompactionUtils.getOldestInstantToRetainForCompaction((HoodieActiveTimeline)this.table.getActiveTimeline(), (int)this.config.getInlineCompactDeltaCommitMax()) : Option.empty();
            Option oldestInstantToRetainForClustering = ClusteringUtils.getOldestInstantToRetainForClustering((HoodieActiveTimeline)this.table.getActiveTimeline(), (HoodieTableMetaClient)this.table.getMetaClient());
            Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants().filter(s -> {
                if (this.config.shouldArchiveBeyondSavepoint()) {
                    return !savepointTimestamps.contains(s.getTimestamp());
                }
                return !firstSavepoint.isPresent() || !HoodieTimeline.compareTimestamps((String)((HoodieInstant)firstSavepoint.get()).getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)s.getTimestamp());
            }).filter(s -> (Boolean)oldestPendingCompactionAndReplaceInstant.map(instant -> HoodieTimeline.compareTimestamps((String)instant.getTimestamp(), (BiPredicate)HoodieTimeline.GREATER_THAN, (String)s.getTimestamp())).orElse((Object)true)).filter(s -> {
                if (this.config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
                    return (Boolean)oldestInflightCommitInstant.map(instant -> HoodieTimeline.compareTimestamps((String)instant.getTimestamp(), (BiPredicate)HoodieTimeline.GREATER_THAN, (String)s.getTimestamp())).orElse((Object)true);
                }
                return true;
            }).filter(s -> (Boolean)oldestInstantToRetainForCompaction.map(instantToRetain -> HoodieTimeline.compareTimestamps((String)s.getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN, (String)instantToRetain.getTimestamp())).orElse((Object)true)).filter(s -> (Boolean)oldestInstantToRetainForClustering.map(instantToRetain -> HoodieTimeline.compareTimestamps((String)s.getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN, (String)instantToRetain.getTimestamp())).orElse((Object)true));
            return instantToArchiveStream.limit(commitTimeline.countInstants() - this.minInstantsToKeep);
        }
        return Stream.empty();
    }

    private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
        HoodieTableMetaClient dataMetaClient;
        Option qualifiedEarliestInstant;
        Stream<Object> instants = Stream.concat(this.getCleanInstantsToArchive(), this.getCommitInstantsToArchive());
        if (this.config.isMetastoreEnabled()) {
            return Stream.empty();
        }
        HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(this.metaClient, false);
        Map<Pair, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants().collect(Collectors.groupingBy(i -> Pair.of((Object)i.getTimestamp(), (Object)HoodieInstant.getComparableAction((String)i.getAction()))));
        if (this.config.isMetadataTableEnabled()) {
            try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create((HoodieEngineContext)this.table.getContext(), (HoodieMetadataConfig)this.config.getMetadataConfig(), (String)this.config.getBasePath(), (String)((String)FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()));){
                Option latestCompactionTime = tableMetadata.getLatestCompactionTime();
                if (!latestCompactionTime.isPresent()) {
                    LOG.info((Object)"Not archiving as there is no compaction yet on the metadata table");
                    instants = Stream.empty();
                } else {
                    LOG.info((Object)("Limiting archiving of instants to latest compaction on metadata table at " + (String)latestCompactionTime.get()));
                    instants = instants.filter(instant -> HoodieTimeline.compareTimestamps((String)instant.getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN, (String)((String)latestCompactionTime.get())));
                }
            }
            catch (Exception e) {
                throw new HoodieException("Error limiting instant archival based on metadata table", (Throwable)e);
            }
        }
        if (HoodieTableMetadata.isMetadataTable((String)this.config.getBasePath()) && (qualifiedEarliestInstant = TimelineUtils.getEarliestInstantForMetadataArchival((HoodieActiveTimeline)(dataMetaClient = HoodieTableMetaClient.builder().setBasePath(HoodieTableMetadata.getDatasetBasePath((String)this.config.getBasePath())).setConf(this.metaClient.getHadoopConf()).build()).getActiveTimeline(), (boolean)this.config.shouldArchiveBeyondSavepoint())).isPresent()) {
            instants = instants.filter(instant -> HoodieTimeline.compareTimestamps((String)instant.getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN, (String)((HoodieInstant)qualifiedEarliestInstant.get()).getTimestamp()));
        }
        return instants.flatMap(hoodieInstant -> {
            List instantsToStream = (List)groupByTsAction.get(Pair.of((Object)hoodieInstant.getTimestamp(), (Object)HoodieInstant.getComparableAction((String)hoodieInstant.getAction())));
            if (instantsToStream != null) {
                return instantsToStream.stream();
            }
            return Stream.empty();
        });
    }

    private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
        LOG.info((Object)("Deleting instants " + archivedInstants));
        ArrayList<String> pendingInstantFiles = new ArrayList<String>();
        ArrayList<String> completedInstantFiles = new ArrayList<String>();
        for (HoodieInstant instant : archivedInstants) {
            String filePath = new Path(this.metaClient.getMetaPath(), instant.getFileName()).toString();
            if (instant.isCompleted()) {
                completedInstantFiles.add(filePath);
                continue;
            }
            pendingInstantFiles.add(filePath);
        }
        context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + this.config.getTableName());
        boolean success = this.deleteArchivedInstantFiles(context, true, pendingInstantFiles);
        success &= this.deleteArchivedInstantFiles(context, success, completedInstantFiles);
        Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals("commit") || i.getAction().equals("deltacommit"))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
        LOG.info((Object)("Latest Committed Instant=" + latestCommitted));
        if (latestCommitted.isPresent()) {
            success &= this.deleteAllInstantsOlderOrEqualsInAuxMetaFolder((HoodieInstant)latestCommitted.get());
        }
        return success;
    }

    private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
        Map<String, Boolean> resultDeleteInstantFiles = this.deleteFilesParallelize(this.metaClient, files, context, false);
        for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
            LOG.info((Object)("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()));
            success &= result.getValue().booleanValue();
        }
        return success;
    }

    private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
        List instants = null;
        boolean success = true;
        try {
            instants = this.metaClient.scanHoodieInstantsFromFileSystem(new Path(this.metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
        }
        catch (FileNotFoundException e) {
            LOG.warn((Object)("Aux path not found. Skipping: " + this.metaClient.getMetaAuxiliaryPath()));
            return true;
        }
        List instantsToBeDeleted = instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps((String)instant1.getTimestamp(), (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)thresholdInstant.getTimestamp())).collect(Collectors.toList());
        for (HoodieInstant deleteInstant : instantsToBeDeleted) {
            LOG.info((Object)("Deleting instant " + deleteInstant + " in auxiliary meta path " + this.metaClient.getMetaAuxiliaryPath()));
            Path metaFile = new Path(this.metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
            if (!this.metaClient.getFs().exists(metaFile)) continue;
            success &= this.metaClient.getFs().delete(metaFile, false);
            LOG.info((Object)("Deleted instant file in auxiliary meta path : " + metaFile));
        }
        return success;
    }

    public void archive(HoodieEngineContext context, List<HoodieInstant> instants) throws HoodieCommitException {
        try {
            Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
            LOG.info((Object)("Wrapper schema " + wrapperSchema.toString()));
            ArrayList<IndexedRecord> records = new ArrayList<IndexedRecord>();
            for (HoodieInstant hoodieInstant : instants) {
                try {
                    this.deleteAnyLeftOverMarkers(context, hoodieInstant);
                    if (this.table.getActiveTimeline().isEmpty(hoodieInstant) && hoodieInstant.isCompleted()) {
                        records.add(this.createAvroRecordFromEmptyInstant(hoodieInstant));
                    } else {
                        records.add(this.convertToAvroRecord(hoodieInstant));
                    }
                    if (records.size() < this.config.getCommitArchivalBatchSize()) continue;
                    this.writeToFile(wrapperSchema, records);
                }
                catch (Exception e) {
                    LOG.error((Object)("Failed to archive commits, .commit file: " + hoodieInstant.getFileName()), (Throwable)e);
                    if (!this.config.isFailOnTimelineArchivingEnabled()) continue;
                    throw e;
                }
            }
            this.writeToFile(wrapperSchema, records);
        }
        catch (Exception e) {
            throw new HoodieCommitException("Failed to archive commits", e);
        }
    }

    private void deleteAnyLeftOverMarkers(HoodieEngineContext context, HoodieInstant instant) {
        WriteMarkers writeMarkers = WriteMarkersFactory.get(this.config.getMarkersType(), this.table, instant.getTimestamp());
        if (writeMarkers.deleteMarkerDir(context, this.config.getMarkersDeleteParallelism())) {
            LOG.info((Object)("Cleaned up left over marker directory for instant :" + instant));
        }
    }

    private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
        if (records.size() > 0) {
            HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
            String keyField = this.table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
            HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
            this.writer.appendBlock((HoodieLogBlock)block);
            records.clear();
        }
    }

    private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant) throws IOException {
        return MetadataConversionUtils.createMetaWrapper(hoodieInstant, this.metaClient);
    }

    private IndexedRecord createAvroRecordFromEmptyInstant(HoodieInstant hoodieInstant) throws IOException {
        return MetadataConversionUtils.createMetaWrapperForEmptyInstant(hoodieInstant);
    }
}

