/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.replicatedmap.impl;

import com.hazelcast.config.MergePolicyConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.replicatedmap.impl.PartitionContainer;
import com.hazelcast.replicatedmap.impl.ReplicatedMapService;
import com.hazelcast.replicatedmap.impl.operation.LegacyMergeOperation;
import com.hazelcast.replicatedmap.impl.operation.MergeOperationFactory;
import com.hazelcast.replicatedmap.impl.record.ReplicatedMapEntryView;
import com.hazelcast.replicatedmap.impl.record.ReplicatedRecord;
import com.hazelcast.replicatedmap.impl.record.ReplicatedRecordStore;
import com.hazelcast.replicatedmap.merge.MergePolicyProvider;
import com.hazelcast.replicatedmap.merge.ReplicatedMapMergePolicy;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.SplitBrainHandlerService;
import com.hazelcast.spi.SplitBrainMergePolicy;
import com.hazelcast.spi.impl.merge.MergingHolders;
import com.hazelcast.spi.merge.DiscardMergePolicy;
import com.hazelcast.spi.merge.MergingEntryHolder;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.MutableLong;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Split-brain handler for replicated maps.
 *
 * <p>{@link #prepareMergeRunnable()} collects the records of every locally owned
 * replicated-map store (except stores whose merge policy is a
 * {@link DiscardMergePolicy}), resets those stores, and returns a {@link Runnable}
 * that sends the collected records back through merge operations.
 */
class ReplicatedMapSplitBrainHandlerService implements SplitBrainHandlerService {
    /** Service name handed to the operation service for every merge invocation. */
    private static final String SERVICE_NAME = "hz:impl:replicatedMapService";

    private final ReplicatedMapService service;
    private final MergePolicyProvider mergePolicyProvider;
    private final NodeEngine nodeEngine;
    private final IPartitionService partitionService;
    private final SerializationService serializationService;

    /**
     * Creates the handler and caches the node engine, partition service and
     * serialization service obtained from the given replicated map service.
     *
     * @param service             the replicated map service this handler works for
     * @param mergePolicyProvider used to resolve merge policies by their configured name
     */
    ReplicatedMapSplitBrainHandlerService(ReplicatedMapService service, MergePolicyProvider mergePolicyProvider) {
        this.service = service;
        this.mergePolicyProvider = mergePolicyProvider;
        this.nodeEngine = service.getNodeEngine();
        this.partitionService = this.nodeEngine.getPartitionService();
        this.serializationService = this.nodeEngine.getSerializationService();
    }

    /**
     * Gathers the records of all stores in the partitions returned for this member's
     * address, grouped by map name, and resets each store after its records were copied.
     * Stores whose merge policy is a {@link DiscardMergePolicy} are skipped entirely:
     * their records are not collected and the store is not reset.
     *
     * @return a runnable that merges the collected records when executed
     */
    @Override
    public Runnable prepareMergeRunnable() {
        Map<String, Collection<ReplicatedRecord>> recordMap = new HashMap<String, Collection<ReplicatedRecord>>();
        for (Integer partition : this.partitionService.getMemberPartitions(this.nodeEngine.getThisAddress())) {
            PartitionContainer partitionContainer = this.service.getPartitionContainer(partition);
            ConcurrentMap<String, ReplicatedRecordStore> stores = partitionContainer.getStores();
            for (ReplicatedRecordStore store : stores.values()) {
                String name = store.getName();
                Object mergePolicy = this.getMergePolicy(this.service.getReplicatedMapConfig(name).getMergePolicyConfig());
                if (mergePolicy instanceof DiscardMergePolicy) {
                    continue;
                }
                // The same map name can show up in several partitions; accumulate into one collection.
                Collection<ReplicatedRecord> records = recordMap.get(name);
                if (records == null) {
                    records = new ArrayList<ReplicatedRecord>();
                    recordMap.put(name, records);
                }
                Iterator<ReplicatedRecord> iterator = store.recordIterator();
                while (iterator.hasNext()) {
                    records.add(iterator.next());
                }
                store.reset();
            }
        }
        return new Merger(recordMap);
    }

    /**
     * Resolves the merge policy instance for the policy name stored in the given config.
     * The result is typed as {@code Object} because callers distinguish the concrete
     * kind ({@link SplitBrainMergePolicy}, {@link ReplicatedMapMergePolicy},
     * {@link DiscardMergePolicy}) themselves.
     */
    private Object getMergePolicy(MergePolicyConfig mergePolicyConfig) {
        return this.mergePolicyProvider.getMergePolicy(mergePolicyConfig.getPolicy());
    }

    /**
     * Sends the collected records through merge operations and then waits, bounded by a
     * timeout proportional to the number of records, until every record was accounted for
     * on the semaphore.
     */
    private class Merger implements Runnable {
        /** Milliseconds of waiting time granted per record when awaiting merge completion. */
        private static final long TIMEOUT_FACTOR = 500L;

        private final ILogger logger;
        /** Receives one permit per record once its merge invocation has finished (or failed). */
        private final Semaphore semaphore;
        private final Map<String, Collection<ReplicatedRecord>> recordMap;

        Merger(Map<String, Collection<ReplicatedRecord>> recordMap) {
            this.logger = nodeEngine.getLogger(ReplicatedMapSplitBrainHandlerService.class);
            this.semaphore = new Semaphore(0);
            this.recordMap = recordMap;
        }

        /**
         * Dispatches the records of each map according to the kind of its merge policy and
         * afterwards waits for as many semaphore permits as records were dispatched.
         * Maps with a {@link SplitBrainMergePolicy} are skipped (with an info log) while the
         * cluster version is below {@link Versions#V3_10}.
         */
        @Override
        public void run() {
            int recordCount = 0;
            for (Map.Entry<String, Collection<ReplicatedRecord>> entry : this.recordMap.entrySet()) {
                String name = entry.getKey();
                MergePolicyConfig mergePolicyConfig = service.getReplicatedMapConfig(name).getMergePolicyConfig();
                Object mergePolicy = getMergePolicy(mergePolicyConfig);
                if (mergePolicy instanceof SplitBrainMergePolicy) {
                    if (nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                        this.logger.info("Cannot merge replicated map '" + name + "' with merge policy '" + mergePolicyConfig.getPolicy() + "' until cluster is running version " + Versions.V3_10);
                        continue;
                    }
                    int batchSize = mergePolicyConfig.getBatchSize();
                    recordCount += this.handleMerge(name, entry.getValue(), (SplitBrainMergePolicy) mergePolicy, batchSize);
                } else {
                    recordCount += this.handleMerge(name, entry.getValue(), (ReplicatedMapMergePolicy) mergePolicy);
                }
            }
            try {
                long timeoutMillis = (long) recordCount * TIMEOUT_FACTOR;
                if (!this.semaphore.tryAcquire(recordCount, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    this.logger.warning("Split-brain healing for replicated maps didn't finish within the timeout...");
                }
            } catch (InterruptedException e) {
                this.logger.finest("Interrupted while waiting for split-brain healing of replicated maps...");
                // Restore the interrupt flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Merges the records of one map with a {@link SplitBrainMergePolicy}, sending them in
         * batches. Entries are bucketed per partition; a counter shared by all partitions of
         * the same member triggers a batch for that member each time it reaches a multiple of
         * {@code batchSize}. Whatever is left is flushed per member at the end.
         *
         * @param name        the replicated map name
         * @param recordList  the records to merge
         * @param mergePolicy the policy handed to the merge operation factory
         * @param batchSize   number of entries per member that triggers sending a batch
         * @return the number of records processed
         */
        private int handleMerge(String name, Collection<ReplicatedRecord> recordList, SplitBrainMergePolicy mergePolicy, int batchSize) {
            int partitionCount = partitionService.getPartitionCount();
            Map<Address, List<Integer>> memberPartitionsMap = partitionService.getMemberPartitionsMap();

            // Index by partition ID: all partitions of one member share the same counter instance.
            MutableLong[] counterPerMember = new MutableLong[partitionCount];
            Address[] addresses = new Address[partitionCount];
            for (Map.Entry<Address, List<Integer>> memberEntry : memberPartitionsMap.entrySet()) {
                MutableLong counter = new MutableLong();
                Address address = memberEntry.getKey();
                for (int partitionId : memberEntry.getValue()) {
                    counterPerMember[partitionId] = counter;
                    addresses[partitionId] = address;
                }
            }

            // Generic array creation is not possible; the array only ever holds lists created below.
            @SuppressWarnings("unchecked")
            List<MergingEntryHolder<Object, Object>>[] entriesPerPartition = new List[partitionCount];
            int recordCount = 0;
            for (ReplicatedRecord record : recordList) {
                recordCount++;
                int partitionId = partitionService.getPartitionId(record.getKeyInternal());
                List<MergingEntryHolder<Object, Object>> entries = entriesPerPartition[partitionId];
                if (entries == null) {
                    entries = new LinkedList<MergingEntryHolder<Object, Object>>();
                    entriesPerPartition[partitionId] = entries;
                }
                MergingEntryHolder<Object, Object> mergingEntry = MergingHolders.createMergeHolder(record);
                entries.add(mergingEntry);

                // NOTE(review): a partition missing from memberPartitionsMap would leave a null
                // counter here - presumably all partitions are assigned at this point; confirm.
                long currentSize = ++counterPerMember[partitionId].value;
                if (currentSize % (long) batchSize == 0L) {
                    List<Integer> partitions = memberPartitionsMap.get(addresses[partitionId]);
                    this.sendBatch(name, partitions, entriesPerPartition, mergePolicy);
                }
            }
            // Flush the remaining entries that did not fill a complete batch.
            for (List<Integer> memberPartitions : memberPartitionsMap.values()) {
                this.sendBatch(name, memberPartitions, entriesPerPartition, mergePolicy);
            }
            return recordCount;
        }

        /**
         * Sends all pending entries of the given member partitions in one invocation and
         * clears the corresponding slots of {@code entriesPerPartition}. Does nothing when
         * none of the partitions has pending entries.
         *
         * @param name                the replicated map name
         * @param memberPartitions    the partition IDs to flush
         * @param entriesPerPartition pending entries indexed by partition ID; flushed slots are set to {@code null}
         * @param mergePolicy         the policy handed to the merge operation factory
         */
        private void sendBatch(String name, List<Integer> memberPartitions, List<MergingEntryHolder<Object, Object>>[] entriesPerPartition, SplitBrainMergePolicy mergePolicy) {
            int size = memberPartitions.size();
            int[] partitions = new int[size];
            int index = 0;
            for (Integer partitionId : memberPartitions) {
                if (entriesPerPartition[partitionId] != null) {
                    partitions[index++] = partitionId;
                }
            }
            if (index == 0) {
                return;
            }
            if (index < size) {
                // Trim to the partitions that actually have pending entries.
                partitions = Arrays.copyOf(partitions, index);
                size = index;
            }

            // Generic array creation is not possible; the array is filled from the typed source array.
            @SuppressWarnings("unchecked")
            List<MergingEntryHolder<Object, Object>>[] entries = new List[size];
            index = 0;
            int totalSize = 0;
            for (int partitionId : partitions) {
                List<MergingEntryHolder<Object, Object>> partitionEntries = entriesPerPartition[partitionId];
                entries[index++] = partitionEntries;
                totalSize += partitionEntries.size();
                entriesPerPartition[partitionId] = null;
            }
            if (totalSize == 0) {
                return;
            }
            this.invokeMergeOperationFactory(name, mergePolicy, partitions, entries, totalSize);
        }

        /**
         * Invokes a {@link MergeOperationFactory} on the given partitions. The semaphore is
         * released by {@code totalSize} permits whether or not the invocation succeeds; a
         * failure is logged and rethrown.
         *
         * @param name        the replicated map name
         * @param mergePolicy the policy handed to the merge operation factory
         * @param partitions  the target partition IDs
         * @param entries     the entries to merge, parallel to {@code partitions}
         * @param totalSize   total number of entries in {@code entries}
         */
        private void invokeMergeOperationFactory(String name, SplitBrainMergePolicy mergePolicy, int[] partitions, List<MergingEntryHolder<Object, Object>>[] entries, int totalSize) {
            try {
                MergeOperationFactory factory = new MergeOperationFactory(name, partitions, entries, mergePolicy);
                nodeEngine.getOperationService().invokeOnPartitions(SERVICE_NAME, (OperationFactory) factory, partitions);
            } catch (Throwable t) {
                // Only the message is logged here; the throwable itself is propagated by the rethrow.
                this.logger.warning("Error while running replicated map merge operation: " + t.getMessage());
                throw ExceptionUtil.rethrow(t);
            } finally {
                this.semaphore.release(totalSize);
            }
        }

        /**
         * Merges the records of one map with a legacy {@link ReplicatedMapMergePolicy} by
         * invoking one {@link LegacyMergeOperation} per record. Each invocation releases one
         * semaphore permit from its callback, on success as well as on failure.
         *
         * @param name        the replicated map name
         * @param recordList  the records to merge
         * @param mergePolicy the legacy merge policy passed to each operation
         * @return the number of records for which an operation was invoked
         */
        private int handleMerge(String name, Collection<ReplicatedRecord> recordList, ReplicatedMapMergePolicy mergePolicy) {
            ExecutionCallback<Object> mergeCallback = new ExecutionCallback<Object>() {

                @Override
                public void onResponse(Object response) {
                    semaphore.release(1);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Pass the throwable along: it is not propagated anywhere else, so logging
                    // the message alone would lose the stack trace.
                    logger.warning("Error while running replicated map merge operation: " + t.getMessage(), t);
                    semaphore.release(1);
                }
            };
            int recordCount = 0;
            for (ReplicatedRecord record : recordList) {
                recordCount++;
                ReplicatedMapEntryView entryView = this.createEntryView(record);
                LegacyMergeOperation operation = new LegacyMergeOperation(name, record.getKeyInternal(), entryView, mergePolicy);
                try {
                    int partitionId = partitionService.getPartitionId(record.getKeyInternal());
                    nodeEngine.getOperationService().invokeOnPartition(SERVICE_NAME, operation, partitionId).andThen(mergeCallback);
                } catch (Throwable t) {
                    throw ExceptionUtil.rethrow(t);
                }
            }
            return recordCount;
        }

        /**
         * Builds an entry view from the record: key and value are converted with the
         * serialization service, and hits, TTL and the access/creation/update timestamps
         * are copied over.
         */
        private ReplicatedMapEntryView createEntryView(ReplicatedRecord record) {
            return new ReplicatedMapEntryView()
                    .setKey(serializationService.toObject(record.getKeyInternal()))
                    .setValue(serializationService.toObject(record.getValueInternal()))
                    .setHits(record.getHits())
                    .setTtl(record.getTtlMillis())
                    .setLastAccessTime(record.getLastAccessTime())
                    .setCreationTime(record.getCreationTime())
                    .setLastUpdateTime(record.getUpdateTime());
        }
    }
}

