/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.flink.maintenance.api.TaskResult;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class LockRemover
extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<TaskResult, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class);
    private final String tableName;
    private final TriggerLockFactory lockFactory;
    private final List<String> maintenanceTaskNames;
    private transient List<Counter> succeededTaskResultCounters;
    private transient List<Counter> failedTaskResultCounters;
    private transient List<AtomicLong> taskLastRunDurationMs;
    private transient TriggerLockFactory.Lock lock;
    private transient TriggerLockFactory.Lock recoveryLock;
    private transient long lastProcessedTaskStartEpoch = 0L;

    public LockRemover(String tableName, TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) {
        Preconditions.checkNotNull((Object)lockFactory, (Object)"Lock factory should no be null");
        Preconditions.checkArgument((maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty() ? 1 : 0) != 0, (Object)"Invalid maintenance task names: null or empty");
        this.tableName = tableName;
        this.lockFactory = lockFactory;
        this.maintenanceTaskNames = maintenanceTaskNames;
    }

    public void open() throws Exception {
        super.open();
        this.succeededTaskResultCounters = Lists.newArrayListWithExpectedSize((int)this.maintenanceTaskNames.size());
        this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize((int)this.maintenanceTaskNames.size());
        this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize((int)this.maintenanceTaskNames.size());
        for (int taskIndex = 0; taskIndex < this.maintenanceTaskNames.size(); ++taskIndex) {
            MetricGroup taskMetricGroup = TableMaintenanceMetrics.groupFor((RuntimeContext)this.getRuntimeContext(), this.tableName, this.maintenanceTaskNames.get(taskIndex), taskIndex);
            this.succeededTaskResultCounters.add(taskMetricGroup.counter("succeededTasks"));
            this.failedTaskResultCounters.add(taskMetricGroup.counter("failedTasks"));
            AtomicLong duration = new AtomicLong(0L);
            this.taskLastRunDurationMs.add(duration);
            taskMetricGroup.gauge("lastRunDurationMs", duration::get);
        }
        this.lockFactory.open();
        this.lock = this.lockFactory.createLock();
        this.recoveryLock = this.lockFactory.createRecoveryLock();
    }

    public void processElement(StreamRecord<TaskResult> streamRecord) {
        TaskResult taskResult = (TaskResult)streamRecord.getValue();
        LOG.info("Processing result {} for task {}", (Object)taskResult, (Object)this.maintenanceTaskNames.get(taskResult.taskIndex()));
        long duration = System.currentTimeMillis() - taskResult.startEpoch();
        this.lock.unlock();
        this.lastProcessedTaskStartEpoch = taskResult.startEpoch();
        this.taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration);
        if (taskResult.success()) {
            this.succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
        } else {
            this.failedTaskResultCounters.get(taskResult.taskIndex()).inc();
        }
    }

    public void processWatermark(Watermark mark) {
        if (mark.getTimestamp() > this.lastProcessedTaskStartEpoch) {
            this.lock.unlock();
            this.recoveryLock.unlock();
        }
    }
}

