/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TableMaintenance {
    // Operator naming constants used by Builder when wiring the maintenance flow.

    // Prefix of the monitor source operator: the table name is appended for the display name,
    // and the uid suffix is appended for the operator uid.
    static final String SOURCE_OPERATOR_NAME_PREFIX = "Monitor source for ";
    // Display name of the TriggerManager operator; also used (with the uid suffix) as its uid.
    static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager";
    // Display name of the timestamp/watermark assigner; also used (with the uid suffix) as its uid.
    static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner";
    // Prefix of the per-task filter operators; the task index is appended to it.
    static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
    // Display name of the LockRemover operator (its uid is built from a separate literal).
    static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover";

    /** Not instantiable: the class only exposes static factory methods and nested types. */
    private TableMaintenance() {
    }

    /**
     * Creates a maintenance flow {@link Builder} that consumes an already existing stream of
     * {@link TableChange}s. The builder is created without a {@link StreamExecutionEnvironment}.
     *
     * @param changeStream stream of table changes that drives the triggers
     * @param tableLoader loader for the maintained table
     * @param lockFactory factory for the locks guarding the maintenance tasks
     * @return a new builder
     * @throws NullPointerException if any argument is {@code null}
     */
    @Internal
    public static Builder forChangeStream(DataStream<TableChange> changeStream, TableLoader tableLoader, TriggerLockFactory lockFactory) {
        // Constructor arguments are evaluated left to right, so the null checks run in
        // parameter order before the builder is constructed.
        return new Builder(
                null,
                Preconditions.checkNotNull(changeStream, "The change stream should not be null"),
                Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"),
                Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"));
    }

    /**
     * Creates a maintenance flow {@link Builder} that monitors the table itself. The builder is
     * created without an input change stream, so it creates its own monitor source.
     *
     * @param env environment the monitor source is registered in
     * @param tableLoader loader for the maintained table
     * @param lockFactory factory for the locks guarding the maintenance tasks
     * @return a new builder
     * @throws NullPointerException if any argument is {@code null}
     */
    public static Builder forTable(StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
        // Constructor arguments are evaluated left to right, so the null checks run in
        // parameter order before the builder is constructed.
        return new Builder(
                Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"),
                null,
                Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"),
                Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"));
    }

    @Internal
    public static class PunctuatedWatermarkStrategy
    implements WatermarkStrategy<Trigger> {
        public WatermarkGenerator<Trigger> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Trigger>(){

                public void onEvent(Trigger event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(event.timestamp()));
                }

                public void onPeriodicEmit(WatermarkOutput output) {
                }
            };
        }

        public TimestampAssigner<Trigger> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, unused) -> element.timestamp();
        }
    }

    public static class Builder {
        // Environment used to create the monitor source; null when the builder was created
        // through forChangeStream.
        private final StreamExecutionEnvironment env;
        // Externally provided change stream; null when the builder was created through forTable.
        private final DataStream<TableChange> inputStream;
        // Loader of the maintained table; append() works on a clone of it.
        private final TableLoader tableLoader;
        // Registered maintenance tasks; the position in this list is used as the task index.
        private final List<MaintenanceTaskBuilder<?>> taskBuilders;
        // Lock factory handed to the TriggerManager and LockRemover operators.
        private final TriggerLockFactory lockFactory;
        // Suffix used in operator uids; defaults to a random value.
        private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
        // Slot sharing group applied to the operators created by this builder.
        private String slotSharingGroup = "default";
        // Defaults to one minute; see rateLimit(Duration).
        private Duration rateLimit = Duration.ofMinutes(1L);
        // Defaults to 30 seconds; see lockCheckDelay(Duration).
        private Duration lockCheckDelay = Duration.ofSeconds(30L);
        // Parallelism forwarded to the task builders; -1 is presumably Flink's "use the default"
        // marker -- TODO confirm.
        private int parallelism = -1;
        // Forwarded to MonitorSource when the builder creates its own source.
        private int maxReadBack = 100;

        /**
         * Stores the collaborators as given; null checks are done by the static factory methods
         * of {@link TableMaintenance}. Either {@code env} or {@code inputStream} is expected to be
         * {@code null}, depending on which factory method was used.
         */
        private Builder(StreamExecutionEnvironment env, DataStream<TableChange> inputStream, TableLoader tableLoader, TriggerLockFactory lockFactory) {
            // Start with room for a handful of tasks; the list grows on demand.
            this.taskBuilders = Lists.newArrayListWithCapacity(4);

            this.lockFactory = lockFactory;
            this.tableLoader = tableLoader;
            this.inputStream = inputStream;
            this.env = env;
        }

        /**
         * Sets the suffix appended to the uids of the operators created by {@link #append()}.
         * A {@code null} value is not rejected here; {@link #append()} checks it.
         *
         * @param newUidSuffix suffix to use in operator uids
         * @return this builder
         */
        public Builder uidSuffix(String newUidSuffix) {
            this.uidSuffix = newUidSuffix;
            return this;
        }

        /**
         * Sets the slot sharing group applied to the operators created by this builder and
         * forwarded to the registered task builders.
         *
         * @param newSlotSharingGroup slot sharing group name
         * @return this builder
         */
        public Builder slotSharingGroup(String newSlotSharingGroup) {
            this.slotSharingGroup = newSlotSharingGroup;
            return this;
        }

        /**
         * Sets the rate limit of the maintenance flow. The value is handed to the
         * {@code TriggerManager} in milliseconds and is also used to derive the polling rate of
         * the monitor source when this builder creates the source itself.
         *
         * @param newRateLimit the new rate limit; must be at least one millisecond
         * @return this builder
         * @throws NullPointerException if {@code newRateLimit} is {@code null}
         * @throws IllegalArgumentException if {@code newRateLimit} is not greater than 0 ms
         */
        public Builder rateLimit(Duration newRateLimit) {
            Preconditions.checkNotNull(newRateLimit, "Rate limit should not be null");
            // Validate the incoming value (not the currently stored one), and use checkArgument:
            // checkNotNull on a boolean expression can never fail.
            Preconditions.checkArgument(newRateLimit.toMillis() > 0L, "Rate limit should be greater than 0");
            this.rateLimit = newRateLimit;
            return this;
        }

        /**
         * Sets the lock check delay. The value is handed to the {@code TriggerManager} in
         * milliseconds by {@link #append()}; no validation is done here.
         *
         * @param newLockCheckDelay the new lock check delay
         * @return this builder
         */
        public Builder lockCheckDelay(Duration newLockCheckDelay) {
            this.lockCheckDelay = newLockCheckDelay;
            return this;
        }

        /**
         * Sets the parallelism that is forwarded to the registered task builders.
         *
         * @param newParallelism the parallelism; validated by
         *     {@code OperatorValidationUtils.validateParallelism} before it is stored
         * @return this builder
         */
        public Builder parallelism(int newParallelism) {
            // Validate first so a rejected value never reaches the field.
            OperatorValidationUtils.validateParallelism(newParallelism);
            this.parallelism = newParallelism;
            return this;
        }

        /**
         * Sets the {@code maxReadBack} value handed to the monitor source. Only meaningful when
         * this builder creates the source itself.
         *
         * @param newMaxReadBack value forwarded to {@code MonitorSource}
         * @return this builder
         * @throws IllegalArgumentException if the builder was created with a change stream
         */
        public Builder maxReadBack(int newMaxReadBack) {
            if (this.inputStream != null) {
                throw new IllegalArgumentException("Can't set maxReadBack when change stream is provided");
            }
            this.maxReadBack = newMaxReadBack;
            return this;
        }

        /**
         * Registers a maintenance task. Tasks keep their insertion order; the position in that
         * order is the task index used by {@link #append()}.
         *
         * @param task builder of the task to register
         * @return this builder
         */
        public Builder add(MaintenanceTaskBuilder<?> task) {
            this.taskBuilders.add(task);
            return this;
        }

        /**
         * Wires the maintenance flow: change stream, trigger manager, watermark assigner, one
         * filtered branch per registered task, and a lock remover fed by the union of all task
         * outputs.
         *
         * @throws IllegalArgumentException if no task was registered
         * @throws NullPointerException if the uid suffix is {@code null}
         * @throws IOException declared by the method; presumably raised while cloning, opening or
         *     closing the table loader -- TODO confirm
         */
        public void append() throws IOException {
            Preconditions.checkArgument(!this.taskBuilders.isEmpty(), "Provide at least one task");
            Preconditions.checkNotNull(this.uidSuffix, "Uid suffix should no be null");
            // Collect the display name and trigger evaluator of every task, index-aligned with taskBuilders.
            ArrayList<String> taskNames = Lists.newArrayListWithCapacity(this.taskBuilders.size());
            ArrayList<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(this.taskBuilders.size());
            for (int i = 0; i < this.taskBuilders.size(); ++i) {
                taskNames.add(Builder.nameFor(this.taskBuilders.get(i), i));
                evaluators.add(this.taskBuilders.get(i).evaluator());
            }
            // Work on a clone of the loader; the clone is closed when the try block exits.
            try (TableLoader loader = this.tableLoader.clone();){
                loader.open();
                String tableName = loader.loadTable().name();
                // Change stream keyed by a constant key -> TriggerManager -> timestamp/watermark assigner.
                // Each name/uid/slotSharingGroup/forceNonParallel call applies to the operator created just before it.
                SingleOutputStreamOperator triggers = DataStreamUtils.reinterpretAsKeyedStream(this.changeStream(tableName, loader), (KeySelector & Serializable)unused -> true).process((KeyedProcessFunction)new TriggerManager(loader, this.lockFactory, taskNames, evaluators, this.rateLimit.toMillis(), this.lockCheckDelay.toMillis())).name(TableMaintenance.TRIGGER_MANAGER_OPERATOR_NAME).uid(TableMaintenance.TRIGGER_MANAGER_OPERATOR_NAME + this.uidSuffix).slotSharingGroup(this.slotSharingGroup).forceNonParallel().assignTimestampsAndWatermarks((WatermarkStrategy)new PunctuatedWatermarkStrategy()).name(TableMaintenance.WATERMARK_ASSIGNER_OPERATOR_NAME).uid(TableMaintenance.WATERMARK_ASSIGNER_OPERATOR_NAME + this.uidSuffix).slotSharingGroup(this.slotSharingGroup).forceNonParallel();
                // Union of all task outputs; non-null after the loop because at least one task exists.
                DataStream unioned = null;
                for (int i = 0; i < this.taskBuilders.size(); ++i) {
                    int taskIndex = i;
                    // Route only the triggers addressed to this task index into the task's branch.
                    SingleOutputStreamOperator filtered = triggers.filter((FilterFunction & Serializable)t2 -> t2.taskId() != null && t2.taskId() == taskIndex).name(TableMaintenance.FILTER_OPERATOR_NAME_PREFIX + taskIndex).forceNonParallel().uid(TableMaintenance.FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + this.uidSuffix).slotSharingGroup(this.slotSharingGroup);
                    MaintenanceTaskBuilder<?> builder = this.taskBuilders.get(taskIndex);
                    DataStream result = builder.append((DataStream<Trigger>)filtered, taskIndex, (String)taskNames.get(taskIndex), tableName, loader, this.uidSuffix, this.slotSharingGroup, this.parallelism);
                    unioned = unioned == null ? result : unioned.union(new DataStream[]{result});
                }
                // A single LockRemover consumes the results of all tasks.
                unioned.transform(TableMaintenance.LOCK_REMOVER_OPERATOR_NAME, TypeInformation.of(Void.class), (OneInputStreamOperator)new LockRemover(tableName, this.lockFactory, taskNames)).forceNonParallel().uid("lock-remover-" + this.uidSuffix).slotSharingGroup(this.slotSharingGroup);
            }
        }

        /**
         * Returns the stream of table changes that drives the trigger manager.
         *
         * <p>When a change stream was supplied to the builder, that stream is returned with
         * {@code global()} partitioning. Otherwise a non-parallel {@link MonitorSource} is created
         * for the table, rate limited according to the configured rate limit.
         *
         * @param tableName table name, used in the display name of the created source
         * @param loader loader handed to the created source
         * @return the change stream
         */
        private DataStream<TableChange> changeStream(String tableName, TableLoader loader) {
            if (this.inputStream != null) {
                return this.inputStream.global();
            }

            // Derive the rate from milliseconds: Duration.getSeconds() truncates, so a sub-second
            // rate limit would otherwise divide by zero and yield an infinite (unlimited) rate.
            double ratePerSecond = 1000.0 / (double) this.rateLimit.toMillis();
            MonitorSource source = new MonitorSource(loader, RateLimiterStrategy.perSecond(ratePerSecond), this.maxReadBack);
            return this.env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX + tableName)
                    .uid(TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX + this.uidSuffix)
                    .slotSharingGroup(this.slotSharingGroup)
                    .forceNonParallel();
        }

        /**
         * Builds the display name of a task: the simple class name of its builder followed by the
         * task index in square brackets, e.g. {@code "SomeBuilder [0]"}.
         *
         * @param streamBuilder builder of the task
         * @param taskIndex index of the task
         * @return the task name
         */
        private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskIndex) {
            String builderName = streamBuilder.getClass().getSimpleName();
            return builderName + " [" + taskIndex + "]";
        }
    }
}

