/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembershipManager;
import com.hazelcast.internal.cluster.impl.operations.TriggerMemberListPublishOp;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.collectors.MetricsCollector;
import com.hazelcast.internal.metrics.impl.MetricsCompressor;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JetProperties;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.JobMetricsUtil;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.ProcessorClassLoaderTLHolder;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.deployment.ChildFirstClassLoader;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.deployment.JetDelegatingClassLoader;
import com.hazelcast.jet.impl.exception.ExecutionNotFoundException;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.operation.CheckLightJobsOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.exception.TargetNotMemberException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Member-local bookkeeping for Jet job executions: it registers, looks up, starts, terminates and
 * completes {@link ExecutionContext}s, and owns the per-job class loaders created for them.
 */
public class JobExecutionService
implements DynamicMetricsProvider {
    /**
     * Five minutes in nanoseconds. {@code checkLightExecutions()} terminates a light-job context
     * that still has no coordinator once its creation time is at least this old.
     */
    private static final long UNINITIALIZED_CONTEXT_MAX_AGE_NS = TimeUnit.MINUTES.toNanos(5L);
    // Held while a new execution context is registered (runLightJob/initExecution) and while
    // shutdown() cancels all executions.
    private final Object mutex = new Object();
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final TaskletExecutionService taskletExecutionService;
    private final JobRepository jobRepository;
    // Job IDs that currently have a context registered here; added by addExecutionContextJobId(),
    // removed by completeExecution().
    private final Set<Long> executionContextJobIds = Collections.newSetFromMap(new ConcurrentHashMap());
    // Execution contexts keyed by execution ID.
    private final ConcurrentMap<Long, ExecutionContext> executionContexts = new ConcurrentHashMap<Long, ExecutionContext>();
    // Job class loaders keyed by job ID; entries are removed and shut down in completeExecution().
    private final ConcurrentHashMap<Long, JetDelegatingClassLoader> classLoaders = new ConcurrentHashMap();
    // Per job ID: class loaders keyed by the keys of JobConfig.getCustomClassPaths().
    private final ConcurrentHashMap<Long, Map<String, ClassLoader>> processorCls = new ConcurrentHashMap();
    // Incremented once per beginExecution0() call.
    @Probe(name="jobs.executionStarted")
    private final Counter executionStarted = MwCounter.newMwCounter();
    // Incremented once per completeExecution() call.
    @Probe(name="jobs.executionCompleted")
    private final Counter executionCompleted = MwCounter.newMwCounter();
    // Factory used by getOrCreateExecutionContext() to create a light-job context on demand.
    private final Function<? super Long, ? extends ExecutionContext> newLightJobExecutionContextFunction;
    // Handle of the periodic checkLightExecutions() task; cancelled in shutdown().
    private final ScheduledFuture<?> lightExecutionsCheckerFuture;

    /**
     * Stores the collaborators, registers this object's static metrics under the
     * {@code module=jet} tag and schedules {@code checkLightExecutions()} to run every second,
     * starting immediately.
     */
    JobExecutionService(NodeEngineImpl nodeEngine, TaskletExecutionService taskletExecutionService, JobRepository jobRepository) {
        this.nodeEngine = nodeEngine;
        this.taskletExecutionService = taskletExecutionService;
        this.jobRepository = jobRepository;
        this.logger = nodeEngine.getLogger(getClass());
        // A context created on demand uses the same value for the job ID and the execution ID.
        this.newLightJobExecutionContextFunction = id -> new ExecutionContext(nodeEngine, (long) id, (long) id, true);

        MetricsRegistry metricsRegistry = nodeEngine.getMetricsRegistry();
        MetricDescriptor jetDescriptor = metricsRegistry.newMetricDescriptor().withTag("module", "jet");
        metricsRegistry.registerStaticMetrics(jetDescriptor, this);

        this.lightExecutionsCheckerFuture = nodeEngine.getExecutionService()
                .scheduleWithRepetition(this::checkLightExecutions, 0L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Looks up the execution ID of a locally registered context that belongs to the given job.
     *
     * @param jobId the job to look for
     * @return the execution ID of a matching context, or {@code null} if none is registered
     */
    public Long getExecutionIdForJobId(long jobId) {
        for (ExecutionContext candidate : executionContexts.values()) {
            if (candidate.jobId() == jobId) {
                return candidate.executionId();
            }
        }
        return null;
    }

    /**
     * Returns the class loader of the given job, creating and caching it on first use.
     *
     * <p>When resource upload is disabled in the Jet config a plain
     * {@link JetDelegatingClassLoader} over the parent is created; otherwise a
     * {@link JetClassLoader} is created for the job.
     *
     * @param config the job configuration; supplies the parent class loader and the job name
     * @param jobId  the job whose class loader is requested
     * @return the cached or newly created class loader for {@code jobId}
     */
    public ClassLoader getClassLoader(JobConfig config, long jobId) {
        JetConfig jetConfig = this.nodeEngine.getConfig().getJetConfig();
        // The explicit PrivilegedAction cast is required: a bare lambda matches both the
        // PrivilegedAction and the PrivilegedExceptionAction overload of doPrivileged and the
        // call would be rejected as ambiguous.
        return this.classLoaders.computeIfAbsent(jobId, k -> AccessController.doPrivileged(
                (PrivilegedAction<JetDelegatingClassLoader>) () -> {
                    ClassLoader parent = this.parentClassLoader(config);
                    if (!jetConfig.isResourceUploadEnabled()) {
                        return new JetDelegatingClassLoader(parent);
                    }
                    return new JetClassLoader(this.nodeEngine, parent, config.getName(), jobId, this.jobRepository);
                }));
    }

    /**
     * Picks the parent for a job class loader: the one supplied by the job's class loader
     * factory when a factory is configured, otherwise the node's config class loader.
     */
    private ClassLoader parentClassLoader(JobConfig config) {
        if (config.getClassLoaderFactory() == null) {
            return nodeEngine.getConfigClassLoader();
        }
        return config.getClassLoaderFactory().getJobClassLoader();
    }

    /**
     * Obtains (creating if necessary) the processor class loaders of the job and hands them to
     * {@link ProcessorClassLoaderTLHolder}. Callers in this class pair it with
     * {@link #clearProcessorClassLoaders()} in a {@code finally} block.
     */
    public void prepareProcessorClassLoaders(long jobId, JobConfig jobConfig) {
        Map<String, ClassLoader> loadersForJob = getProcessorClassLoaders(jobId, jobConfig);
        ProcessorClassLoaderTLHolder.putAll(loadersForJob);
    }

    /**
     * Clears what {@link #prepareProcessorClassLoaders(long, JobConfig)} stored in
     * {@link ProcessorClassLoaderTLHolder}.
     */
    public void clearProcessorClassLoaders() {
        ProcessorClassLoaderTLHolder.remove();
    }

    /**
     * Returns the processor class loader registered under {@code name} for the given job.
     *
     * @return the class loader, or {@code null} if the job has no entry for {@code name}
     * @throws HazelcastException if no processor class loaders exist for the job at all
     */
    public ClassLoader getProcessorClassLoader(long jobId, String name) {
        Map<String, ClassLoader> loadersForJob = processorCls.get(jobId);
        if (loadersForJob == null) {
            throw new HazelcastException("Processor classloader for jobId=" + jobId + " requested, but it does not exists");
        }
        return loadersForJob.get(name);
    }

    /**
     * Returns the processor class loaders of the job, building and caching them on first request.
     */
    public Map<String, ClassLoader> getProcessorClassLoaders(long jobId, JobConfig jobConfig) {
        return processorCls.computeIfAbsent(jobId, id -> createProcessorClassLoaders(id, jobConfig));
    }

    /**
     * Builds one {@link ChildFirstClassLoader} per entry of {@code jobConfig.getCustomClassPaths()}.
     * Each jar name is resolved against the directory configured by
     * {@code JetProperties.PROCESSOR_CUSTOM_LIB_DIR}; the job class loader is the parent.
     *
     * @return an unmodifiable map from class-path key to its class loader
     * @throws JetException if a resolved jar path cannot be converted to a URL
     */
    private Map<String, ClassLoader> createProcessorClassLoaders(long jobId, JobConfig jobConfig) {
        String libDir = nodeEngine.getConfig().getProperty(JetProperties.PROCESSOR_CUSTOM_LIB_DIR.getName());
        Map<String, ClassLoader> loadersByKey = new HashMap<>();
        ClassLoader jobClassLoader = getClassLoader(jobConfig, jobId);
        for (Map.Entry<String, List<String>> classPath : jobConfig.getCustomClassPaths().entrySet()) {
            URL[] jarUrls = classPath.getValue().stream()
                    .map(jar -> toJarUrl(libDir, jar))
                    .toArray(URL[]::new);
            loadersByKey.put(classPath.getKey(), new ChildFirstClassLoader(jarUrls, jobClassLoader));
        }
        return Collections.unmodifiableMap(loadersByKey);
    }

    /** Resolves {@code jar} against {@code libDir} and converts the resulting path to a URL. */
    private static URL toJarUrl(String libDir, String jar) {
        try {
            return Paths.get(libDir, jar).toUri().toURL();
        } catch (MalformedURLException e) {
            throw new JetException(e);
        }
    }

    /**
     * Returns the context registered for the execution, or {@code null} if there is none.
     */
    public ExecutionContext getExecutionContext(long executionId) {
        return executionContexts.get(executionId);
    }

    /**
     * Returns the context registered for the execution; if none exists yet, a light-job context
     * is created with {@code newLightJobExecutionContextFunction} and registered.
     */
    public ExecutionContext getOrCreateExecutionContext(long executionId) {
        return this.executionContexts.computeIfAbsent(executionId, this.newLightJobExecutionContextFunction);
    }

    /**
     * Returns the values view of the internal context map (not a copy).
     */
    public Collection<ExecutionContext> getExecutionContexts() {
        return this.executionContexts.values();
    }

    /**
     * Returns the sender map of the given execution, or {@code null} if the execution is not
     * registered on this member.
     */
    Map<ExecutionContext.SenderReceiverKey, SenderTasklet> getSenderMap(long executionId) {
        ExecutionContext executionContext = executionContexts.get(executionId);
        if (executionContext == null) {
            return null;
        }
        return executionContext.senderMap();
    }

    /**
     * Cancels the periodic light-job check (a run already in progress is not interrupted) and
     * then cancels every registered execution.
     */
    public void shutdown() {
        lightExecutionsCheckerFuture.cancel(false);
        // Same lock that runLightJob/initExecution take while registering a new context.
        synchronized (mutex) {
            cancelAllExecutions("Node is shutting down");
        }
    }

    /**
     * Cancels all registered executions, logging {@code "reset"} as the reason.
     */
    public void reset() {
        this.cancelAllExecutions("reset");
    }

    /**
     * Terminates every registered execution with a {@code null} termination mode.
     *
     * @param reason text included in the fine-level log line written for each execution
     */
    public void cancelAllExecutions(String reason) {
        executionContexts.values().forEach(ctx -> {
            LoggingUtil.logFine(logger, "Completing %s locally. Reason: %s", ctx.jobNameAndExecutionId(), reason);
            terminateExecution0(ctx, null);
        });
    }

    /**
     * Terminates every execution that has a known coordinator and whose coordinator or one of
     * whose participants is the removed member.
     *
     * @param address address of the member that left the cluster
     */
    void onMemberRemoved(Address address) {
        for (ExecutionContext ctx : executionContexts.values()) {
            Address coordinator = ctx.coordinator();
            if (coordinator == null) {
                // Contexts without a coordinator are skipped.
                continue;
            }
            if (!coordinator.equals(address) && !ctx.hasParticipant(address)) {
                continue;
            }
            LoggingUtil.logFine(logger, "Completing %s locally. Reason: Member %s left the cluster", ctx.jobNameAndExecutionId(), address);
            terminateExecution0(ctx, null);
        }
    }

    /**
     * Registers, initializes and immediately starts a light job on this member.
     *
     * <p>The context is looked up or created under the registration lock. If initialization
     * fails, the context is completed with a {@link CancellationException} and the original
     * failure is rethrown. Terminal metrics are not collected for light jobs.
     *
     * @return the future returned by {@link #beginExecution0(ExecutionContext, boolean)}
     */
    public CompletableFuture<RawJobMetrics> runLightJob(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants, ExecutionPlan plan) {
        assert executionId == jobId : "executionId(" + com.hazelcast.jet.Util.idToString(executionId) + ") != jobId(" + com.hazelcast.jet.Util.idToString(jobId) + ")";
        verifyClusterInformation(jobId, executionId, coordinator, coordinatorMemberListVersion, participants);
        failIfNotRunning();

        final ExecutionContext execCtx;
        synchronized (mutex) {
            addExecutionContextJobId(jobId, executionId, coordinator);
            // The context may already exist if getOrCreateExecutionContext() created it earlier.
            execCtx = executionContexts.computeIfAbsent(executionId, id -> new ExecutionContext(nodeEngine, jobId, executionId, true));
        }

        try {
            JobConfig jobConfig = plan.getJobConfig();
            prepareProcessorClassLoaders(jobId, jobConfig);
            Set<Address> participantAddresses = participants.stream()
                    .map(MemberInfo::getAddress)
                    .collect(Collectors.toSet());
            ClassLoader jobClassLoader = getClassLoader(jobConfig, jobId);
            Util.doWithClassLoader(jobClassLoader, () -> execCtx.initialize(coordinator, participantAddresses, plan));
        } catch (Throwable t) {
            completeExecution(execCtx, new CancellationException());
            throw t;
        } finally {
            clearProcessorClassLoaders();
        }

        if (logger.isFineEnabled()) {
            String jobName = execCtx.jobName();
            String quotedJobName = jobName != null ? '\'' + jobName + '\'' : "null";
            logger.fine("Execution plan for light job ID=" + com.hazelcast.jet.Util.idToString(jobId) + ", jobName=" + quotedJobName + ", executionId=" + com.hazelcast.jet.Util.idToString(executionId) + " initialized, will start the execution");
        }
        return beginExecution0(execCtx, false);
    }

    /**
     * Registers and initializes the context of a regular (non-light) job execution. The
     * execution is not started here.
     *
     * @throws IllegalStateException if {@code coordinator} is not the master
     * @throws RuntimeException      if a context was already registered for {@code executionId}
     */
    public void initExecution(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants, ExecutionPlan plan) {
        assertIsMaster(jobId, executionId, coordinator);
        verifyClusterInformation(jobId, executionId, coordinator, coordinatorMemberListVersion, participants);
        failIfNotRunning();

        final ExecutionContext execCtx;
        synchronized (mutex) {
            addExecutionContextJobId(jobId, executionId, coordinator);
            execCtx = new ExecutionContext(nodeEngine, jobId, executionId, false);
            if (executionContexts.put(executionId, execCtx) != null) {
                throw new RuntimeException("Duplicate ExecutionContext for execution " + com.hazelcast.jet.Util.idToString(executionId));
            }
        }

        try {
            JobConfig jobConfig = plan.getJobConfig();
            prepareProcessorClassLoaders(jobId, jobConfig);
            Set<Address> participantAddresses = participants.stream()
                    .map(MemberInfo::getAddress)
                    .collect(Collectors.toSet());
            ClassLoader jobClassLoader = getClassLoader(jobConfig, jobId);
            Util.doWithClassLoader(jobClassLoader, () -> execCtx.initialize(coordinator, participantAddresses, plan));
        } finally {
            clearProcessorClassLoaders();
        }

        String jobName = execCtx.jobName();
        String quotedJobName = jobName != null ? '\'' + jobName + '\'' : "null";
        logger.info("Execution plan for jobId=" + com.hazelcast.jet.Util.idToString(jobId) + ", jobName=" + quotedJobName + ", executionId=" + com.hazelcast.jet.Util.idToString(executionId) + " initialized");
    }

    /**
     * Records that the job now has a context on this member.
     *
     * @throws IllegalStateException       if the job is already recorded and a context for this
     *                                     very execution ID exists
     * @throws RetryableHazelcastException if the job is already recorded but no context exists
     *                                     for this execution ID
     */
    private void addExecutionContextJobId(long jobId, long executionId, Address coordinator) {
        if (executionContextJobIds.add(jobId)) {
            return;
        }
        ExecutionContext existing = executionContexts.get(executionId);
        if (existing != null) {
            throw new IllegalStateException(String.format("Execution context for %s for coordinator %s already exists for coordinator %s", existing.jobNameAndExecutionId(), coordinator, existing.coordinator()));
        }
        if (logger.isFineEnabled()) {
            // Log every other local execution of the same job to help diagnose the conflict.
            for (ExecutionContext other : executionContexts.values()) {
                if (other.jobId() == jobId) {
                    logger.fine(String.format("Execution context for job %s for coordinator %s already exists with local execution %s for coordinator %s", com.hazelcast.jet.Util.idToString(jobId), coordinator, com.hazelcast.jet.Util.idToString(other.executionId()), other.coordinator()));
                }
            }
        }
        throw new RetryableHazelcastException();
    }

    /**
     * Verifies that {@code coordinator} is the master known to this member.
     *
     * @throws HazelcastInstanceNotActiveException if the check fails and the node is not running
     * @throws IllegalStateException               if the check fails and the node is running
     */
    private void assertIsMaster(long jobId, long executionId, Address coordinator) {
        Address master = nodeEngine.getMasterAddress();
        if (coordinator.equals(master)) {
            return;
        }
        // Report a stopped node in preference to the "not the master" error.
        failIfNotRunning();
        throw new IllegalStateException(String.format("Coordinator %s cannot initialize %s. Reason: it is not the master, the master is %s", coordinator, Util.jobIdAndExecutionId(jobId, executionId), master));
    }

    /**
     * Checks that this member's view of the cluster is compatible with the coordinator's.
     *
     * @throws RetryableHazelcastException if the coordinator's member list version is newer than
     *                                     the local one; a member-list publish is requested from
     *                                     the master first
     * @throws TopologyChangedException    if a participant is not in the local member list
     * @throws IllegalArgumentException    if this member is not among the participants
     */
    private void verifyClusterInformation(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants) {
        Address master = nodeEngine.getMasterAddress();
        MembershipManager members = ((ClusterServiceImpl) nodeEngine.getClusterService()).getMembershipManager();
        int localVersion = members.getMemberListVersion();
        Address localAddress = nodeEngine.getThisAddress();

        if (coordinatorMemberListVersion > localVersion) {
            assert !master.equals(localAddress) : String.format("Local node: %s is master but InitOperation has coordinator member list version: %s larger than  local member list version: %s", localAddress, coordinatorMemberListVersion, localVersion);
            nodeEngine.getOperationService().send(new TriggerMemberListPublishOp(), master);
            throw new RetryableHazelcastException(String.format("Cannot initialize %s for coordinator %s, local member list version %s, coordinator member list version %s", Util.jobIdAndExecutionId(jobId, executionId), coordinator, localVersion, coordinatorMemberListVersion));
        }

        boolean localIsParticipant = false;
        for (MemberInfo participant : participants) {
            if (participant.getAddress().equals(localAddress)) {
                localIsParticipant = true;
            }
            if (members.getMember(participant.getAddress(), participant.getUuid()) == null) {
                throw new TopologyChangedException(String.format("Cannot initialize %s for coordinator %s: participant %s not found in local member list. Local member list version: %s, coordinator member list version: %s", Util.jobIdAndExecutionId(jobId, executionId), coordinator, participant, localVersion, coordinatorMemberListVersion));
            }
        }
        if (localIsParticipant) {
            return;
        }
        throw new IllegalArgumentException(String.format("Cannot initialize %s since member %s is not in participants: %s", Util.jobIdAndExecutionId(jobId, executionId), localAddress, participants));
    }

    /**
     * @throws HazelcastInstanceNotActiveException if the node engine reports it is not running
     */
    private void failIfNotRunning() {
        if (nodeEngine.isRunning()) {
            return;
        }
        throw new HazelcastInstanceNotActiveException();
    }

    /**
     * Looks up the context of an execution on behalf of an operation sent by the master.
     *
     * @param callerAddress address of the member that sent the operation
     * @param callerOpName  operation name, used only in error messages
     * @return the registered context, never {@code null}
     * @throws HazelcastInstanceNotActiveException if the node is not running
     * @throws IllegalStateException               if the caller is not the master, or the context
     *                                             has a different coordinator or job ID
     * @throws ExecutionNotFoundException          if no context is registered for the execution
     */
    @Nonnull
    public ExecutionContext assertExecutionContext(Address callerAddress, long jobId, long executionId, String callerOpName) {
        Address master = nodeEngine.getMasterAddress();
        boolean callerIsMaster = callerAddress.equals(master);
        // The running check comes first on every path, so a stopped node is always reported as such.
        failIfNotRunning();
        if (!callerIsMaster) {
            throw new IllegalStateException(String.format("Caller %s cannot do '%s' for %s: it is not the master, the master is %s", callerAddress, callerOpName, Util.jobIdAndExecutionId(jobId, executionId), master));
        }

        ExecutionContext context = executionContexts.get(executionId);
        if (context == null) {
            throw new ExecutionNotFoundException(String.format("%s not found for coordinator %s for '%s'", Util.jobIdAndExecutionId(jobId, executionId), callerAddress, callerOpName));
        }
        if (context.coordinator().equals(callerAddress) && context.jobId() == jobId) {
            return context;
        }
        throw new IllegalStateException(String.format("%s, originally from coordinator %s, cannot do '%s' by coordinator %s and execution %s", context.jobNameAndExecutionId(), context.coordinator(), callerOpName, callerAddress, com.hazelcast.jet.Util.idToString(executionId)));
    }

    /**
     * Unregisters the execution and releases everything this service holds for its job.
     *
     * <p>The context and the job class loader are removed from the registries first, then the
     * context's own completion runs with that class loader. The remaining cleanup happens in a
     * {@code finally} block and therefore also runs when the completion throws.
     *
     * @param executionContext the execution to complete
     * @param error            passed through to {@code ExecutionContext.completeExecution}
     */
    public void completeExecution(@Nonnull ExecutionContext executionContext, Throwable error) {
        long jobId = executionContext.jobId();
        executionContexts.remove(executionContext.executionId());
        JetDelegatingClassLoader jobClassLoader = classLoaders.remove(jobId);
        try {
            Util.doWithClassLoader((ClassLoader) jobClassLoader, () -> executionContext.completeExecution(error));
        } finally {
            processorCls.remove(jobId);
            executionCompleted.inc();
            if (jobClassLoader != null) {
                jobClassLoader.shutdown();
            }
            // Removing the job ID allows a new execution of this job to be registered here.
            executionContextJobIds.remove(jobId);
            logger.fine("Completed execution of " + executionContext.jobNameAndExecutionId());
        }
    }

    /**
     * Stores the metrics on the execution's context; does nothing if the execution is not
     * registered.
     */
    public void updateMetrics(@Nonnull Long executionId, RawJobMetrics metrics) {
        ExecutionContext context = executionContexts.get(executionId);
        if (context == null) {
            return;
        }
        context.setJobMetrics(metrics);
    }

    /**
     * Starts a previously initialized, non-light execution at the request of the coordinator.
     *
     * @param collectMetrics whether terminal metrics are collected when the execution ends
     * @return the future returned by {@link #beginExecution0(ExecutionContext, boolean)}
     */
    public CompletableFuture<RawJobMetrics> beginExecution(Address coordinator, long jobId, long executionId, boolean collectMetrics) {
        ExecutionContext context = assertExecutionContext(coordinator, jobId, executionId, "StartExecutionOperation");
        assert !context.isLightJob() : "StartExecutionOperation received for a light job " + com.hazelcast.jet.Util.idToString(jobId);
        String startMessage = "Start execution of " + context.jobNameAndExecutionId() + " from coordinator " + coordinator;
        logger.info(startMessage);
        return beginExecution0(context, collectMetrics);
    }

    /**
     * Starts the execution on the tasklet execution service.
     *
     * <p>On successful completion the terminal metrics are collected when requested (otherwise
     * the result is {@code null}). In every case a follow-up stage completes the execution
     * locally and logs the outcome at fine level.
     *
     * @param execCtx        the initialized context to start
     * @param collectMetrics whether to collect terminal metrics on successful completion
     */
    public CompletableFuture<RawJobMetrics> beginExecution0(ExecutionContext execCtx, boolean collectMetrics) {
        this.executionStarted.inc();
        return ((CompletableFuture)execCtx.beginExecution(this.taskletExecutionService)
                .thenApply(r -> collectMetrics ? this.collectTerminalMetrics(execCtx) : null))
                .whenCompleteAsync(ExceptionUtil.withTryCatch(this.logger, (i, e) -> {
                    this.completeExecution(execCtx, ExceptionUtil.peel(e));
                    String subject = "Execution of " + execCtx.jobNameAndExecutionId();
                    if (e == null) {
                        this.logger.fine(subject + " completed");
                    } else if (e instanceof CancellationException) {
                        this.logger.fine(subject + " was cancelled");
                    } else {
                        this.logger.fine(subject + " completed with failure", (Throwable)e);
                    }
                }));
    }

    /** Runs one collection cycle of the metrics registry, keeping only this execution's metrics. */
    private RawJobMetrics collectTerminalMetrics(ExecutionContext execCtx) {
        JobMetricsCollector collector = new JobMetricsCollector(execCtx.executionId(), this.nodeEngine.getLocalMember(), this.logger);
        this.nodeEngine.getMetricsRegistry().collect(collector);
        return collector.getMetrics();
    }

    /**
     * Tags the descriptor with {@code module=jet} and lets every registered execution context
     * contribute its metrics, each with its own copy of the descriptor. A failure is logged as a
     * warning and rethrown.
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        try {
            descriptor.withTag("module", "jet");
            for (ExecutionContext executionContext : executionContexts.values()) {
                executionContext.provideDynamicMetrics(descriptor.copy(), context);
            }
        } catch (Throwable failure) {
            logger.warning("Dynamic metric collection failed", failure);
            throw failure;
        }
    }

    /**
     * Periodic housekeeping for light jobs registered on this member.
     *
     * <ul>
     *   <li>A light-job context that still has no coordinator is terminated once it is at least
     *       {@code UNINITIALIZED_CONTEXT_MAX_AGE_NS} old.</li>
     *   <li>For light jobs with a known coordinator, the execution IDs are sent to that
     *       coordinator in a {@link CheckLightJobsOperation}. Every execution ID in the response
     *       is terminated locally. If the coordinator is no longer a member, all executions sent
     *       to it are terminated.</li>
     * </ul>
     *
     * Never throws: any failure is logged at severe level.
     */
    private void checkLightExecutions() {
        try {
            long uninitializedContextThreshold = System.nanoTime() - UNINITIALIZED_CONTEXT_MAX_AGE_NS;
            Map<Address, List<Long>> executionsPerMember = new HashMap<>();

            for (ExecutionContext executionContext : this.executionContexts.values()) {
                if (!executionContext.isLightJob()) {
                    continue;
                }
                Address coordinator = executionContext.coordinator();
                if (coordinator != null) {
                    // Coordinator known: queue the execution for the liveness query below.
                    executionsPerMember.computeIfAbsent(coordinator, k -> new ArrayList<>()).add(executionContext.executionId());
                    continue;
                }
                // Coordinator unknown: drop the context only after it has stayed uninitialized too long.
                if (executionContext.getCreatedOn() > uninitializedContextThreshold) {
                    continue;
                }
                LoggingUtil.logFine(this.logger, "Terminating light job %s because it wasn't initialized during %d seconds", com.hazelcast.jet.Util.idToString(executionContext.executionId()), TimeUnit.NANOSECONDS.toSeconds(UNINITIALIZED_CONTEXT_MAX_AGE_NS));
                this.terminateExecution0(executionContext, TerminationMode.CANCEL_FORCEFUL);
            }

            for (Map.Entry<Address, List<Long>> entry : executionsPerMember.entrySet()) {
                Address coordinator = entry.getKey();
                long[] executionIds = entry.getValue().stream().mapToLong(Long::longValue).toArray();
                Operation op = new CheckLightJobsOperation(executionIds);
                InvocationFuture<long[]> future = this.nodeEngine.getOperationService()
                        .createInvocationBuilder("hz:impl:jetService", op, coordinator)
                        .invoke();
                future.whenComplete((response, failure) -> {
                    long[] unknownExecutionIds;
                    if (failure instanceof TargetNotMemberException) {
                        // The coordinator left the cluster, so none of its executions can be known to it.
                        unknownExecutionIds = executionIds;
                    } else if (failure != null) {
                        this.logger.warning("Failed to check light job state with coordinator " + coordinator + ": " + failure, failure);
                        return;
                    } else {
                        unknownExecutionIds = response;
                    }
                    assert unknownExecutionIds != null;
                    for (long executionId : unknownExecutionIds) {
                        ExecutionContext execCtx = this.executionContexts.get(executionId);
                        if (execCtx == null) {
                            continue;
                        }
                        this.logger.fine("Terminating light job " + com.hazelcast.jet.Util.idToString(executionId) + " because the coordinator doesn't know it");
                        this.terminateExecution0(execCtx, TerminationMode.CANCEL_FORCEFUL);
                    }
                });
            }
        } catch (Throwable e) {
            this.logger.severe("Failed to query live light executions: " + e, e);
        }
    }

    /**
     * Terminates an execution at the request of another member.
     *
     * <p>Does nothing if the execution is not registered. For a non-light job the caller must be
     * the master. If the context has a coordinator, the caller must be that coordinator; a
     * context without a coordinator is only expected for light jobs.
     *
     * @param callerAddress address of the member requesting the termination
     * @param mode          termination mode passed on to the execution context
     * @throws HazelcastInstanceNotActiveException if the node is not running
     * @throws IllegalStateException               if the caller is not allowed to terminate the
     *                                             execution
     */
    public void terminateExecution(long jobId, long executionId, Address callerAddress, TerminationMode mode) {
        this.failIfNotRunning();
        ExecutionContext executionContext = this.executionContexts.get(executionId);
        if (executionContext == null) {
            return;
        }
        if (!executionContext.isLightJob()) {
            Address masterAddress = this.nodeEngine.getMasterAddress();
            if (!callerAddress.equals(masterAddress)) {
                // Report a stopped node in preference to the "not the master" error.
                this.failIfNotRunning();
                throw new IllegalStateException(String.format("Caller %s cannot do 'terminateExecution' for %s: it is not the master, the master is %s", callerAddress, Util.jobIdAndExecutionId(jobId, executionId), masterAddress));
            }
        }
        Address coordinator = executionContext.coordinator();
        if (coordinator == null) {
            assert executionContext.isLightJob() : "null coordinator for non-light job";
        } else if (!coordinator.equals(callerAddress)) {
            throw new IllegalStateException(String.format("%s, originally from coordinator %s, cannot do 'terminateExecution' by coordinator %s and execution %s", executionContext.jobNameAndExecutionId(), coordinator, callerAddress, com.hazelcast.jet.Util.idToString(executionId)));
        }
        this.terminateExecution0(executionContext, mode);
    }

    /**
     * Asks the context to terminate. When the context returns {@code false}, the execution is
     * completed here with a {@link CancellationException}.
     *
     * @param mode termination mode; callers in this class also pass {@code null}
     */
    public void terminateExecution0(ExecutionContext executionContext, TerminationMode mode) {
        boolean terminated = executionContext.terminateExecution(mode);
        if (terminated) {
            return;
        }
        logger.fine(executionContext.jobNameAndExecutionId() + " calling completeExecution because execution terminated before it started");
        completeExecution(executionContext, new CancellationException());
    }

    /**
     * Blocks until the execution future of every registered context has completed.
     *
     * <p>This is best-effort: a failure while waiting for one execution is logged at fine level
     * and does not stop the wait for the remaining ones.
     */
    public void waitAllExecutionsTerminated() {
        for (ExecutionContext ctx : this.executionContexts.values()) {
            try {
                ctx.getExecutionFuture().join();
            } catch (Throwable ignored) {
                // Only termination matters here, not the outcome; keep a trace for diagnostics.
                this.logger.fine("Ignoring failure while waiting for termination of " + ctx.jobNameAndExecutionId(), ignored);
            }
        }
    }

    /**
     * {@link MetricsCollector} that keeps only the metrics of one execution and compresses them
     * into a {@link RawJobMetrics} blob. Each kept descriptor is passed through the member prefix
     * function obtained from {@link JobMetricsUtil#addMemberPrefixFn}.
     */
    private static class JobMetricsCollector
    implements MetricsCollector {
        private final Long executionId;
        private final MetricsCompressor compressor;
        private final ILogger logger;
        private final UnaryOperator<MetricDescriptor> addPrefixFn;

        /**
         * @param executionId the execution whose metrics are kept
         * @param member      the member used to build the descriptor prefix function
         * @param logger      receives warnings about metrics that failed to render
         */
        JobMetricsCollector(long executionId, @Nonnull Member member, @Nonnull ILogger logger) {
            Objects.requireNonNull(member, "member");
            this.logger = Objects.requireNonNull(logger, "logger");
            this.executionId = executionId;
            this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
            this.compressor = new MetricsCompressor();
        }

        @Override
        public void collectLong(MetricDescriptor descriptor, long value) {
            if (belongsToExecution(descriptor)) {
                this.compressor.addLong(this.addPrefixFn.apply(descriptor), value);
            }
        }

        @Override
        public void collectDouble(MetricDescriptor descriptor, double value) {
            if (belongsToExecution(descriptor)) {
                this.compressor.addDouble(this.addPrefixFn.apply(descriptor), value);
            }
        }

        @Override
        public void collectException(MetricDescriptor descriptor, Exception e) {
            if (belongsToExecution(descriptor)) {
                this.logger.warning("Exception when rendering job metrics: " + e, e);
            }
        }

        @Override
        public void collectNoValue(MetricDescriptor descriptor) {
            // Metrics without a value are not included in the blob.
        }

        /** Returns the compressed metrics collected so far and resets the compressor. */
        @Nonnull
        public RawJobMetrics getMetrics() {
            return RawJobMetrics.of(this.compressor.getBlobAndReset());
        }

        /** Tells whether the execution ID extracted from the descriptor equals this collector's. */
        private boolean belongsToExecution(MetricDescriptor descriptor) {
            return this.executionId.equals(JobMetricsUtil.getExecutionIdFromMetricsDescriptor(descriptor));
        }
    }
}

