/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanFunction<T>
extends AbstractRichFunction
implements SinkFunction<T>,
CheckpointedFunction,
CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(CleanFunction.class);
    private final Configuration conf;
    protected HoodieFlinkWriteClient writeClient;
    private NonThrownExecutor executor;
    private volatile boolean isCleaning;

    public CleanFunction(Configuration conf) {
        this.conf = conf;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            this.writeClient = StreamerUtil.createWriteClient(this.conf, this.getRuntimeContext());
            this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
            if (OptionsResolver.isInsertOverwrite(this.conf)) {
                String instantTime = HoodieActiveTimeline.createNewInstantTime();
                LOG.info(String.format("exec sync clean with instant time %s...", instantTime));
                this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.writeClient.clean(instantTime)), "wait for sync cleaning finish", new Object[0]);
            }
        }
    }

    public void notifyCheckpointComplete(long l) throws Exception {
        if (this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && this.isCleaning) {
            this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
                try {
                    this.writeClient.waitForCleaningFinish();
                }
                finally {
                    this.isCleaning = false;
                }
            }), "wait for cleaning finish", new Object[0]);
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !this.isCleaning) {
            try {
                this.writeClient.startAsyncCleaning();
                this.isCleaning = true;
            }
            catch (Throwable throwable) {
                LOG.warn("Error while start async cleaning", throwable);
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
    }
}

