/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatistics;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatisticsTracker;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S>
implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
    private final String operatorName;
    private final ExecutorService coordinatorExecutor;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final SubtaskGateways subtaskGateways;
    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
    private final transient AggregatedStatisticsTracker<D, S> aggregatedStatisticsTracker;
    private volatile AggregatedStatistics<D, S> completedStatistics;
    private volatile boolean started;

    DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context context, TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
        this.operatorName = operatorName;
        this.coordinatorThreadFactory = new CoordinatorExecutorThreadFactory("DataStatisticsCoordinator-" + operatorName, context.getUserCodeClassloader());
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(this.coordinatorThreadFactory);
        this.operatorCoordinatorContext = context;
        this.subtaskGateways = new SubtaskGateways(operatorName, this.parallelism());
        this.statisticsSerializer = statisticsSerializer;
        this.aggregatedStatisticsTracker = new AggregatedStatisticsTracker<D, S>(operatorName, statisticsSerializer, this.parallelism());
    }

    /**
     * Logs the start of this coordinator and flips the {@code started} flag, which
     * {@code ensureStarted()} checks before work is handed to the coordinator thread.
     */
    public void start() throws Exception {
        LOG.info("Starting data statistics coordinator: {}.", operatorName);
        started = true;
    }

    /**
     * Initiates shutdown of the coordinator executor and logs that the coordinator was closed.
     * The call does not wait for the executor to terminate.
     */
    public void close() throws Exception {
        coordinatorExecutor.shutdown();
        LOG.info("Closed data statistics coordinator: {}.", operatorName);
    }

    /**
     * Runs {@code callable} exactly once on the coordinator thread and returns only after it has
     * finished.
     *
     * <p>If the caller already is the coordinator thread, the callable is invoked inline;
     * submitting it to the single-threaded executor and blocking on the result from that same
     * thread could never complete. Otherwise the callable is submitted to the coordinator
     * executor and the calling thread waits for it.
     *
     * @param callable the work to execute
     * @param errorMessage message of the {@link FlinkRuntimeException} raised on failure
     * @throws FlinkRuntimeException if the callable throws, or if waiting for it is interrupted
     *     (the calling thread's interrupt flag is restored in that case)
     */
    @org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting
    void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
        this.ensureStarted();

        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                callable.call();
            } catch (Throwable t) {
                LOG.error("Uncaught Exception in data statistics coordinator: {} executor", this.operatorName, t);
                throw new FlinkRuntimeException(errorMessage, t);
            }
            // The work is done; it must not also be handed to the executor.
            return;
        }

        // Log inside the executor thread so the failure is recorded where it happened, then
        // rethrow so that Future#get surfaces it to the waiting caller.
        Callable<Void> guardedCallable = () -> {
            try {
                return callable.call();
            } catch (Throwable t) {
                LOG.error("Uncaught Exception in data statistics coordinator: {} executor", this.operatorName, t);
                ExceptionUtils.rethrowException(t);
                return null;
            }
        };

        try {
            this.coordinatorExecutor.submit(guardedCallable).get();
        } catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(errorMessage, e);
        }
    }

    /**
     * Schedules {@code runnable} on the coordinator executor without waiting for it.
     *
     * <p>The runnable is wrapped in a {@code ThrowableCatchingRunnable} whose handler forwards
     * the throwable, together with the current thread, to the coordinator thread factory's
     * {@code uncaughtException}.
     *
     * @param runnable the work to execute
     */
    public void runInCoordinatorThread(Runnable runnable) {
        ThrowableCatchingRunnable guarded = new ThrowableCatchingRunnable(
                failure -> coordinatorThreadFactory.uncaughtException(Thread.currentThread(), failure),
                runnable);
        coordinatorExecutor.execute(guarded);
    }

    /**
     * Schedules {@code action} on the coordinator executor and turns any non-fatal failure into
     * a job failure.
     *
     * <p>Requires the coordinator to have been started. A throwable from the action is first
     * passed to {@code ExceptionUtils.rethrowIfFatalErrorOrOOM}; if that does not rethrow, the
     * failure is logged and reported through {@code failJob} on the coordinator context.
     *
     * @param action the work to execute
     * @param actionString description of the work, used in the error log message
     */
    private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String actionString) {
        ensureStarted();
        Runnable failJobOnError = () -> {
            try {
                action.run();
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(failure);
                LOG.error("Uncaught exception in the data statistics coordinator: {} while {}. Triggering job failover", operatorName, actionString, failure);
                operatorCoordinatorContext.failJob(failure);
            }
        };
        runInCoordinatorThread(failJobOnError);
    }

    /** Precondition check that fails unless {@code start()} has set the started flag. */
    private void ensureStarted() {
        Preconditions.checkState(started, "The coordinator of %s has not started yet.", operatorName);
    }

    /**
     * Looks up the parallelism on the coordinator context each time it is called.
     *
     * @return the context's current parallelism
     */
    private int parallelism() {
        return operatorCoordinatorContext.currentParallelism();
    }

    /**
     * Feeds one subtask's statistics event into the tracker. When the tracker hands back an
     * aggregation, that result is stored as the completed statistics and sent to the subtasks.
     *
     * @param subtask index of the subtask the event came from
     * @param event the statistics event reported by that subtask
     */
    private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, S> event) {
        AggregatedStatistics<D, S> completed = aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
        if (completed == null) {
            // The tracker returned nothing to publish for this update.
            return;
        }
        completedStatistics = completed;
        sendDataStatisticsToSubtasks(completed.checkpointId(), completed.dataStatistics());
    }

    /**
     * Wraps the global statistics in a single event and sends that event to the gateway of every
     * subtask index from 0 up to the current parallelism, on the coordinator thread.
     *
     * @param checkpointId checkpoint the statistics belong to
     * @param globalDataStatistics the statistics to send
     */
    private void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<D, S> globalDataStatistics) {
        Callable<Void> broadcast = () -> {
            DataStatisticsEvent<D, S> globalEvent = DataStatisticsEvent.create(checkpointId, globalDataStatistics, statisticsSerializer);
            int subtaskCount = parallelism();
            int subtask = 0;
            while (subtask < subtaskCount) {
                subtaskGateways.getSubtaskGateway(subtask).sendEvent(globalEvent);
                subtask++;
            }
            return null;
        };
        String failureMessage = String.format("Failed to send operator %s coordinator global data statistics for checkpoint %d", operatorName, checkpointId);
        callInCoordinatorThread(broadcast, failureMessage);
    }

    /**
     * Handles an event sent by a subtask by scheduling its processing on the coordinator thread.
     * The event must be a {@code DataStatisticsEvent}; anything else fails the argument check
     * inside the scheduled action.
     *
     * @param subtask index of the sending subtask
     * @param attemptNumber execution attempt of the sending subtask
     * @param event the event received from the operator
     */
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        String actionDescription = String.format("handling operator event %s from subtask %d (#%d)", event.getClass(), subtask, attemptNumber);
        ThrowingRunnable<Throwable> handler = () -> {
            LOG.debug("Handling event from subtask {} (#{}) of {}: {}", subtask, attemptNumber, operatorName, event);
            Preconditions.checkArgument(event instanceof DataStatisticsEvent);
            handleDataStatisticRequest(subtask, (DataStatisticsEvent<D, S>) event);
        };
        runInCoordinatorThread(handler, actionDescription);
    }

    /**
     * Snapshots the coordinator on the coordinator thread: serializes the current completed
     * statistics and completes {@code resultFuture} with the resulting bytes.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param resultFuture future that receives the serialized state
     */
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        ThrowingRunnable<Throwable> snapshot = () -> {
            LOG.debug("Snapshotting data statistics coordinator {} for checkpoint {}", operatorName, checkpointId);
            byte[] serialized = DataStatisticsUtil.serializeAggregatedStatistics(completedStatistics, statisticsSerializer);
            resultFuture.complete(serialized);
        };
        runInCoordinatorThread(snapshot, String.format("taking checkpoint %d", checkpointId));
    }

    /**
     * Checkpoint-completion callback. This coordinator takes no action here.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     */
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Restores the completed statistics from checkpointed bytes. Only permitted while the
     * coordinator has not been started.
     *
     * @param checkpointId id of the checkpoint being restored, used for logging
     * @param checkpointData serialized state, or {@code null} when there is nothing to restore
     */
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        Preconditions.checkState(!started, "The coordinator %s can only be reset if it was not yet started", operatorName);
        if (checkpointData != null) {
            LOG.info("Restoring data statistic coordinator {} from checkpoint {}", operatorName, checkpointId);
            completedStatistics = DataStatisticsUtil.deserializeAggregatedStatistics(checkpointData, statisticsSerializer);
        } else {
            LOG.info("Data statistic coordinator {} has nothing to restore from checkpoint {}", operatorName, checkpointId);
        }
    }

    /**
     * Handles a subtask being reset to a checkpoint by clearing every gateway registered for
     * that subtask, on the coordinator thread.
     *
     * @param subtask index of the subtask that was reset
     * @param checkpointId checkpoint the subtask was reset to, used for logging
     */
    public void subtaskReset(int subtask, long checkpointId) {
        String actionDescription = String.format("handling subtask %d recovery to checkpoint %d", subtask, checkpointId);
        ThrowingRunnable<Throwable> clearGateways = () -> {
            LOG.info("Operator {} subtask {} is reset to checkpoint {}", operatorName, subtask, checkpointId);
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.reset(subtask);
        };
        runInCoordinatorThread(clearGateways, actionDescription);
    }

    /**
     * Handles a failed execution attempt by removing the gateway registered for that subtask and
     * attempt, on the coordinator thread.
     *
     * @param subtask index of the failed subtask
     * @param attemptNumber the attempt that failed
     * @param reason failure cause, if any (not used here)
     */
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        String actionDescription = String.format("handling subtask %d (#%d) failure", subtask, attemptNumber);
        ThrowingRunnable<Throwable> dropGateway = () -> {
            LOG.info("Unregistering gateway after failure for subtask {} (#{}) of data statistic {}", subtask, attemptNumber, operatorName);
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
        };
        runInCoordinatorThread(dropGateway, actionDescription);
    }

    /**
     * Handles an execution attempt becoming ready by registering its gateway on the coordinator
     * thread. The subtask index and attempt number must match what the gateway itself reports.
     *
     * @param subtask index of the subtask that is ready
     * @param attemptNumber the attempt that is ready
     * @param gateway gateway for sending events to that attempt
     */
    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
        Preconditions.checkArgument(subtask == gateway.getSubtask());
        Preconditions.checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber());

        String actionDescription = String.format("making event gateway to subtask %d (#%d) available", subtask, attemptNumber);
        ThrowingRunnable<Throwable> addGateway = () -> {
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.registerSubtaskGateway(gateway);
        };
        runInCoordinatorThread(addGateway, actionDescription);
    }

    /**
     * Exposes the stored completed statistics for tests.
     *
     * @return the current value of the completed-statistics field; {@code null} if it has not
     *     been assigned
     */
    @org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting
    AggregatedStatistics<D, S> completedStatistics() {
        return completedStatistics;
    }

    /**
     * Thread factory for the coordinator executor that also serves as the uncaught-exception
     * handler of the threads it creates. It remembers the most recently created thread so
     * callers can check whether they are running on it.
     */
    private static class CoordinatorExecutorThreadFactory
    implements ThreadFactory,
    Thread.UncaughtExceptionHandler {
        private final String coordinatorThreadName;
        private final ClassLoader classLoader;
        private final Thread.UncaughtExceptionHandler errorHandler;
        // Most recently created thread; stays null until newThread is first called.
        @javax.annotation.Nullable
        private Thread thread;

        /** Creates a factory that delegates uncaught exceptions to {@code FatalExitExceptionHandler.INSTANCE}. */
        CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader) {
            this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
        }

        /**
         * Creates a factory with an explicit handler for uncaught exceptions.
         *
         * @param coordinatorThreadName name given to every created thread
         * @param contextClassLoader context class loader set on every created thread
         * @param errorHandler handler that uncaught exceptions are delegated to
         */
        @VisibleForTesting
        CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler errorHandler) {
            this.coordinatorThreadName = coordinatorThreadName;
            this.classLoader = contextClassLoader;
            this.errorHandler = errorHandler;
        }

        /** Creates a thread, records it as the coordinator thread, and configures it. */
        @Override
        public synchronized Thread newThread(@NotNull Runnable runnable) {
            Thread coordinatorThread = new Thread(runnable, coordinatorThreadName);
            thread = coordinatorThread;
            coordinatorThread.setContextClassLoader(classLoader);
            coordinatorThread.setUncaughtExceptionHandler(this);
            return coordinatorThread;
        }

        /** Forwards the failure to the configured error handler. */
        @Override
        public synchronized void uncaughtException(Thread failedThread, Throwable cause) {
            errorHandler.uncaughtException(failedThread, cause);
        }

        /** Returns whether the calling thread is the thread most recently created by this factory. */
        boolean isCurrentThreadCoordinatorThread() {
            Thread caller = Thread.currentThread();
            return caller == thread;
        }
    }

    /**
     * Registry of subtask gateways: one map per subtask index, keyed by execution attempt number.
     */
    private static class SubtaskGateways {
        private final String operatorName;
        private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;

        /**
         * Allocates one empty attempt map for each subtask index in {@code [0, parallelism)}.
         *
         * @param operatorName operator name used in log and error messages
         * @param parallelism number of subtask slots to allocate
         */
        // Generic arrays cannot be created directly; the raw array only ever holds the maps
        // created in the loop below.
        @SuppressWarnings("unchecked")
        private SubtaskGateways(String operatorName, int parallelism) {
            this.operatorName = operatorName;
            this.gateways = new Map[parallelism];
            for (int subtask = 0; subtask < parallelism; subtask++) {
                this.gateways[subtask] = Maps.newHashMap();
            }
        }

        /** Adds the gateway under its own subtask index and attempt number; duplicates are rejected. */
        private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway gateway) {
            int subtaskIndex = gateway.getSubtask();
            Map<Integer, OperatorCoordinator.SubtaskGateway> attempts = gateways[subtaskIndex];
            int attemptNumber = gateway.getExecution().getAttemptNumber();
            boolean alreadyRegistered = attempts.containsKey(attemptNumber);
            Preconditions.checkState(!alreadyRegistered, "Coordinator of %s already has a subtask gateway for %d (#%d)", operatorName, subtaskIndex, attemptNumber);
            LOG.debug("Coordinator of {} registers gateway for subtask {} attempt {}", operatorName, subtaskIndex, attemptNumber);
            attempts.put(attemptNumber, gateway);
        }

        /** Removes the gateway stored for the given subtask and attempt, if one is present. */
        private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) {
            LOG.debug("Coordinator of {} unregisters gateway for subtask {} attempt {}", operatorName, subtaskIndex, attemptNumber);
            Map<Integer, OperatorCoordinator.SubtaskGateway> attempts = gateways[subtaskIndex];
            attempts.remove(attemptNumber);
        }

        /**
         * Returns the single gateway registered for the subtask. Fails the state check when none
         * is registered; {@code Iterables.getOnlyElement} is used to pick the entry.
         */
        private OperatorCoordinator.SubtaskGateway getSubtaskGateway(int subtaskIndex) {
            Map<Integer, OperatorCoordinator.SubtaskGateway> attempts = gateways[subtaskIndex];
            boolean ready = !attempts.isEmpty();
            Preconditions.checkState(ready, "Coordinator of %s subtask %d is not ready yet to receive events", operatorName, subtaskIndex);
            return Iterables.getOnlyElement(attempts.values());
        }

        /** Drops every gateway registered for the subtask. */
        private void reset(int subtaskIndex) {
            Map<Integer, OperatorCoordinator.SubtaskGateway> attempts = gateways[subtaskIndex];
            attempts.clear();
        }
    }
}

