/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.scheduledexecutor.impl;

import com.hazelcast.config.MergePolicyConfig;
import com.hazelcast.config.ScheduledExecutorConfig;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MembershipAdapter;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.config.ConfigValidator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.partition.PartitionLostEvent;
import com.hazelcast.partition.PartitionLostListener;
import com.hazelcast.scheduledexecutor.impl.ScheduledExecutorContainerHolder;
import com.hazelcast.scheduledexecutor.impl.ScheduledExecutorMemberBin;
import com.hazelcast.scheduledexecutor.impl.ScheduledExecutorPartition;
import com.hazelcast.scheduledexecutor.impl.ScheduledExecutorServiceProxy;
import com.hazelcast.scheduledexecutor.impl.ScheduledFutureProxy;
import com.hazelcast.scheduledexecutor.impl.ScheduledTaskDescriptor;
import com.hazelcast.scheduledexecutor.impl.operations.MergeOperation;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.MigrationAwareService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.PartitionMigrationEvent;
import com.hazelcast.spi.PartitionReplicationEvent;
import com.hazelcast.spi.QuorumAwareService;
import com.hazelcast.spi.RemoteService;
import com.hazelcast.spi.SplitBrainHandlerService;
import com.hazelcast.spi.SplitBrainMergePolicy;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.impl.merge.MergingHolders;
import com.hazelcast.spi.merge.MergingEntryHolder;
import com.hazelcast.spi.merge.SplitBrainMergePolicyProvider;
import com.hazelcast.spi.partition.MigrationEndpoint;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.ContextMutexFactory;
import com.hazelcast.util.ExceptionUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DistributedScheduledExecutorService
implements ManagedService,
RemoteService,
MigrationAwareService,
QuorumAwareService,
SplitBrainHandlerService {
    /** Service name constant; used as the target service when merge operations are invoked on a partition. */
    public static final String SERVICE_NAME = "hz:impl:scheduledExecutorService";
    /** Identifier value (-1) that {@link #getPartitionOrMemberBin(int)} maps to the member bin rather than a partition. */
    public static final int MEMBER_BIN = -1;
    /**
     * Placeholder stored in {@link #quorumConfigCache} when an executor config has no quorum name;
     * {@link #getQuorumName(String)} translates it back to {@code null}.
     */
    private static final Object NULL_OBJECT = new Object();
    // Assigned in init(NodeEngine, Properties).
    private NodeEngine nodeEngine;
    // One slot per partition id; allocated in init() and (re)populated in reset().
    private ScheduledExecutorPartition[] partitions;
    // Re-created on every reset(); destroyed in shutdown().
    private ScheduledExecutorMemberBin memberBin;
    // Obtained from the node engine in init(); handed to each partition and used to resolve merge policies.
    private SplitBrainMergePolicyProvider mergePolicyProvider;
    // Names of executors for which shutdownExecutor(name) has been called; the value is always Boolean.TRUE.
    private final ConcurrentMap<String, Boolean> shutdownExecutors = new ConcurrentHashMap<String, Boolean>();
    // Synchronized set backed by a WeakHashMap: futures registered here are notified on partition/member loss.
    private final Set<ScheduledFutureProxy> lossListeners = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap()));
    // Set to true in beforeMigration(), back to false in commitMigration()/rollbackMigration();
    // its current value is passed to the partition when a replication operation is prepared.
    private final AtomicBoolean migrationMode = new AtomicBoolean();
    // Registration id returned by addPartitionLostListener; null while no listener is registered.
    private String partitionLostRegistration;
    // Per-executor-name cache of quorum names; holds NULL_OBJECT when the config has no quorum name.
    private final ConcurrentMap<String, Object> quorumConfigCache = new ConcurrentHashMap<String, Object>();
    // Mutex factory handed to ConcurrencyUtil.getOrPutSynchronized when populating quorumConfigCache.
    private final ContextMutexFactory quorumConfigCacheMutexFactory = new ContextMutexFactory();
    // Produces the value cached for an executor name: its configured quorum name, or NULL_OBJECT if none.
    private final ConstructorFunction<String, Object> quorumConfigConstructor = new ConstructorFunction<String, Object>(){

        @Override
        public Object createNew(String name) {
            ScheduledExecutorConfig executorConfig = DistributedScheduledExecutorService.this.nodeEngine.getConfig().findScheduledExecutorConfig(name);
            String quorumName = executorConfig.getQuorumName();
            // The sentinel stands in for "no quorum" so the result can be stored in the cache map.
            return quorumName == null ? NULL_OBJECT : quorumName;
        }
    };
    // Registration id returned by addMembershipListener; null while no listener is registered.
    private String membershipListenerRegistration;

    /**
     * Binds this service to the given node engine and builds its initial state.
     *
     * <p>Allocates one partition slot per partition reported by the partition service,
     * picks up the split-brain merge policy provider and then delegates to {@link #reset()}.
     *
     * @param nodeEngine the node engine this service runs on
     * @param properties not read by this implementation
     */
    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        final int numberOfPartitions = nodeEngine.getPartitionService().getPartitionCount();
        this.nodeEngine = nodeEngine;
        partitions = new ScheduledExecutorPartition[numberOfPartitions];
        mergePolicyProvider = nodeEngine.getSplitBrainMergePolicyProvider();
        reset();
    }

    /**
     * Looks up the partition stored at the given index.
     *
     * @param partitionId index into the partition array
     * @return the partition held in that slot
     */
    public ScheduledExecutorPartition getPartition(int partitionId) {
        final ScheduledExecutorPartition found = partitions[partitionId];
        return found;
    }

    /**
     * Resolves a container holder from an id.
     *
     * @param id {@code -1} (the value of {@code MEMBER_BIN}) selects the member bin;
     *           any other value is treated as a partition id
     * @return the member bin or the partition for {@code id}
     */
    public ScheduledExecutorContainerHolder getPartitionOrMemberBin(int id) {
        if (id != -1) {
            return getPartition(id);
        }
        return memberBin;
    }

    /**
     * @return the node engine supplied to {@link #init(NodeEngine, Properties)}
     */
    public NodeEngine getNodeEngine() {
        return nodeEngine;
    }

    /**
     * Tears down the current state via {@code shutdown(true)} and rebuilds it:
     * a fresh member bin, the partition-lost and membership listeners (only when
     * no registration is currently held) and a new partition object for every slot.
     */
    @Override
    public void reset() {
        shutdown(true);
        memberBin = new ScheduledExecutorMemberBin(nodeEngine);

        if (partitionLostRegistration == null) {
            registerPartitionListener();
        }
        if (membershipListenerRegistration == null) {
            registerMembershipListener();
        }

        int slot = 0;
        while (slot < partitions.length) {
            ScheduledExecutorPartition previous = partitions[slot];
            if (previous != null) {
                previous.destroy();
            }
            partitions[slot] = new ScheduledExecutorPartition(nodeEngine, slot, mergePolicyProvider);
            slot++;
        }
    }

    /**
     * Releases everything this service holds: the shutdown-executor registry, the member bin,
     * the registered loss listeners, both cluster listeners and every existing partition.
     *
     * @param terminate not read by this implementation
     */
    @Override
    public void shutdown(boolean terminate) {
        shutdownExecutors.clear();
        if (memberBin != null) {
            memberBin.destroy();
        }
        lossListeners.clear();
        unRegisterPartitionListenerIfExists();
        unRegisterMembershipListenerIfExists();

        for (ScheduledExecutorPartition partition : partitions) {
            // Slots stay null until reset() has populated them.
            if (partition != null) {
                partition.destroy();
            }
        }
    }

    /**
     * Registers a future so that it is notified when a partition or a member is lost.
     *
     * @param future the future proxy to add to the loss-listener set
     */
    void addLossListener(ScheduledFutureProxy future) {
        lossListeners.add(future);
    }

    /**
     * Creates the proxy for the named scheduled executor after validating its configuration.
     *
     * @param name name of the scheduled executor
     * @return a new {@link ScheduledExecutorServiceProxy} bound to this service
     */
    @Override
    public DistributedObject createDistributedObject(String name) {
        // Validation runs before the proxy is constructed.
        ConfigValidator.checkScheduledExecutorConfig(nodeEngine.getConfig().findScheduledExecutorConfig(name));
        return new ScheduledExecutorServiceProxy(name, nodeEngine, this);
    }

    /**
     * Destroys the named scheduled executor: shuts down its durable executor unless
     * {@link #shutdownExecutor(String)} already did so, drops its containers from the
     * member bin and all partitions, and evicts its cached quorum name.
     *
     * @param name name of the scheduled executor
     */
    @Override
    public void destroyDistributedObject(String name) {
        final boolean wasAlreadyShutdown = shutdownExecutors.remove(name) != null;
        if (!wasAlreadyShutdown) {
            InternalExecutionService executionService = (InternalExecutionService) nodeEngine.getExecutionService();
            executionService.shutdownScheduledDurableExecutor(name);
        }
        resetPartitionOrMemberBinContainer(name);
        quorumConfigCache.remove(name);
    }

    /**
     * Marks the named executor as shut down and, on the first such call for that name,
     * shuts down its scheduled durable executor.
     *
     * @param name name of the scheduled executor
     */
    public void shutdownExecutor(String name) {
        final Boolean previousMark = shutdownExecutors.putIfAbsent(name, Boolean.TRUE);
        if (previousMark != null) {
            // Already marked: nothing more to do.
            return;
        }
        InternalExecutionService executionService = (InternalExecutionService) nodeEngine.getExecutionService();
        executionService.shutdownScheduledDurableExecutor(name);
    }

    /**
     * @param name name of the scheduled executor
     * @return {@code true} if the name is currently recorded in the shutdown registry
     */
    public boolean isShutdown(String name) {
        final boolean marked = shutdownExecutors.containsKey(name);
        return marked;
    }

    /**
     * Delegates to the partition named by the event, passing the requested replica index
     * and the current value of the migration-mode flag.
     *
     * @param event the replication event
     * @return the operation prepared by the partition
     */
    @Override
    public Operation prepareReplicationOperation(PartitionReplicationEvent event) {
        final ScheduledExecutorPartition target = partitions[event.getPartitionId()];
        final int replicaIndex = event.getReplicaIndex();
        return target.prepareReplicationOperation(replicaIndex, migrationMode.get());
    }

    /**
     * Collects the owned snapshot of every partition, keeps the non-empty ones keyed by
     * partition id, and wraps them in a {@code Merger}.
     *
     * @return the runnable that performs the merge
     */
    @Override
    public Runnable prepareMergeRunnable() {
        Map<Integer, Map<String, Collection<ScheduledTaskDescriptor>>> snapshotsByPartition =
                new HashMap<Integer, Map<String, Collection<ScheduledTaskDescriptor>>>();
        int partitionId = 0;
        for (ScheduledExecutorPartition partition : partitions) {
            Map<String, Collection<ScheduledTaskDescriptor>> owned = partition.prepareOwnedSnapshot();
            if (!owned.isEmpty()) {
                snapshotsByPartition.put(partitionId, owned);
            }
            partitionId++;
        }
        return new Merger(snapshotsByPartition);
    }

    /**
     * Switches the migration-mode flag on (no-op if it is already on).
     *
     * @param event not read by this implementation
     */
    @Override
    public void beforeMigration(PartitionMigrationEvent event) {
        migrationMode.compareAndSet(false, true);
    }

    /**
     * Finalises a migration. On the source endpoint, replicas beyond the new replica index
     * are disposed; on the other endpoint, suspended tasks are promoted when the new replica
     * index is 0. The migration-mode flag is then switched off.
     *
     * @param event the migration event
     */
    @Override
    public void commitMigration(PartitionMigrationEvent event) {
        final int partitionId = event.getPartitionId();
        final boolean onSource = event.getMigrationEndpoint() == MigrationEndpoint.SOURCE;

        if (onSource) {
            discardReserved(partitionId, event.getNewReplicaIndex());
        } else if (event.getNewReplicaIndex() == 0) {
            partitions[partitionId].promoteSuspended();
        }

        migrationMode.set(false);
    }

    /**
     * Undoes a migration. On the destination endpoint, replicas beyond the current replica
     * index are disposed; on the other endpoint, suspended tasks are promoted when the current
     * replica index is 0. The migration-mode flag is then switched off.
     *
     * @param event the migration event
     */
    @Override
    public void rollbackMigration(PartitionMigrationEvent event) {
        final int partitionId = event.getPartitionId();
        final boolean onDestination = event.getMigrationEndpoint() == MigrationEndpoint.DESTINATION;

        if (onDestination) {
            discardReserved(event.getPartitionId(), event.getCurrentReplicaIndex());
        } else if (event.getCurrentReplicaIndex() == 0) {
            partitions[partitionId].promoteSuspended();
        }

        migrationMode.set(false);
    }

    /**
     * Asks the given partition to dispose of its obsolete replicas.
     *
     * @param partitionId           index of the partition
     * @param thresholdReplicaIndex replica index forwarded to {@code disposeObsoleteReplicas}
     */
    private void discardReserved(int partitionId, int thresholdReplicaIndex) {
        partitions[partitionId].disposeObsoleteReplicas(thresholdReplicaIndex);
    }

    /**
     * Destroys the container with the given name in the member bin (when one exists)
     * and in every partition.
     *
     * @param name name of the scheduled executor whose containers are dropped
     */
    private void resetPartitionOrMemberBinContainer(String name) {
        if (memberBin != null) {
            memberBin.destroyContainer(name);
        }
        for (int index = 0; index < partitions.length; index++) {
            partitions[index].destroyContainer(name);
        }
    }

    /**
     * Registers a partition-lost listener that forwards each event to every future in the
     * loss-listener set, and remembers the registration id for later removal.
     */
    private void registerPartitionListener() {
        this.partitionLostRegistration = this.getNodeEngine().getPartitionService().addPartitionLostListener(new PartitionLostListener(){

            @Override
            public void partitionLost(PartitionLostEvent event) {
                // Snapshot with a zero-length array so the copy is sized inside a single toArray call.
                // Pre-sizing with size() is a separate call: if the weakly-referenced set shrinks in
                // between (GC or a concurrent clear()), the array keeps trailing nulls and the loop
                // below would throw a NullPointerException.
                ScheduledFutureProxy[] futures = DistributedScheduledExecutorService.this.lossListeners.toArray(new ScheduledFutureProxy[0]);
                for (ScheduledFutureProxy future : futures) {
                    future.notifyPartitionLost(event);
                }
            }
        });
    }

    /**
     * Removes the partition-lost listener if one is registered.
     *
     * <p>If removal fails and the peeled exception is a {@link HazelcastInstanceNotActiveException},
     * the failure is rethrown and the registration id is kept. Any other exception is ignored
     * and the registration id is cleared as on success.
     */
    private void unRegisterPartitionListenerIfExists() {
        if (partitionLostRegistration == null) {
            return;
        }

        try {
            getNodeEngine().getPartitionService().removePartitionLostListener(partitionLostRegistration);
        } catch (Exception ex) {
            if (ExceptionUtil.peel(ex, HazelcastInstanceNotActiveException.class, null) instanceof HazelcastInstanceNotActiveException) {
                throw ExceptionUtil.rethrow(ex);
            }
            // Every other failure is deliberately ignored; fall through and clear the id.
        }

        partitionLostRegistration = null;
    }

    /**
     * Registers a membership listener that forwards every member-removed event to each future
     * in the loss-listener set, and remembers the registration id for later removal.
     */
    private void registerMembershipListener() {
        this.membershipListenerRegistration = this.getNodeEngine().getClusterService().addMembershipListener(new MembershipAdapter(){

            @Override
            public void memberRemoved(MembershipEvent event) {
                // Snapshot with a zero-length array so the copy is sized inside a single toArray call.
                // Pre-sizing with size() is a separate call: if the weakly-referenced set shrinks in
                // between (GC or a concurrent clear()), the array keeps trailing nulls and the loop
                // below would throw a NullPointerException.
                ScheduledFutureProxy[] futures = DistributedScheduledExecutorService.this.lossListeners.toArray(new ScheduledFutureProxy[0]);
                for (ScheduledFutureProxy future : futures) {
                    future.notifyMemberLost(event);
                }
            }
        });
    }

    /**
     * Removes the membership listener if one is registered.
     *
     * <p>If removal fails and the peeled exception is a {@link HazelcastInstanceNotActiveException},
     * the failure is rethrown and the registration id is kept. Any other exception is ignored
     * and the registration id is cleared as on success.
     */
    private void unRegisterMembershipListenerIfExists() {
        if (membershipListenerRegistration == null) {
            return;
        }

        try {
            getNodeEngine().getClusterService().removeMembershipListener(membershipListenerRegistration);
        } catch (Exception ex) {
            if (ExceptionUtil.peel(ex, HazelcastInstanceNotActiveException.class, null) instanceof HazelcastInstanceNotActiveException) {
                throw ExceptionUtil.rethrow(ex);
            }
            // Every other failure is deliberately ignored; fall through and clear the id.
        }

        membershipListenerRegistration = null;
    }

    /**
     * Returns the quorum name configured for the named executor, caching the lookup per name.
     *
     * @param name name of the scheduled executor
     * @return the quorum name, or {@code null} when the cluster version is below 3.10
     *         or the executor config has no quorum name
     */
    @Override
    public String getQuorumName(String name) {
        final boolean clusterTooOld = nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10);
        if (clusterTooOld) {
            return null;
        }

        Object cached = ConcurrencyUtil.getOrPutSynchronized(quorumConfigCache, name, quorumConfigCacheMutexFactory, quorumConfigConstructor);
        if (cached == NULL_OBJECT) {
            // The sentinel means "config looked up, no quorum name present".
            return null;
        }
        return (String) cached;
    }

    /**
     * @param name name of the scheduled executor
     * @return the merge policy configuration from that executor's config
     */
    private MergePolicyConfig getMergePolicyConfig(String name) {
        final ScheduledExecutorConfig executorConfig = getNodeEngine().getConfig().getScheduledExecutorConfig(name);
        return executorConfig.getMergePolicyConfig();
    }

    /**
     * @param name name of the scheduled executor
     * @return the merge policy the provider resolves for the policy named in that executor's config
     */
    private SplitBrainMergePolicy getMergePolicy(String name) {
        final String policyName = getMergePolicyConfig(name).getPolicy();
        return mergePolicyProvider.getMergePolicy(policyName);
    }

    /**
     * Runnable that replays a snapshot of owned scheduled tasks as batched {@link MergeOperation}s.
     *
     * <p>For every container in every partition of the snapshot, the task descriptors are wrapped
     * in merging holders and sent to the owning partition in batches of the configured batch size.
     * Each batch releases one semaphore permit on completion (success or failure); {@link #run()}
     * then waits for all permits, bounded by a timeout proportional to the number of tasks.
     */
    private class Merger
    implements Runnable {
        /** Milliseconds of wait budget granted per merged task when awaiting batch completion. */
        private static final long TIMEOUT_FACTOR = 500L;
        private final ILogger logger;
        /** Receives one permit per completed merge operation. */
        private final Semaphore semaphore;
        private final ExecutionCallback<Object> mergeCallback;
        /** Partition id -> container name -> task descriptors; emptied as the merge proceeds. */
        private final Map<Integer, Map<String, Collection<ScheduledTaskDescriptor>>> partitionsSnapshot;

        /**
         * @param map snapshot to merge, keyed by partition id and then by container name
         */
        Merger(Map<Integer, Map<String, Collection<ScheduledTaskDescriptor>>> map) {
            this.logger = DistributedScheduledExecutorService.this.nodeEngine.getLogger(DistributedScheduledExecutorService.class);
            this.semaphore = new Semaphore(0);
            this.mergeCallback = new ExecutionCallback<Object>(){

                @Override
                public void onResponse(Object response) {
                    Merger.this.semaphore.release(1);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Pass the throwable along so the stack trace is not lost.
                    Merger.this.logger.warning("Error while running scheduled executor merge operation: " + t.getMessage(), t);
                    // A failed batch still counts as finished, otherwise run() would wait for the full timeout.
                    Merger.this.semaphore.release(1);
                }
            };
            this.partitionsSnapshot = map;
        }

        /**
         * Sends all batches and waits for their completion. Does nothing but log when the
         * cluster version is below 3.10. Any exception raised while sending is logged and rethrown.
         */
        @Override
        public void run() {
            if (DistributedScheduledExecutorService.this.nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                this.logger.info("Cluster needs to run version " + Versions.V3_10 + " to merge scheduled executor instances");
                return;
            }
            // Total number of task descriptors handed over; scales the wait timeout.
            int size = 0;
            // Number of merge operations sent; equals the number of permits to wait for.
            int operationCount = 0;
            try {
                for (Map.Entry<Integer, Map<String, Collection<ScheduledTaskDescriptor>>> partition : this.partitionsSnapshot.entrySet()) {
                    int partitionId = partition.getKey();
                    Map<String, Collection<ScheduledTaskDescriptor>> containers = partition.getValue();
                    for (Map.Entry<String, Collection<ScheduledTaskDescriptor>> container : containers.entrySet()) {
                        String containerName = container.getKey();
                        Collection<ScheduledTaskDescriptor> tasks = container.getValue();
                        int batchSize = DistributedScheduledExecutorService.this.getMergePolicyConfig(containerName).getBatchSize();
                        SplitBrainMergePolicy mergePolicy = DistributedScheduledExecutorService.this.getMergePolicy(containerName);
                        List<MergingEntryHolder<String, ScheduledTaskDescriptor>> mergingEntries = new ArrayList<MergingEntryHolder<String, ScheduledTaskDescriptor>>();
                        for (ScheduledTaskDescriptor descriptor : tasks) {
                            MergingEntryHolder<String, ScheduledTaskDescriptor> mergingEntry = MergingHolders.createMergeHolder(descriptor);
                            mergingEntries.add(mergingEntry);
                            ++size;
                            if (mergingEntries.size() != batchSize) {
                                continue;
                            }
                            // Batch is full: ship it and start a new one.
                            this.sendBatch(partitionId, containerName, mergePolicy, mergingEntries, this.mergeCallback);
                            mergingEntries = new ArrayList<MergingEntryHolder<String, ScheduledTaskDescriptor>>(batchSize);
                            ++operationCount;
                        }
                        tasks.clear();
                        if (mergingEntries.isEmpty()) {
                            continue;
                        }
                        // Ship the final, partially filled batch.
                        this.sendBatch(partitionId, containerName, mergePolicy, mergingEntries, this.mergeCallback);
                        ++operationCount;
                    }
                }
                this.partitionsSnapshot.clear();
            }
            catch (Exception e) {
                this.logger.warning("Split-brain healing of scheduled executors didn't complete successfully...", e);
                throw ExceptionUtil.rethrow(e);
            }
            try {
                if (!this.semaphore.tryAcquire(operationCount, size * TIMEOUT_FACTOR, TimeUnit.MILLISECONDS)) {
                    this.logger.warning("Split-brain healing for scheduled executors didn't finish within the timeout...");
                }
            }
            catch (InterruptedException e) {
                this.logger.finest("Interrupted while waiting for split-brain healing of scheduled executors...");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Invokes one {@link MergeOperation} on the given partition and attaches the callback to it.
         *
         * @param partitionId    partition the operation is invoked on
         * @param name           container name carried by the operation
         * @param mergePolicy    merge policy carried by the operation
         * @param mergingEntries the batch of entries to merge
         * @param mergeCallback  callback attached to the invocation's future
         */
        private void sendBatch(int partitionId, String name, SplitBrainMergePolicy mergePolicy, List<MergingEntryHolder<String, ScheduledTaskDescriptor>> mergingEntries, ExecutionCallback<Object> mergeCallback) {
            MergeOperation operation = new MergeOperation(name, mergePolicy, mergingEntries);
            try {
                DistributedScheduledExecutorService.this.nodeEngine.getOperationService().invokeOnPartition(DistributedScheduledExecutorService.SERVICE_NAME, operation, partitionId).andThen(mergeCallback);
            }
            catch (Throwable t) {
                throw ExceptionUtil.rethrow(t);
            }
        }
    }
}

