/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.orphan;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.BoundedTwoInputOperator;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;

/**
 * Flink implementation of {@link OrphanFilesClean}.
 *
 * <p>{@link #doOrphanClean} builds a Flink pipeline, configured for batch runtime mode, in three
 * steps:
 *
 * <ul>
 *   <li>It gathers the name of every file referenced by the snapshots of all valid branches.
 *   <li>It lists the table's file directories.
 *   <li>It hands each listed file that is old enough and not referenced to {@code fileCleaner}.
 * </ul>
 *
 * <p>Snapshot directories are cleaned separately, on the calling side, before the pipeline is
 * built.
 */
public class FlinkOrphanFilesClean extends OrphanFilesClean {

    /**
     * Default parallelism applied to the generated job. When {@code null}, no default parallelism
     * is set by {@link #doOrphanClean}.
     */
    @Nullable protected final Integer parallelism;

    public FlinkOrphanFilesClean(
            FileStoreTable table,
            long olderThanMillis,
            SerializableConsumer<Path> fileCleaner,
            @Nullable Integer parallelism) {
        super(table, olderThanMillis, fileCleaner);
        this.parallelism = parallelism;
    }

    /**
     * Builds (but does not execute) the orphan-file cleaning pipeline on {@code env}.
     *
     * <p>Side effects happen immediately:
     *
     * <ul>
     *   <li>{@code env} is reconfigured for batch execution.
     *   <li>{@code cleanSnapshotDir} runs for all valid branches before any stream is created.
     * </ul>
     *
     * <p>When that local cleaning counted anything, its totals are appended to the returned stream
     * as one extra {@link CleanOrphanFilesResult}.
     *
     * @param env environment to configure and to build the job in
     * @return a stream of partial {@link CleanOrphanFilesResult}s whose sum is the total cleaned
     */
    @Nullable
    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
        configureBatchEnvironment(env);

        List<String> branches = validBranches();

