/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.mapreduce.impl.task;

import com.hazelcast.mapreduce.JobPartitionState;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.LifecycleMapper;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.PartitionIdAware;
import com.hazelcast.mapreduce.impl.MapReduceService;
import com.hazelcast.mapreduce.impl.MapReduceUtil;
import com.hazelcast.mapreduce.impl.notification.IntermediateChunkNotification;
import com.hazelcast.mapreduce.impl.notification.LastChunkNotification;
import com.hazelcast.mapreduce.impl.operation.PostPonePartitionProcessing;
import com.hazelcast.mapreduce.impl.operation.RequestMemberIdAssignment;
import com.hazelcast.mapreduce.impl.operation.RequestPartitionMapping;
import com.hazelcast.mapreduce.impl.operation.RequestPartitionProcessed;
import com.hazelcast.mapreduce.impl.operation.RequestPartitionReducing;
import com.hazelcast.mapreduce.impl.operation.RequestPartitionResult;
import com.hazelcast.mapreduce.impl.task.DefaultContext;
import com.hazelcast.mapreduce.impl.task.JobSupervisor;
import com.hazelcast.mapreduce.impl.task.JobTaskConfiguration;
import com.hazelcast.mapreduce.impl.task.KeyValueSourceFacade;
import com.hazelcast.mapreduce.impl.task.MappingPhase;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.ExceptionUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class MapCombineTask<KeyIn, ValueIn, KeyOut, ValueOut, Chunk> {
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final Mapper<KeyIn, ValueIn, KeyOut, ValueOut> mapper;
    private final MappingPhase<KeyIn, ValueIn, KeyOut, ValueOut> mappingPhase;
    private final KeyValueSource<KeyIn, ValueIn> keyValueSource;
    private final MapReduceService mapReduceService;
    private final JobSupervisor supervisor;
    private final NodeEngine nodeEngine;
    private final String name;
    private final String jobId;
    private final int chunkSize;

    public MapCombineTask(JobTaskConfiguration configuration, JobSupervisor supervisor, MappingPhase<KeyIn, ValueIn, KeyOut, ValueOut> mappingPhase) {
        this.mappingPhase = mappingPhase;
        this.supervisor = supervisor;
        this.mapper = configuration.getMapper();
        this.name = configuration.getName();
        this.jobId = configuration.getJobId();
        this.chunkSize = configuration.getChunkSize();
        this.nodeEngine = configuration.getNodeEngine();
        this.mapReduceService = supervisor.getMapReduceService();
        this.keyValueSource = configuration.getKeyValueSource();
    }

    public String getName() {
        return this.name;
    }

    public String getJobId() {
        return this.jobId;
    }

    public int getChunkSize() {
        return this.chunkSize;
    }

    public void cancel() {
        this.cancelled.set(true);
        this.mappingPhase.cancel();
    }

    public void process() {
        ExecutorService es = this.mapReduceService.getExecutorService(this.name);
        if (this.keyValueSource instanceof PartitionIdAware) {
            es.submit(new PartitionProcessor());
        } else {
            es.submit(new SingleExecutionProcessor());
        }
    }

    public final void processMapping(int partitionId, DefaultContext<KeyOut, ValueOut> context, KeyValueSource<KeyIn, ValueIn> keyValueSource) throws Exception {
        context.setPartitionId(partitionId);
        if (this.mapper instanceof LifecycleMapper) {
            ((LifecycleMapper)this.mapper).initialize(context);
        }
        this.mappingPhase.executeMappingPhase(keyValueSource, this.mapper, context);
        if (this.mapper instanceof LifecycleMapper) {
            ((LifecycleMapper)this.mapper).finalized(context);
        }
        if (this.cancelled.get()) {
            return;
        }
    }

    void onEmit(DefaultContext<KeyOut, ValueOut> context, int partitionId) {
        if (this.supervisor.getConfiguration().getReducerFactory() != null && context.getCollected() == this.chunkSize) {
            Map chunkMap = context.requestChunk();
            Map mapping = MapReduceUtil.mapResultToMember(this.supervisor, chunkMap);
            this.supervisor.registerReducerEventInterests(partitionId, mapping.keySet());
            for (Map.Entry entry : mapping.entrySet()) {
                this.mapReduceService.sendNotification(entry.getKey(), new IntermediateChunkNotification(entry.getKey(), this.name, this.jobId, entry.getValue(), partitionId));
            }
        }
    }

    private void finalizeMapping(int partitionId, DefaultContext<KeyOut, ValueOut> context) throws Exception {
        RequestPartitionResult result = (RequestPartitionResult)this.mapReduceService.processRequest(this.supervisor.getJobOwner(), new RequestPartitionReducing(this.name, this.jobId, partitionId));
        if (result.getResultState() == RequestPartitionResult.ResultState.SUCCESSFUL && this.supervisor.getConfiguration().getReducerFactory() != null) {
            Map chunkMap = context.finish();
            if (chunkMap.size() > 0) {
                this.sendLastChunkToAssignedReducers(partitionId, chunkMap);
            } else {
                this.finalizeProcessing(partitionId);
            }
        }
    }

    private void finalizeProcessing(int partitionId) throws Exception {
        RequestPartitionResult result = (RequestPartitionResult)this.mapReduceService.processRequest(this.supervisor.getJobOwner(), new RequestPartitionProcessed(this.name, this.jobId, partitionId, JobPartitionState.State.REDUCING));
        if (result.getResultState() != RequestPartitionResult.ResultState.SUCCESSFUL) {
            throw new RuntimeException("Could not finalize processing for partitionId " + partitionId);
        }
    }

    private void sendLastChunkToAssignedReducers(int partitionId, Map<KeyOut, Chunk> chunkMap) {
        Address sender = this.mapReduceService.getLocalAddress();
        Map<Address, Map<KeyOut, Chunk>> mapping = MapReduceUtil.mapResultToMember(this.supervisor, chunkMap);
        this.supervisor.registerReducerEventInterests(partitionId, mapping.keySet());
        for (Map.Entry<Address, Map<KeyOut, Chunk>> entry : mapping.entrySet()) {
            Address receiver = entry.getKey();
            Map<KeyOut, Chunk> chunk = entry.getValue();
            this.mapReduceService.sendNotification(receiver, new LastChunkNotification<KeyOut, Chunk>(receiver, this.name, this.jobId, sender, partitionId, chunk));
        }
        Set<Address> addresses = mapping.keySet();
        Collection<Address> reducerInterests = this.supervisor.getReducerEventInterests(partitionId);
        if (reducerInterests != null) {
            for (Address address : reducerInterests) {
                if (addresses.contains(address)) continue;
                this.mapReduceService.sendNotification(address, new LastChunkNotification(address, this.name, this.jobId, sender, partitionId, Collections.emptyMap()));
            }
        }
    }

    private void postponePartitionProcessing(int partitionId) throws Exception {
        RequestPartitionResult result = (RequestPartitionResult)this.mapReduceService.processRequest(this.supervisor.getJobOwner(), new PostPonePartitionProcessing(this.name, this.jobId, partitionId));
        if (result.getResultState() != RequestPartitionResult.ResultState.SUCCESSFUL) {
            throw new RuntimeException("Could not postpone processing for partitionId " + partitionId + " -> " + (Object)((Object)result.getResultState()));
        }
    }

    private void handleProcessorThrowable(Throwable t) {
        MapReduceUtil.notifyRemoteException(this.supervisor, t);
        if (t instanceof Error) {
            ExceptionUtil.sneakyThrow(t);
        }
    }

    private void processPartitionMapping(KeyValueSource<KeyIn, ValueIn> delegate, int partitionId) throws Exception {
        delegate.reset();
        if (delegate.open(this.nodeEngine)) {
            DefaultContext context = this.supervisor.getOrCreateContext(this);
            this.processMapping(partitionId, context, delegate);
            delegate.close();
            this.finalizeMapping(partitionId, context);
        } else {
            this.postponePartitionProcessing(partitionId);
        }
    }

    private class SingleExecutionProcessor
    implements Runnable {
        private SingleExecutionProcessor() {
        }

        @Override
        public void run() {
            try {
                MapReduceUtil.enforcePartitionTableWarmup(MapCombineTask.this.mapReduceService);
                RequestPartitionResult result = (RequestPartitionResult)MapCombineTask.this.mapReduceService.processRequest(MapCombineTask.this.supervisor.getJobOwner(), new RequestMemberIdAssignment(MapCombineTask.this.name, MapCombineTask.this.jobId));
                if (result.getResultState() == RequestPartitionResult.ResultState.NO_SUPERVISOR) {
                    return;
                }
                if (result.getResultState() == RequestPartitionResult.ResultState.NO_MORE_PARTITIONS) {
                    return;
                }
                int partitionId = result.getPartitionId();
                KeyValueSourceFacade delegate = MapCombineTask.this.keyValueSource;
                if (MapCombineTask.this.supervisor.getConfiguration().isCommunicateStats()) {
                    delegate = new KeyValueSourceFacade(MapCombineTask.this.keyValueSource, MapCombineTask.this.supervisor);
                }
                MapCombineTask.this.processPartitionMapping(delegate, partitionId);
            }
            catch (Throwable t) {
                MapCombineTask.this.handleProcessorThrowable(t);
            }
        }
    }

    private class PartitionProcessor
    implements Runnable {
        private PartitionProcessor() {
        }

        @Override
        public void run() {
            KeyValueSourceFacade delegate = MapCombineTask.this.keyValueSource;
            if (MapCombineTask.this.supervisor.getConfiguration().isCommunicateStats()) {
                delegate = new KeyValueSourceFacade(MapCombineTask.this.keyValueSource, MapCombineTask.this.supervisor);
            }
            try {
                MapReduceUtil.enforcePartitionTableWarmup(MapCombineTask.this.mapReduceService);
            }
            catch (TimeoutException e) {
                MapCombineTask.this.handleProcessorThrowable(e);
            }
            this.processPartitions(delegate);
        }

        private void processPartitions(KeyValueSource<KeyIn, ValueIn> delegate) {
            while (!MapCombineTask.this.cancelled.get()) {
                Integer partitionId = this.findNewPartitionProcessing();
                if (partitionId == null) {
                    return;
                }
                if (partitionId == -1) continue;
                try {
                    ((PartitionIdAware)((Object)MapCombineTask.this.keyValueSource)).setPartitionId(partitionId);
                    MapCombineTask.this.processPartitionMapping(delegate, partitionId);
                    continue;
                }
                catch (Throwable t) {
                    MapCombineTask.this.handleProcessorThrowable(t);
                    continue;
                }
                break;
            }
            return;
        }

        private Integer findNewPartitionProcessing() {
            try {
                RequestPartitionResult result = (RequestPartitionResult)MapCombineTask.this.mapReduceService.processRequest(MapCombineTask.this.supervisor.getJobOwner(), new RequestPartitionMapping(MapCombineTask.this.name, MapCombineTask.this.jobId));
                if (result.getResultState() == RequestPartitionResult.ResultState.NO_SUPERVISOR) {
                    return null;
                }
                if (result.getResultState() == RequestPartitionResult.ResultState.CHECK_STATE_FAILED) {
                    return -1;
                }
                if (result.getResultState() == RequestPartitionResult.ResultState.NO_MORE_PARTITIONS) {
                    return null;
                }
                return result.getPartitionId();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

