/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AssignedTasks<T extends AbstractTask> {
    private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class);
    private final String logPrefix;
    private final String taskTypeName;
    private final Time time;
    private Map<TaskId, T> created = new HashMap<TaskId, T>();
    private Map<TaskId, T> suspended = new HashMap<TaskId, T>();
    private Map<TaskId, T> restoring = new HashMap<TaskId, T>();
    private Set<TopicPartition> restoredPartitions = new HashSet<TopicPartition>();
    private Set<TaskId> previousActiveTasks = new HashSet<TaskId>();
    private Map<TaskId, T> running = new ConcurrentHashMap<TaskId, T>();
    private Map<TopicPartition, T> runningByPartition = new HashMap<TopicPartition, T>();

    AssignedTasks(String logPrefix, String taskTypeName, Time time) {
        this.logPrefix = logPrefix;
        this.taskTypeName = taskTypeName;
        this.time = time;
    }

    void addNewTask(T task) {
        log.trace("{} Add newly created {} {} with assigned partitions {}", new Object[]{this.logPrefix, this.taskTypeName, ((AbstractTask)task).id(), ((AbstractTask)task).partitions()});
        this.created.put(((AbstractTask)task).id(), task);
    }

    Set<TopicPartition> uninitializedPartitions() {
        if (this.created.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        for (Map.Entry<TaskId, T> entry : this.created.entrySet()) {
            if (!((AbstractTask)entry.getValue()).hasStateStores()) continue;
            partitions.addAll(((AbstractTask)entry.getValue()).partitions());
        }
        return partitions;
    }

    void initializeNewTasks() {
        if (!this.created.isEmpty()) {
            log.trace("{} Initializing {}s {}", new Object[]{this.logPrefix, this.taskTypeName, this.created.keySet()});
        }
        Iterator<Map.Entry<TaskId, T>> it = this.created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, T> entry = it.next();
            try {
                if (!((AbstractTask)entry.getValue()).initializeStateStores()) {
                    log.debug("{} transitioning {} {} to restoring", new Object[]{this.logPrefix, this.taskTypeName, entry.getKey()});
                    this.restoring.put(entry.getKey(), entry.getValue());
                } else {
                    this.transitionToRunning((AbstractTask)entry.getValue());
                }
                it.remove();
            }
            catch (LockException e) {
                log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", new Object[]{this.logPrefix, this.taskTypeName, entry.getKey(), e.getMessage()});
            }
        }
    }

    Set<TopicPartition> updateRestored(Collection<TopicPartition> restored) {
        if (restored.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet<TopicPartition> resume = new HashSet<TopicPartition>();
        this.restoredPartitions.addAll(restored);
        Iterator<Map.Entry<TaskId, T>> it = this.restoring.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, T> entry = it.next();
            AbstractTask task = (AbstractTask)entry.getValue();
            if (this.restoredPartitions.containsAll(task.changelogPartitions())) {
                this.transitionToRunning(task);
                resume.addAll(task.partitions());
                it.remove();
                log.trace("{} {} {} completed restoration as all its changelog partitions {} have been applied to restore state", new Object[]{this.logPrefix, this.taskTypeName, task.id(), task.changelogPartitions()});
                continue;
            }
            if (!log.isTraceEnabled()) continue;
            HashSet<TopicPartition> outstandingPartitions = new HashSet<TopicPartition>(task.changelogPartitions());
            outstandingPartitions.removeAll(this.restoredPartitions);
            log.trace("{} partition restoration not complete for {} {} partitions: {}", new Object[]{this.logPrefix, this.taskTypeName, task.id(), task.changelogPartitions()});
        }
        if (this.allTasksRunning()) {
            this.restoredPartitions.clear();
        }
        return resume;
    }

    boolean allTasksRunning() {
        return this.created.isEmpty() && this.suspended.isEmpty() && this.restoring.isEmpty();
    }

    Collection<T> runningTasks() {
        return this.running.values();
    }

    RuntimeException suspend() {
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        log.trace("{} Suspending running {} {}", new Object[]{this.logPrefix, this.taskTypeName, this.runningTaskIds()});
        firstException.compareAndSet(null, this.suspendTasks(this.running.values()));
        log.trace("{} Close restoring {} {}", new Object[]{this.logPrefix, this.taskTypeName, this.restoring.keySet()});
        firstException.compareAndSet(null, this.closeTasksUnclean(this.restoring.values()));
        firstException.compareAndSet(null, this.closeTasksUnclean(this.created.values()));
        this.previousActiveTasks.clear();
        this.previousActiveTasks.addAll(this.running.keySet());
        this.running.clear();
        this.restoring.clear();
        this.created.clear();
        this.runningByPartition.clear();
        return firstException.get();
    }

    private RuntimeException closeTasksUnclean(Collection<T> tasks) {
        RuntimeException exception = null;
        for (AbstractTask task : tasks) {
            try {
                task.close(false, false);
            }
            catch (RuntimeException e) {
                log.error("{} Failed to close {}, {}", new Object[]{this.logPrefix, this.taskTypeName, task.id, e});
                if (exception != null) continue;
                exception = e;
            }
        }
        return exception;
    }

    private RuntimeException suspendTasks(Collection<T> tasks) {
        RuntimeException exception = null;
        Iterator<T> it = tasks.iterator();
        while (it.hasNext()) {
            AbstractTask task = (AbstractTask)it.next();
            try {
                task.suspend();
                this.suspended.put(task.id(), task);
            }
            catch (CommitFailedException e) {
                this.suspended.put(task.id(), task);
                log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", new Object[]{this.logPrefix, this.taskTypeName, task.id});
            }
            catch (ProducerFencedException e) {
                this.closeZombieTask(task);
                it.remove();
            }
            catch (RuntimeException e) {
                log.error("{} Suspending {} {} failed due to the following error:", new Object[]{this.logPrefix, this.taskTypeName, task.id, e});
                try {
                    task.close(false, false);
                }
                catch (Exception f) {
                    log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", new Object[]{this.logPrefix, this.taskTypeName, task.id, f});
                }
                if (exception != null) continue;
                exception = e;
            }
        }
        return exception;
    }

    private void closeZombieTask(T task) {
        log.warn("{} Producer of {} {} fenced; closing zombie task", new Object[]{this.logPrefix, this.taskTypeName, ((AbstractTask)task).id});
        try {
            ((AbstractTask)task).close(false, true);
        }
        catch (Exception e) {
            log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", new Object[]{this.taskTypeName, this.logPrefix, e});
        }
    }

    boolean hasRunningTasks() {
        return !this.running.isEmpty();
    }

    boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> partitions) {
        if (this.suspended.containsKey(taskId)) {
            AbstractTask task = (AbstractTask)this.suspended.get(taskId);
            if (task.partitions().equals(partitions)) {
                this.suspended.remove(taskId);
                log.trace("{} Resuming suspended {} {} with assigned partitions {}", new Object[]{this.logPrefix, this.taskTypeName, taskId, partitions});
                task.resume();
                try {
                    this.transitionToRunning(task);
                }
                catch (ProducerFencedException e) {
                    this.closeZombieTask(task);
                    this.suspended.remove(taskId);
                    this.running.remove(task.id());
                    throw e;
                }
                return true;
            }
            log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", new Object[]{this.logPrefix, taskId, partitions, task.partitions});
        }
        return false;
    }

    private void transitionToRunning(T task) {
        log.debug("{} transitioning {} {} to running", new Object[]{this.logPrefix, this.taskTypeName, ((AbstractTask)task).id()});
        this.running.put(((AbstractTask)task).id(), task);
        ((AbstractTask)task).initializeTopology();
        for (TopicPartition topicPartition : ((AbstractTask)task).partitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : ((AbstractTask)task).changelogPartitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
    }

    T runningTaskFor(TopicPartition partition) {
        return (T)((AbstractTask)this.runningByPartition.get(partition));
    }

    Set<TaskId> runningTaskIds() {
        return this.running.keySet();
    }

    Map<TaskId, T> runningTaskMap() {
        return Collections.unmodifiableMap(this.running);
    }

    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        this.describe(builder, this.running.values(), indent, "Running:");
        this.describe(builder, this.suspended.values(), indent, "Suspended:");
        this.describe(builder, this.restoring.values(), indent, "Restoring:");
        this.describe(builder, this.created.values(), indent, "New:");
        return builder.toString();
    }

    private void describe(StringBuilder builder, Collection<T> tasks, String indent, String name) {
        builder.append(indent).append(name);
        for (AbstractTask t : tasks) {
            builder.append(indent).append(t.toString(indent + "\t\t"));
        }
        builder.append("\n");
    }

    List<AbstractTask> allInitializedTasks() {
        ArrayList<AbstractTask> tasks = new ArrayList<AbstractTask>();
        tasks.addAll(this.running.values());
        tasks.addAll(this.suspended.values());
        tasks.addAll(this.restoring.values());
        return tasks;
    }

    Collection<T> suspendedTasks() {
        return this.suspended.values();
    }

    Collection<T> restoringTasks() {
        return Collections.unmodifiableCollection(this.restoring.values());
    }

    Collection<TaskId> allAssignedTaskIds() {
        ArrayList<TaskId> taskIds = new ArrayList<TaskId>();
        taskIds.addAll(this.running.keySet());
        taskIds.addAll(this.suspended.keySet());
        taskIds.addAll(this.restoring.keySet());
        taskIds.addAll(this.created.keySet());
        return taskIds;
    }

    void clear() {
        this.runningByPartition.clear();
        this.running.clear();
        this.created.clear();
        this.suspended.clear();
        this.restoredPartitions.clear();
    }

    Set<TaskId> previousTaskIds() {
        return this.previousActiveTasks;
    }

    void commit() {
        RuntimeException exception = this.applyToRunningTasks(new TaskAction<T>(){

            @Override
            public String name() {
                return "commit";
            }

            @Override
            public void apply(T task) {
                ((AbstractTask)task).commit();
            }
        }, false);
        if (exception != null) {
            throw exception;
        }
    }

    int process() {
        final AtomicInteger processed = new AtomicInteger(0);
        this.applyToRunningTasks(new TaskAction<T>(){

            @Override
            public String name() {
                return "process";
            }

            @Override
            public void apply(T task) {
                if (((AbstractTask)task).process()) {
                    processed.incrementAndGet();
                }
            }
        }, true);
        return processed.get();
    }

    void punctuateAndCommit(final Sensor commitTimeSensor, final Sensor punctuateTimeSensor) {
        final Latency latency = new Latency(this.time.milliseconds());
        RuntimeException exception = this.applyToRunningTasks(new TaskAction<T>(){
            String name;

            @Override
            public String name() {
                return this.name;
            }

            @Override
            public void apply(T task) {
                this.name = "punctuate";
                if (((AbstractTask)task).maybePunctuate()) {
                    punctuateTimeSensor.record((double)latency.compute(), latency.startTime);
                }
                if (((AbstractTask)task).commitNeeded()) {
                    this.name = "commit";
                    long beforeCommitMs = AssignedTasks.this.time.milliseconds();
                    ((AbstractTask)task).commit();
                    commitTimeSensor.record((double)latency.compute(), latency.startTime);
                    if (log.isDebugEnabled()) {
                        log.debug("{} Committed active task {} per user request in {} ms", new Object[]{AssignedTasks.this.logPrefix, ((AbstractTask)task).id(), latency.startTime - beforeCommitMs});
                    }
                }
            }
        }, false);
        if (exception != null) {
            throw exception;
        }
    }

    Collection<TaskId> suspendedTaskIds() {
        return this.suspended.keySet();
    }

    private RuntimeException applyToRunningTasks(TaskAction<T> action, boolean throwException) {
        RuntimeException firstException = null;
        Iterator<T> it = this.runningTasks().iterator();
        while (it.hasNext()) {
            AbstractTask task = (AbstractTask)it.next();
            try {
                action.apply(task);
            }
            catch (CommitFailedException e) {
                log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", new Object[]{this.logPrefix, this.taskTypeName, task.id(), action.name()});
            }
            catch (ProducerFencedException e) {
                this.closeZombieTask(task);
                it.remove();
            }
            catch (RuntimeException t) {
                log.error("{} Failed to {} {} {} due to the following error:", new Object[]{this.logPrefix, action.name(), this.taskTypeName, task.id(), t});
                if (throwException) {
                    throw t;
                }
                if (firstException != null) continue;
                firstException = t;
            }
        }
        return firstException;
    }

    class Latency {
        private long startTime;

        Latency(long startTime) {
            this.startTime = startTime;
        }

        private long compute() {
            long previousTimeMs = this.startTime;
            this.startTime = AssignedTasks.this.time.milliseconds();
            return Math.max(this.startTime - previousTimeMs, 0L);
        }
    }

    static interface TaskAction<T extends AbstractTask> {
        public String name();

        public void apply(T var1);
    }
}

