/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.FileURI;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OrphanFilesDetector
extends KeyedCoProcessFunction<String, String, String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesDetector.class);
    private transient MapState<String, Boolean> foundInTable;
    private transient ValueState<String> foundInFileSystem;
    private transient ValueState<Boolean> hasUriError;
    private final DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode;
    private final Map<String, String> equalSchemes;
    private final Map<String, String> equalAuthorities;

    public OrphanFilesDetector(DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode, Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
        this.prefixMismatchMode = prefixMismatchMode;
        this.equalSchemes = equalSchemes;
        this.equalAuthorities = equalAuthorities;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.foundInTable = this.getRuntimeContext().getMapState(new MapStateDescriptor("antiJoinFoundInTable", Types.STRING, Types.BOOLEAN));
        this.hasUriError = this.getRuntimeContext().getState(new ValueStateDescriptor("antiJoinUriError", Types.BOOLEAN));
        this.foundInFileSystem = this.getRuntimeContext().getState(new ValueStateDescriptor("antiJoinFoundInFileSystem", Types.STRING));
    }

    public void processElement1(String value, KeyedCoProcessFunction.Context context, Collector<String> collector) throws Exception {
        if (this.shouldSkipElement(value, context)) {
            return;
        }
        if (!this.foundInTable.contains((Object)value)) {
            this.foundInTable.put((Object)value, (Object)true);
            context.timerService().registerEventTimeTimer(context.timestamp().longValue());
        }
    }

    public void processElement2(String value, KeyedCoProcessFunction.Context context, Collector<String> collector) throws Exception {
        if (this.shouldSkipElement(value, context)) {
            return;
        }
        this.foundInFileSystem.update((Object)value);
        context.timerService().registerEventTimeTimer(context.timestamp().longValue());
    }

    public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        if (Boolean.TRUE.equals(this.hasUriError.value())) {
            this.clearState();
            return;
        }
        ArrayList<FileURI> foundInTablesList = Lists.newArrayList();
        this.foundInTable.keys().forEach(uri -> foundInTablesList.add(new FileURI(new Path(uri).toUri(), this.equalSchemes, this.equalAuthorities)));
        if (this.foundInFileSystem.value() != null) {
            if (foundInTablesList.isEmpty()) {
                FileURI fileURI = new FileURI(new Path((String)this.foundInFileSystem.value()).toUri(), this.equalSchemes, this.equalAuthorities);
                out.collect((Object)fileURI.getUriAsString());
            } else {
                FileURI actual = new FileURI(new Path((String)this.foundInFileSystem.value()).toUri(), this.equalSchemes, this.equalAuthorities);
                if (this.hasMismatch(actual, foundInTablesList)) {
                    if (this.prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) {
                        out.collect((Object)((String)this.foundInFileSystem.value()));
                    } else if (this.prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.ERROR) {
                        ValidationException validationException = new ValidationException("Unable to determine whether certain files are orphan. Metadata references files that match listed/provided files except for authority/scheme. Please, inspect the conflicting authorities/schemes and provide which of them are equal by further configuring the action via equalSchemes() and equalAuthorities() methods. Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting authorities/schemes or to 'DELETE' if you are ABSOLUTELY confident that remaining conflicting authorities/schemes are different. It will be impossible to recover deleted files. Conflicting authorities/schemes", new Object[0]);
                        LOG.warn("Unable to determine whether certain files are orphan. Found in filesystem: {} and in table: {}", new Object[]{actual, StringUtils.join(foundInTablesList, (String)","), validationException});
                        ctx.output(DeleteOrphanFiles.ERROR_STREAM, (Object)validationException);
                    }
                }
            }
        }
        this.clearState();
    }

    private boolean hasMismatch(FileURI actual, List<FileURI> foundInTablesList) {
        return foundInTablesList.stream().noneMatch(valid -> valid.schemeMatch(actual) && valid.authorityMatch(actual));
    }

    private boolean shouldSkipElement(String value, KeyedCoProcessFunction.Context context) throws IOException {
        if (Boolean.TRUE.equals(this.hasUriError.value())) {
            return true;
        }
        if ("__INVALID_URI__".equals(context.getCurrentKey())) {
            context.output(DeleteOrphanFiles.ERROR_STREAM, (Object)new RuntimeException("Invalid URI format detected: " + value));
            this.hasUriError.update((Object)true);
            this.foundInTable.clear();
            this.foundInFileSystem.clear();
            return true;
        }
        return false;
    }

    private void clearState() {
        this.hasUriError.clear();
        this.foundInTable.clear();
        this.foundInFileSystem.clear();
    }
}