        // Snapshot directories are cleaned here, outside the Flink job; the counters are merged
        // into the returned stream at the end.
        AtomicLong deletedFilesCountInLocal = new AtomicLong(0L);
        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0L);
        cleanSnapshotDir(
                branches,
                path -> deletedFilesCountInLocal.incrementAndGet(),
                deletedFilesLenInBytesInLocal::addAndGet);

        DataStream<String> usedFiles = buildUsedFilesStream(env, branches);
        DataStream<Pair<String, Long>> candidates = buildCandidateFilesStream(env);
        DataStream<CleanOrphanFilesResult> deleted = buildFilesJoin(usedFiles, candidates);

        if (deletedFilesCountInLocal.get() != 0L || deletedFilesLenInBytesInLocal.get() != 0L) {
            deleted =
                    deleted.union(
                            env.fromElements(
                                    new CleanOrphanFilesResult(
                                            deletedFilesCountInLocal.get(),
                                            deletedFilesLenInBytesInLocal.get())));
        }
        return deleted;
    }

    /**
     * Configures {@code env} for the cleaning job. It sets batch runtime mode, disables input
     * sorting and the batch state backend, and applies {@link #parallelism} when set.
     */
    private void configureBatchEnvironment(StreamExecutionEnvironment env) {
        Configuration flinkConf = new Configuration();
        flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
        flinkConf.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        if (parallelism != null) {
            flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        }
        // Set through the raw string key rather than a typed ConfigOption, presumably so this
        // compiles against Flink versions that do not define the option (TODO confirm).
        flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false");
        env.configure(flinkConf);
    }

    /**
     * Builds the stream of file names referenced by the snapshots of {@code branches}.
     *
     * <p>The stream contains the non-data files emitted by {@code collectWithoutDataFile}, plus the
     * file name and extra files of every entry in the manifests those snapshots reference.
     */
    private DataStream<String> buildUsedFilesStream(
            StreamExecutionEnvironment env, List<String> branches) {
        // Side output carrying (branch, manifest file name) pairs.
        final OutputTag<Tuple2<String, String>> manifestOutputTag =
                new OutputTag<Tuple2<String, String>>("manifest-output") {};

        SingleOutputStreamOperator<String> usedManifestFiles =
                env.fromCollection(branches)
                        .process(
                                new ProcessFunction<String, Tuple2<String, String>>() {
                                    @Override
                                    public void processElement(
                                            String branch,
                                            Context ctx,
                                            Collector<Tuple2<String, String>> out)
                                            throws Exception {
                                        // Snapshots travel as JSON inside a (branch, json) tuple.
                                        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
                                            out.collect(new Tuple2<>(branch, snapshot.toJson()));
                                        }
                                    }
                                })
                        // Redistribute snapshots across subtasks before the per-snapshot work.
                        .rebalance()
                        .process(
                                new ProcessFunction<Tuple2<String, String>, String>() {
                                    @Override
                                    public void processElement(
                                            Tuple2<String, String> branchAndSnapshot,
                                            Context ctx,
                                            Collector<String> out)
                                            throws Exception {
                                        String branch = branchAndSnapshot.f0;
                                        Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
                                        // Manifest names go to the side output so they can be
                                        // opened and expanded downstream.
                                        Consumer<String> manifestConsumer =
                                                manifest ->
                                                        ctx.output(
                                                                manifestOutputTag,
                                                                new Tuple2<>(branch, manifest));
                                        collectWithoutDataFile(
                                                branch, snapshot, out::collect, manifestConsumer);
                                    }
                                });

        DataStream<String> usedDataFiles =
                usedManifestFiles
                        .getSideOutput(manifestOutputTag)
                        // Equal (branch, manifest) pairs share a key and thus a subtask, where the
                        // set below de-duplicates them.
                        .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                        .transform(
                                "datafile-reader",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new BoundedOneInputOperator<Tuple2<String, String>, String>() {

                                    private final Set<Tuple2<String, String>> manifests =
                                            new HashSet<>();

                                    @Override
                                    public void processElement(
                                            StreamRecord<Tuple2<String, String>> element) {
                                        manifests.add(element.getValue());
                                    }

                                    @Override
                                    public void endInput() throws IOException {
                                        // One ManifestFile reader per branch.
                                        Map<String, ManifestFile> branchManifests = new HashMap<>();
                                        for (Tuple2<String, String> branchAndManifest : manifests) {
                                            ManifestFile manifestFile =
                                                    branchManifests.computeIfAbsent(
                                                            branchAndManifest.f0,
                                                            branch ->
                                                                    table.switchToBranch(branch)
                                                                            .store()
                                                                            .manifestFileFactory()
                                                                            .create());
                                            // retryReadingFiles falls back to the empty list given
                                            // here as its default value.
                                            List<ManifestEntry> entries =
                                                    retryReadingFiles(
                                                            () ->
                                                                    manifestFile
                                                                            .readWithIOException(
                                                                                    branchAndManifest
                                                                                            .f1),
                                                            Collections.<ManifestEntry>emptyList());
                                            for (ManifestEntry entry : entries) {
                                                output.collect(new StreamRecord<>(entry.fileName()));
                                                for (String extraFile : entry.file().extraFiles()) {
                                                    output.collect(new StreamRecord<>(extraFile));
                                                }
                                            }
                                        }
                                    }
                                });

        return usedDataFiles.union(usedManifestFiles);
    }

    /**
     * Lists the table's file directories and emits (path URI string, length) for every file that
     * {@code oldEnough} accepts.
     */
    private DataStream<Pair<String, Long>> buildCandidateFilesStream(
            StreamExecutionEnvironment env) {
        List<String> fileDirs =
                listPaimonFileDirs().stream()
                        .map(Path::toUri)
                        .map(Object::toString)
                        .collect(Collectors.toList());
        return env.fromCollection(fileDirs)
                .process(
                        new ProcessFunction<String, Pair<String, Long>>() {
                            @Override
                            public void processElement(
                                    String dir, Context ctx, Collector<Pair<String, Long>> out) {
                                for (FileStatus fileStatus : tryBestListingDirs(new Path(dir))) {
                                    if (oldEnough(fileStatus)) {
                                        out.collect(
                                                Pair.of(
                                                        fileStatus.getPath().toUri().toString(),
                                                        fileStatus.getLen()));
                                    }
                                }
                            }
                        });
    }

    /**
     * Joins used file names (build side) against candidate files (probe side) by file name.
     *
     * <p>Every candidate whose name was never seen as used is passed to {@code fileCleaner}. Each
     * parallel instance emits one {@link CleanOrphanFilesResult} with its totals at the end of the
     * probe input.
     */
    private DataStream<CleanOrphanFilesResult> buildFilesJoin(
            DataStream<String> usedFiles, DataStream<Pair<String, Long>> candidates) {
        return usedFiles
                .keyBy(f -> f)
                .connect(
                        candidates.keyBy(
                                pathAndSize -> new Path(pathAndSize.getKey()).getName()))
                .transform(
                        "files_join",
                        TypeInformation.of(CleanOrphanFilesResult.class),
                        new BoundedTwoInputOperator<
                                String, Pair<String, Long>, CleanOrphanFilesResult>() {

                            // True once input 1 (used files) has been fully consumed.
                            private boolean buildEnd;
                            private long emittedFilesCount;
                            private long emittedFilesLen;
                            // File names (not full paths) of all used files.
                            private final Set<String> used = new HashSet<>();

                            @Override
                            public InputSelection nextSelection() {
                                // Read the whole build side before any candidate.
                                return buildEnd ? InputSelection.SECOND : InputSelection.FIRST;
                            }

                            @Override
                            public void endInput(int inputId) {
                                switch (inputId) {
                                    case 1:
                                        org.apache.flink.util.Preconditions.checkState(
                                                !buildEnd, "Should not build ended.");
                                        LOG.info("Finish build phase.");
                                        buildEnd = true;
                                        break;
                                    case 2:
                                        org.apache.flink.util.Preconditions.checkState(
                                                buildEnd, "Should build ended.");
                                        LOG.info("Finish probe phase.");
                                        LOG.info("Clean files count : {}", emittedFilesCount);
                                        LOG.info("Clean files size : {}", emittedFilesLen);
                                        output.collect(
                                                new StreamRecord<>(
                                                        new CleanOrphanFilesResult(
                                                                emittedFilesCount,
                                                                emittedFilesLen)));
                                        break;
                                }
                            }

                            @Override
                            public void processElement1(StreamRecord<String> element) {
                                used.add(element.getValue());
                            }

                            @Override
                            public void processElement2(
                                    StreamRecord<Pair<String, Long>> element) {
                                org.apache.flink.util.Preconditions.checkState(
                                        buildEnd, "Should build ended.");
                                Pair<String, Long> fileInfo = element.getValue();
                                Path path = new Path(fileInfo.getLeft());
                                if (!used.contains(path.getName())) {
                                    emittedFilesCount++;
                                    emittedFilesLen += fileInfo.getRight();
                                    FlinkOrphanFilesClean.this.fileCleaner.accept(path);
                                    // NOTE(review): logged as "Dry clean" for every file handed
                                    // to fileCleaner, whether or not the cleaner deletes it.
                                    LOG.info("Dry clean: {}", path);
                                }
                            }
                        });
    }

    /**
     * Cleans orphan files for one table in a single Flink job and returns the summed totals. When
     * {@code tableName} is {@code null} or {@code "*"}, it cleans every table of {@code
     * databaseName} instead.
     *
     * @param env environment the combined job is built and executed in
     * @param catalog catalog used to resolve the tables
     * @param olderThanMillis age threshold passed to each {@link FlinkOrphanFilesClean}
     * @param fileCleaner receives every orphan file path found by the job
     * @param parallelism optional default parallelism of the job
     * @param databaseName database to clean
     * @param tableName table to clean, or {@code null}/{@code "*"} for all tables
     * @return the summed result; zero counts when no table produced a stream
     * @throws Catalog.DatabaseNotExistException propagated from the catalog
     * @throws Catalog.TableNotExistException propagated from the catalog
     * @throws IllegalArgumentException if a resolved table is not a {@link FileStoreTable}
     */
    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
            StreamExecutionEnvironment env,
            Catalog catalog,
            long olderThanMillis,
            SerializableConsumer<Path> fileCleaner,
            @Nullable Integer parallelism,
            String databaseName,
            @Nullable String tableName)
            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        List<String> tableNames = Collections.singletonList(tableName);
        if (tableName == null || "*".equals(tableName)) {
            tableNames = catalog.listTables(databaseName);
        }

        List<DataStream<CleanOrphanFilesResult>> orphanFilesCleans =
                new ArrayList<>(tableNames.size());
        for (String t : tableNames) {
            Identifier identifier = new Identifier(databaseName, t);
            Table table = catalog.getTable(identifier);
            Preconditions.checkArgument(
                    table instanceof FileStoreTable,
                    "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                    table.getClass().getName());
            DataStream<CleanOrphanFilesResult> clean =
                    new FlinkOrphanFilesClean(
                                    (FileStoreTable) table,
                                    olderThanMillis,
                                    fileCleaner,
                                    parallelism)
                            .doOrphanClean(env);
            if (clean != null) {
                orphanFilesCleans.add(clean);
            }
        }

        // Union all per-table streams so that everything runs as one job.
        DataStream<CleanOrphanFilesResult> result = null;
        for (DataStream<CleanOrphanFilesResult> clean : orphanFilesCleans) {
            result = result == null ? clean : result.union(clean);
        }
        return sum(result);
    }

    /**
     * Executes {@code deleted} and adds up all results it produces.
     *
     * @param deleted stream to execute; {@code null} means nothing to run
     * @return total deleted count and bytes (zeros when {@code deleted} is {@code null})
     * @throws RuntimeException wrapping any failure while executing or collecting the job
     */
    private static CleanOrphanFilesResult sum(
            @Nullable DataStream<CleanOrphanFilesResult> deleted) {
        long deletedFilesCount = 0L;
        long deletedFilesLenInBytes = 0L;
        if (deleted != null) {
            // try-with-resources closes the iterator even if collection fails midway.
            try (CloseableIterator<CleanOrphanFilesResult> iterator =
                    deleted.global().executeAndCollect("OrphanFilesClean")) {
                while (iterator.hasNext()) {
                    CleanOrphanFilesResult cleanOrphanFilesResult = iterator.next();
                    deletedFilesCount += cleanOrphanFilesResult.getDeletedFileCount();
                    deletedFilesLenInBytes += cleanOrphanFilesResult.getDeletedFileTotalLenInBytes();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return new CleanOrphanFilesResult(deletedFilesCount, deletedFilesLenInBytes);
    }
}

