/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.materializedview;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.HadoopIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.indexing.materializedview.MaterializedViewSupervisorReport;
import org.apache.druid.indexing.materializedview.MaterializedViewSupervisorSpec;
import org.apache.druid.indexing.materializedview.MaterializedViewTaskConfig;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;

public class MaterializedViewSupervisor
implements Supervisor {
    /** Logger for this supervisor; {@code run()} also uses it to emit alerts. */
    private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
    /** Default limit on concurrently tracked tasks when the spec context does not set "maxTaskCount". */
    private static final int DEFAULT_MAX_TASK_COUNT = 1;
    /** Default minimum data lag (one day, in milliseconds) when the spec context does not set "minDataLagMs". */
    private static final long DEFAULT_MIN_DATA_LAG_MS = TimeUnit.DAYS.toMillis(1L);
    // Collaborators handed in through the constructor.
    private final MetadataSupervisorManager metadataSupervisorManager;
    private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
    private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
    private final MaterializedViewSupervisorSpec spec;
    private final TaskMaster taskMaster;
    private final TaskStorage taskStorage;
    private final MaterializedViewTaskConfig config;
    /** Tracks supervisor state; created in the constructor from the spec's state-manager config. */
    private final SupervisorStateManager stateManager;
    /** Name of the dataSource this supervisor manages, taken from {@code spec.getDataSourceName()}. */
    private final String dataSource;
    /** "MaterializedViewSupervisor-" followed by {@link #dataSource}. */
    private final String supervisorId;
    /** Upper bound on the size of {@link #runningTasks}; no new task is submitted once it is reached. */
    private final int maxTaskCount;
    /** Minimum distance in milliseconds between an interval's start and the latest base interval's start. */
    private final long minDataLagMs;
    /** Interval -> task that was submitted for it and has not yet been observed as finished. */
    private final Map<Interval, HadoopIndexTask> runningTasks = new HashMap<Interval, HadoopIndexTask>();
    /** Interval -> version string the submitted task was created with. */
    private final Map<Interval, String> runningVersion = new HashMap<Interval, String>();
    /** Held by checkSegmentsAndSubmitTasks(), stop() and reset() while they touch the task maps. */
    private final Object taskLock = new Object();
    /** Held by start() and stop() while they change {@link #started}, {@link #future} and {@link #exec}. */
    private final Object stateLock = new Object();
    private boolean started = false;
    /** Handle of the periodic run() schedule; null while the supervisor is not running. */
    private ListenableFuture<?> future = null;
    /** Executor that drives run(); null while the supervisor is not running. */
    private ListeningScheduledExecutorService exec = null;
    /** Intervals found to need (re)building by the last segment check; reported by getStatus(). */
    private Set<Interval> missInterval = new HashSet<Interval>();

    /**
     * Creates a supervisor for the derived dataSource described by {@code spec}.
     *
     * <p>Two optional settings are read from {@code spec.getContext()}:
     * <ul>
     *   <li>{@code "maxTaskCount"} - parsed as an int, falling back to {@link #DEFAULT_MAX_TASK_COUNT};</li>
     *   <li>{@code "minDataLagMs"} - parsed as a long, falling back to {@link #DEFAULT_MIN_DATA_LAG_MS}.</li>
     * </ul>
     * Both values are converted with {@code String.valueOf} before parsing, so numeric and
     * string representations are accepted alike.
     *
     * @throws NumberFormatException if a present context value cannot be parsed as a number
     */
    public MaterializedViewSupervisor(
        TaskMaster taskMaster,
        TaskStorage taskStorage,
        MetadataSupervisorManager metadataSupervisorManager,
        SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
        IndexerMetadataStorageCoordinator metadataStorageCoordinator,
        MaterializedViewTaskConfig config,
        MaterializedViewSupervisorSpec spec
    ) {
        this.taskMaster = taskMaster;
        this.taskStorage = taskStorage;
        this.metadataStorageCoordinator = metadataStorageCoordinator;
        this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
        this.metadataSupervisorManager = metadataSupervisorManager;
        this.config = config;
        this.spec = spec;
        this.stateManager = new SupervisorStateManager(spec.getSupervisorStateManagerConfig(), spec.isSuspended());
        this.dataSource = spec.getDataSourceName();
        this.supervisorId = StringUtils.format("MaterializedViewSupervisor-%s", this.dataSource);
        // Fall back to the named defaults rather than repeating their values as literals.
        this.maxTaskCount = spec.getContext().containsKey("maxTaskCount")
            ? Integer.parseInt(String.valueOf(spec.getContext().get("maxTaskCount")))
            : DEFAULT_MAX_TASK_COUNT;
        this.minDataLagMs = spec.getContext().containsKey("minDataLagMs")
            ? Long.parseLong(String.valueOf(spec.getContext().get("minDataLagMs")))
            : DEFAULT_MIN_DATA_LAG_MS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the supervisor.
     *
     * <p>If no metadata is stored for the dataSource yet, a {@link DerivativeDataSourceMetadata}
     * built from the spec is inserted. A single-threaded scheduled executor is then created and
     * {@link #run()} is scheduled on it with no initial delay and a fixed delay of
     * {@code config.getTaskCheckDuration()} between runs.
     *
     * @throws IllegalStateException if the supervisor has already been started
     */
    public void start() {
        synchronized (stateLock) {
            Preconditions.checkState(!started, "already started");
            if (metadataStorageCoordinator.retrieveDataSourceMetadata(dataSource) == null) {
                metadataStorageCoordinator.insertDataSourceMetadata(
                    dataSource,
                    new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
                );
            }
            ScheduledExecutorService scheduler = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId));
            exec = MoreExecutors.listeningDecorator(scheduler);
            long checkPeriodMillis = config.getTaskCheckDuration().toStandardDuration().getMillis();
            future = exec.scheduleWithFixedDelay(this::run, 0L, checkPeriodMillis, TimeUnit.MILLISECONDS);
            started = true;
        }
    }

    /**
     * One supervision cycle.
     *
     * <p>Does nothing but log when the spec is suspended. Otherwise the stored dataSource metadata
     * is compared with the spec (base dataSource, dimensions and metrics); only when all three
     * match are segments checked and tasks submitted, otherwise an error is logged. Any exception
     * is recorded on the state manager and emitted as an alert instead of being propagated, and
     * the state manager is always told that the run finished.
     */
    @VisibleForTesting
    public void run() {
        try {
            if (spec.isSuspended()) {
                log.info("Materialized view supervisor[%s:%s] is suspended", spec.getId(), spec.getDataSourceName());
                return;
            }
            DataSourceMetadata stored = metadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
            boolean matchesSpec = false;
            if (stored instanceof DerivativeDataSourceMetadata) {
                DerivativeDataSourceMetadata derived = (DerivativeDataSourceMetadata) stored;
                matchesSpec = spec.getBaseDataSource().equals(derived.getBaseDataSource())
                    && spec.getDimensions().equals(derived.getDimensions())
                    && spec.getMetrics().equals(derived.getMetrics());
            }
            if (!matchesSpec) {
                log.error("Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)", supervisorId, stored, spec);
                return;
            }
            checkSegmentsAndSubmitTasks();
        }
        catch (Exception e) {
            stateManager.recordThrowableEvent(e);
            log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit();
        }
        finally {
            stateManager.markRunFinished();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the supervisor: cancels the periodic run, shuts the executor down, shuts down all
     * tracked tasks and, when the latest stored spec for this supervisor id is no longer a
     * {@link MaterializedViewSupervisorSpec}, clears the dataSource's segments and metadata.
     *
     * <p>The two modes are mutually exclusive. In graceful mode the schedule is cancelled
     * without interruption and only after {@code taskLock} has been acquired; in forced mode the
     * schedule is cancelled with interruption before the lock is taken. Running both sequences
     * back to back would dereference the already-nulled {@code future}.
     *
     * @param stopGracefully {@code true} to cancel without interrupting an in-flight run
     * @throws IllegalStateException if the supervisor has not been started
     */
    public void stop(boolean stopGracefully) {
        synchronized (stateLock) {
            Preconditions.checkState(started, "not started");
            stateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING);
            if (stopGracefully) {
                synchronized (taskLock) {
                    future.cancel(false);
                    future = null;
                    exec.shutdownNow();
                    exec = null;
                    clearTasks();
                    if (!(metadataSupervisorManager.getLatest().get(supervisorId) instanceof MaterializedViewSupervisorSpec)) {
                        clearSegments();
                    }
                }
            } else {
                future.cancel(true);
                future = null;
                exec.shutdownNow();
                exec = null;
                synchronized (taskLock) {
                    clearTasks();
                    if (!(metadataSupervisorManager.getLatest().get(supervisorId) instanceof MaterializedViewSupervisorSpec)) {
                        clearSegments();
                    }
                }
            }
            started = false;
        }
    }

    /**
     * Builds a status report stamped with the current UTC time. It carries the spec's suspension
     * flag, base dataSource, dimensions and metrics, the condensed set of intervals still missing
     * from the derived dataSource, and the state manager's health, basic state and recorded
     * exception events.
     */
    public SupervisorReport getStatus() {
        return new MaterializedViewSupervisorReport(
            dataSource,
            DateTimes.nowUtc(),
            spec.isSuspended(),
            spec.getBaseDataSource(),
            spec.getDimensions(),
            spec.getMetrics(),
            JodaUtils.condenseIntervals(missInterval),
            stateManager.isHealthy(),
            stateManager.getSupervisorState().getBasicState(),
            stateManager.getExceptionEvents()
        );
    }

    /** Returns the supervisor state currently held by the state manager. */
    public SupervisorStateManager.State getState() {
        return stateManager.getSupervisorState();
    }

    /** Returns the health flag reported by the state manager (boxed). */
    public Boolean isHealthy() {
        return stateManager.isHealthy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets the stored dataSource metadata to the one described by the spec.
     *
     * <p>If the currently stored metadata is a {@link DerivativeDataSourceMetadata} whose base
     * dataSource, dimensions or metrics differ from the spec, all tracked tasks and all segments
     * of the dataSource are cleared first. The spec's metadata is committed in every case.
     *
     * @param dataSourceMetadata must be {@code null}; resetting to caller-supplied metadata is
     *                           not supported
     * @throws IAE if {@code dataSourceMetadata} is not {@code null}
     */
    public void reset(DataSourceMetadata dataSourceMetadata) {
        if (dataSourceMetadata != null) {
            throw new IAE("DerivedDataSourceMetadata is not allowed to reset to a new DerivedDataSourceMetadata");
        }
        DataSourceMetadata stored = metadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
        if (stored instanceof DerivativeDataSourceMetadata) {
            DerivativeDataSourceMetadata derived = (DerivativeDataSourceMetadata) stored;
            boolean sameAsSpec = derived.getBaseDataSource().equals(spec.getBaseDataSource())
                && derived.getDimensions().equals(spec.getDimensions())
                && derived.getMetrics().equals(spec.getMetrics());
            if (!sameAsSpec) {
                synchronized (taskLock) {
                    clearTasks();
                    clearSegments();
                }
            }
        }
        commitDataSourceMetadata(
            new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
        );
    }

    /**
     * Not supported by this supervisor.
     *
     * @throws UnsupportedOperationException always
     */
    public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) {
        final String reason = "Reset offsets not supported in MaterializedViewSupervisor";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Intentionally does nothing: both arguments are ignored.
     */
    public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) {
        // no-op
    }

    /**
     * Not supported by this supervisor.
     *
     * @throws UnsupportedOperationException always
     */
    public LagStats computeLagStats() {
        final String reason = "Compute Lag Stats not supported in MaterializedViewSupervisor";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not supported by this supervisor.
     *
     * @throws UnsupportedOperationException always
     */
    public Set<String> getActiveRealtimeSequencePrefixes() {
        // Carry a message like the other unsupported operations of this class do.
        throw new UnsupportedOperationException("Get Active Realtime Sequence Prefixes is not supported in MaterializedViewSupervisor");
    }

    /**
     * Not supported by this supervisor.
     *
     * @throws UnsupportedOperationException always
     */
    public int getActiveTaskGroupsCount() {
        final String reason = "Get Active Task Groups Count is not supported in MaterializedViewSupervisor";
        throw new UnsupportedOperationException(reason);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forgets tasks that are no longer runnable and, if there is capacity left, determines which
     * intervals need building and submits tasks for them. Runs entirely under {@code taskLock}.
     *
     * <p>A tracked task is forgotten when task storage has no status for it or its status is not
     * runnable. When the number of tracked tasks equals {@code maxTaskCount} afterwards, the
     * segment check is skipped altogether.
     */
    @VisibleForTesting
    void checkSegmentsAndSubmitTasks() {
        synchronized (taskLock) {
            // Collect first, remove afterwards, so nothing is forgotten if a status lookup fails.
            List<Interval> finishedIntervals = new ArrayList<>();
            runningTasks.forEach((interval, task) -> {
                Optional<TaskStatus> status = taskStorage.getStatus(task.getId());
                if (!status.isPresent() || !status.get().isRunnable()) {
                    finishedIntervals.add(interval);
                }
            });
            finishedIntervals.forEach(interval -> {
                runningTasks.remove(interval);
                runningVersion.remove(interval);
            });

            if (runningTasks.size() == maxTaskCount) {
                return;
            }

            Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> pending = checkSegments();
            missInterval = pending.lhs.keySet();
            submitTasks(pending.lhs, pending.rhs);
        }
    }

    /**
     * Exposes the live (not copied) task and version maps, keyed by interval, for tests.
     */
    @VisibleForTesting
    Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks() {
        return new Pair<>(runningTasks, runningVersion);
    }

    /**
     * Compares the base dataSource with the derived dataSource and works out what to build and
     * what to drop.
     *
     * <ul>
     *   <li>Intervals present only in the base are scheduled for building.</li>
     *   <li>Intervals present in both are rebuilt when the base's max created date is greater
     *       than the derivative's version and the number of base segments seen for the interval
     *       equals the number of visible used base segments currently stored for it.</li>
     *   <li>Intervals present only in the derivative have their segments marked unused.</li>
     *   <li>Intervals already being built at the wanted version are not scheduled again. When
     *       the wanted version differs and a task queue is available, the running task is shut
     *       down and both tracking entries for the interval are dropped so it is built afresh.</li>
     * </ul>
     *
     * @return the intervals to build (latest interval first) mapped to their target version,
     *         paired with the base segments grouped by interval
     */
    @VisibleForTesting
    Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegments() {
        Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> derivativeSegmentsSnapshot =
            getVersionAndBaseSegments(
                metadataStorageCoordinator.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
            );
        Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
            getMaxCreateDateAndBaseSegments(
                metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(
                    spec.getBaseDataSource(),
                    Collections.singletonList(Intervals.ETERNITY)
                )
            );
        Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
        Map<Interval, List<DataSegment>> derivativeSegments = derivativeSegmentsSnapshot.rhs;
        Map<Interval, String> maxCreatedDate = baseSegmentsSnapshot.lhs;
        Map<Interval, String> derivativeVersion = derivativeSegmentsSnapshot.lhs;

        MapDifference<Interval, String> difference = Maps.difference(maxCreatedDate, derivativeVersion);
        Map<Interval, String> toBuildInterval = new HashMap<>(difference.entriesOnlyOnLeft());
        Map<Interval, String> toDropInterval = new HashMap<>(difference.entriesOnlyOnRight());
        Map<Interval, MapDifference.ValueDifference<String>> checkIfNewestVersion =
            new HashMap<>(difference.entriesDiffering());

        for (Interval interval : checkIfNewestVersion.keySet()) {
            String versionOfBase = maxCreatedDate.get(interval);
            String versionOfDerivative = derivativeVersion.get(interval);
            int baseCount = baseSegments.get(interval).size();
            if (versionOfBase.compareTo(versionOfDerivative) > 0) {
                int usedCount = metadataStorageCoordinator
                    .retrieveUsedSegmentsForInterval(spec.getBaseDataSource(), interval, Segments.ONLY_VISIBLE)
                    .size();
                if (baseCount == usedCount) {
                    toBuildInterval.put(interval, versionOfBase);
                }
            }
        }

        // runningVersion must not be modified while it is being iterated, so superseded
        // intervals are collected here and removed once the iteration is over.
        List<Interval> supersededIntervals = new ArrayList<>();
        runningVersion.forEach((interval, version) -> {
            if (!toBuildInterval.containsKey(interval)) {
                return;
            }
            if (toBuildInterval.get(interval).equals(version)) {
                // Already being built at the wanted version.
                toBuildInterval.remove(interval);
            } else if (taskMaster.getTaskQueue().isPresent()) {
                // The task may already be gone from runningTasks; guard against that instead
                // of failing every subsequent cycle with a NullPointerException.
                HadoopIndexTask staleTask = runningTasks.get(interval);
                if (staleTask != null) {
                    taskMaster.getTaskQueue().get().shutdown(staleTask.getId(), "version mismatch");
                    runningTasks.remove(interval);
                }
                supersededIntervals.add(interval);
            }
        });
        for (Interval interval : supersededIntervals) {
            runningVersion.remove(interval);
        }

        for (Interval interval : toDropInterval.keySet()) {
            for (DataSegment segment : derivativeSegments.get(interval)) {
                sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId());
            }
        }

        // Reversed start-then-end ordering: the latest interval comes first.
        SortedMap<Interval, String> sortedToBuildInterval =
            new TreeMap<>(Comparators.intervalsByStartThenEnd().reversed());
        sortedToBuildInterval.putAll(toBuildInterval);
        return new Pair<>(sortedToBuildInterval, baseSegments);
    }

    /**
     * Creates and submits one task per interval, in the iteration order of
     * {@code sortedToBuildVersion}, for as long as fewer than {@code maxTaskCount} tasks are
     * tracked. A task is only recorded in the tracking maps after the task queue accepted it;
     * when no task queue is available nothing is submitted or recorded.
     *
     * @param sortedToBuildVersion intervals to build mapped to the version to build them at
     * @param baseSegments         base segments grouped by interval, passed on to task creation
     * @throws RuntimeException wrapping any exception other than {@link EntryExistsException}
     *                          raised while submitting
     */
    private void submitTasks(SortedMap<Interval, String> sortedToBuildVersion, Map<Interval, List<DataSegment>> baseSegments) {
        for (Map.Entry<Interval, String> candidate : sortedToBuildVersion.entrySet()) {
            if (runningTasks.size() < maxTaskCount) {
                Interval interval = candidate.getKey();
                String version = candidate.getValue();
                HadoopIndexTask task = spec.createTask(interval, version, baseSegments.get(interval));
                try {
                    if (taskMaster.getTaskQueue().isPresent()) {
                        taskMaster.getTaskQueue().get().add(task);
                        runningVersion.put(interval, version);
                        runningTasks.put(interval, task);
                    }
                }
                catch (EntryExistsException e) {
                    // Logged and skipped; the interval is not recorded as running.
                    log.error("task %s already exsits", task);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Groups the given segments by interval.
     *
     * @param snapshot segments to group
     * @return interval -> version of the last segment seen for that interval, paired with
     *         interval -> all segments of that interval in encounter order
     */
    private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getVersionAndBaseSegments(Collection<DataSegment> snapshot) {
        Map<Interval, String> versionByInterval = new HashMap<>();
        Map<Interval, List<DataSegment>> segmentsByInterval = new HashMap<>();
        snapshot.forEach(segment -> {
            Interval interval = segment.getInterval();
            versionByInterval.put(interval, segment.getVersion());
            List<DataSegment> bucket = segmentsByInterval.computeIfAbsent(interval, key -> new ArrayList<>());
            bucket.add(segment);
        });
        return new Pair<>(versionByInterval, segmentsByInterval);
    }

    /**
     * Groups base segments by interval and records, per interval, the latest created date.
     * Intervals that do not lag far enough behind the latest interval of the snapshot
     * (see {@link #hasEnoughLag}) are left out of both maps.
     *
     * @param snapshot base segments paired with their created date (a date-time string)
     * @return interval -> max created date, paired with interval -> segments of that interval
     * @throws java.util.NoSuchElementException if {@code snapshot} is empty, since there is then
     *                                          no latest interval to measure the lag against
     */
    private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getMaxCreateDateAndBaseSegments(Collection<Pair<DataSegment, String>> snapshot) {
        // A sequential pass is enough to find a maximum. An empty snapshot still fails with the
        // same exception type as before, but with a message that says why.
        Interval maxAllowedToBuildInterval = snapshot.stream()
            .map(pair -> pair.lhs.getInterval())
            .max(Comparators.intervalsByStartThenEnd())
            .orElseThrow(() -> new java.util.NoSuchElementException(
                "Cannot determine the latest base interval: the base segment snapshot is empty"
            ));
        Map<Interval, String> maxCreatedDate = new HashMap<>();
        Map<Interval, List<DataSegment>> segments = new HashMap<>();
        for (Pair<DataSegment, String> entry : snapshot) {
            DataSegment segment = entry.lhs;
            String createDate = entry.rhs;
            Interval interval = segment.getInterval();
            if (!hasEnoughLag(interval, maxAllowedToBuildInterval)) {
                continue;
            }
            // Keep the later of the two created dates for the interval.
            maxCreatedDate.merge(
                interval,
                createDate,
                (date1, date2) -> DateTimes.max(DateTimes.of(date1), DateTimes.of(date2)).toString()
            );
            segments.computeIfAbsent(interval, i -> new ArrayList<>()).add(segment);
        }
        return new Pair<>(maxCreatedDate, segments);
    }

    /**
     * Tells whether {@code target} starts at least {@code minDataLagMs} milliseconds before
     * {@code maxInterval} starts.
     */
    private boolean hasEnoughLag(Interval target, Interval maxInterval) {
        long lagMillis = maxInterval.getStartMillis() - target.getStartMillis();
        return lagMillis >= minDataLagMs;
    }

    /**
     * Asks the task queue, when one is available, to shut down every tracked task, then empties
     * both tracking maps regardless of whether a queue was available.
     */
    private void clearTasks() {
        runningTasks.values().forEach(task -> {
            if (taskMaster.getTaskQueue().isPresent()) {
                taskMaster.getTaskQueue().get().shutdown(task.getId(), "killing all tasks");
            }
        });
        runningTasks.clear();
        runningVersion.clear();
    }

    /**
     * Removes what is stored for the dataSource, in this order: pending segments are deleted,
     * all segments are marked unused, and the dataSource metadata is deleted.
     */
    private void clearSegments() {
        log.info("Clear all metadata of dataSource %s", dataSource);
        metadataStorageCoordinator.deletePendingSegments(dataSource);
        sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(dataSource);
        metadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
    }

    /**
     * Stores the given metadata for the dataSource: an insert is attempted first, and only when
     * the insert reports failure is the stored metadata reset to the given value.
     *
     * @param dataSourceMetadata metadata to store
     * @throws RuntimeException wrapping an {@link IOException} raised by the reset
     */
    private void commitDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
        boolean inserted = metadataStorageCoordinator.insertDataSourceMetadata(dataSource, dataSourceMetadata);
        if (inserted) {
            return;
        }
        try {
            metadataStorageCoordinator.resetDataSourceMetadata(dataSource, dataSourceMetadata);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

