/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cache.impl;

import com.hazelcast.cache.CacheMergePolicy;
import com.hazelcast.cache.impl.CacheOperationProvider;
import com.hazelcast.cache.impl.CachePartitionSegment;
import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.cache.impl.ICacheRecordStore;
import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Disposable;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.SplitBrainHandlerService;
import com.hazelcast.spi.SplitBrainMergePolicy;
import com.hazelcast.spi.impl.merge.MergingHolders;
import com.hazelcast.spi.merge.MergingEntryHolder;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.MapUtil;
import com.hazelcast.util.MutableLong;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class CacheSplitBrainHandlerService
implements SplitBrainHandlerService {
    protected static final long TIMEOUT_FACTOR = 500L;
    protected final int partitionCount;
    protected final ILogger logger;
    protected final NodeEngine nodeEngine;
    protected final CacheService cacheService;
    protected final Map<String, CacheConfig> configs;
    protected final CachePartitionSegment[] segments;
    protected final OperationService operationService;
    protected final IPartitionService partitionService;
    protected final SerializationService serializationService;
    protected final CacheMergePolicyProvider mergePolicyProvider;

    CacheSplitBrainHandlerService(NodeEngine nodeEngine, Map<String, CacheConfig> configs, CachePartitionSegment[] segments) {
        this.nodeEngine = nodeEngine;
        this.configs = configs;
        this.segments = segments;
        this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
        this.partitionService = nodeEngine.getPartitionService();
        this.partitionCount = this.partitionService.getPartitionCount();
        this.cacheService = (CacheService)nodeEngine.getService("hz:impl:cacheService");
        this.serializationService = nodeEngine.getSerializationService();
        this.operationService = nodeEngine.getOperationService();
        this.logger = nodeEngine.getLogger(this.getClass());
    }

    @Override
    public Runnable prepareMergeRunnable() {
        Map<String, Map<Data, CacheRecord>> recordMap = MapUtil.createHashMap(this.configs.size());
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            if (!this.partitionService.isPartitionOwner(partitionId)) continue;
            CachePartitionSegment segment = this.segments[partitionId];
            List<Iterator<ICacheRecordStore>> iterators = this.iteratorsOf(segment);
            for (Iterator<ICacheRecordStore> iterator : iterators) {
                while (iterator.hasNext()) {
                    ICacheRecordStore cacheRecordStore = iterator.next();
                    String cacheName = cacheRecordStore.getName();
                    if (cacheRecordStore.getConfig().getInMemoryFormat() == InMemoryFormat.NATIVE && this.nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                        this.logger.warning("Split-brain recovery can not be applied NATIVE in-memory-formatted cache [" + cacheName + ']');
                        continue;
                    }
                    Map<Data, CacheRecord<CacheRecord>> records = recordMap.get(cacheName);
                    if (records == null) {
                        records = MapUtil.createHashMap(cacheRecordStore.size());
                        recordMap.put(cacheName, records);
                    }
                    for (Map.Entry<Data, CacheRecord> cacheRecordEntry : cacheRecordStore.getReadOnlyRecords().entrySet()) {
                        Data key = cacheRecordEntry.getKey();
                        CacheRecord cacheRecord = cacheRecordEntry.getValue();
                        records.put(key, cacheRecord);
                    }
                }
            }
        }
        this.invalidateNearCaches(recordMap);
        return new CacheMerger(recordMap);
    }

    protected List<Iterator<ICacheRecordStore>> iteratorsOf(CachePartitionSegment segment) {
        return Collections.singletonList(segment.recordStoreIterator());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void destroySegment(CachePartitionSegment segment) {
        Collection recordStores = segment.recordStores.values();
        Iterator iterator = recordStores.iterator();
        while (iterator.hasNext()) {
            try {
                ICacheRecordStore recordStore = (ICacheRecordStore)iterator.next();
                recordStore.destroy();
            }
            finally {
                iterator.remove();
            }
        }
    }

    private void invalidateNearCaches(Map<String, Map<Data, CacheRecord>> recordMap) {
        for (String cacheName : recordMap.keySet()) {
            this.cacheService.sendInvalidationEvent(cacheName, null, "<NA>");
        }
    }

    private Object getCacheMergePolicy(String cacheName) {
        CacheConfig cacheConfig = this.configs.get(cacheName);
        String mergePolicyName = cacheConfig.getMergePolicy();
        return this.mergePolicyProvider.getMergePolicy(mergePolicyName);
    }

    private class CacheMerger
    implements Runnable,
    Disposable {
        private final Semaphore semaphore = new Semaphore(0);
        private final ILogger logger;
        private final Map<String, Map<Data, CacheRecord>> recordMap;

        CacheMerger(Map<String, Map<Data, CacheRecord>> recordMap) {
            this.logger = CacheSplitBrainHandlerService.this.nodeEngine.getLogger(CacheService.class);
            this.recordMap = recordMap;
        }

        @Override
        public void run() {
            int recordCount = 0;
            for (Map.Entry<String, Map<Data, CacheRecord>> recordMapEntry : this.recordMap.entrySet()) {
                String cacheName = recordMapEntry.getKey();
                Map<Data, CacheRecord> records = recordMapEntry.getValue();
                Object mergePolicy = CacheSplitBrainHandlerService.this.getCacheMergePolicy(cacheName);
                if (mergePolicy instanceof SplitBrainMergePolicy) {
                    if (CacheSplitBrainHandlerService.this.nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                        this.logger.info("Cannot merge cache '" + cacheName + "' with merge policy '" + mergePolicy.getClass().getName() + "' until cluster is running version " + Versions.V3_10);
                        continue;
                    }
                    recordCount += this.handleMerge(cacheName, records, (SplitBrainMergePolicy)mergePolicy, 100);
                    continue;
                }
                recordCount += this.handleMerge(cacheName, records, (CacheMergePolicy)mergePolicy);
            }
            this.recordMap.clear();
            try {
                if (!this.semaphore.tryAcquire(recordCount, (long)recordCount * 500L, TimeUnit.MILLISECONDS)) {
                    this.logger.warning("Split-brain healing for caches didn't finish within the timeout...");
                }
            }
            catch (InterruptedException e) {
                this.logger.finest("Interrupted while waiting for split-brain healing of caches...");
                Thread.currentThread().interrupt();
            }
        }

        private int handleMerge(String name, Map<Data, CacheRecord> recordMap, SplitBrainMergePolicy mergePolicy, int batchSize) {
            Map<Address, List<Integer>> memberPartitionsMap = CacheSplitBrainHandlerService.this.partitionService.getMemberPartitionsMap();
            MutableLong[] counterPerMember = new MutableLong[CacheSplitBrainHandlerService.this.partitionCount];
            Address[] addresses = new Address[CacheSplitBrainHandlerService.this.partitionCount];
            for (Map.Entry<Address, List<Integer>> addressListEntry : memberPartitionsMap.entrySet()) {
                MutableLong counter = new MutableLong();
                Address address = addressListEntry.getKey();
                for (int partitionId : addressListEntry.getValue()) {
                    counterPerMember[partitionId] = counter;
                    addresses[partitionId] = address;
                }
            }
            List[] entriesPerPartition = new List[CacheSplitBrainHandlerService.this.partitionCount];
            int recordCount = 0;
            for (Map.Entry<Data, CacheRecord> entry : recordMap.entrySet()) {
                long currentSize;
                ++recordCount;
                Data key = entry.getKey();
                CacheRecord record = entry.getValue();
                int partitionId = CacheSplitBrainHandlerService.this.partitionService.getPartitionId(key);
                LinkedList<MergingEntryHolder<Data, Data>> entries = entriesPerPartition[partitionId];
                if (entries == null) {
                    entriesPerPartition[partitionId] = entries = new LinkedList<MergingEntryHolder<Data, Data>>();
                }
                Object dataValue = CacheSplitBrainHandlerService.this.serializationService.toData(record.getValue());
                MergingEntryHolder<Data, Data> mergingEntry = MergingHolders.createMergeHolder(key, dataValue, record);
                entries.add(mergingEntry);
                if ((currentSize = ++counterPerMember[partitionId].value) % (long)batchSize != 0L) continue;
                List<Integer> partitions = memberPartitionsMap.get(addresses[partitionId]);
                this.sendBatch(name, partitions, entriesPerPartition, mergePolicy);
            }
            for (Map.Entry<Object, Object> entry : memberPartitionsMap.entrySet()) {
                this.sendBatch(name, (List)entry.getValue(), entriesPerPartition, mergePolicy);
            }
            return recordCount;
        }

        private int handleMerge(String cacheName, Map<Data, CacheRecord> recordMap, CacheMergePolicy mergePolicy) {
            ExecutionCallback<Object> mergeCallback = new ExecutionCallback<Object>(){

                @Override
                public void onResponse(Object response) {
                    CacheMerger.this.semaphore.release(1);
                }

                @Override
                public void onFailure(Throwable t) {
                    CacheMerger.this.logger.warning("Error while running cache merge operation: " + t.getMessage());
                    CacheMerger.this.semaphore.release(1);
                }
            };
            int recordCount = 0;
            for (Map.Entry<Data, CacheRecord> entry : recordMap.entrySet()) {
                ++recordCount;
                Data key = entry.getKey();
                CacheRecord record = entry.getValue();
                DefaultCacheEntryView entryView = new DefaultCacheEntryView(key, (Data)CacheSplitBrainHandlerService.this.serializationService.toData(record.getValue()), record.getCreationTime(), record.getExpirationTime(), record.getLastAccessTime(), record.getAccessHit());
                CacheLegacyMergeOperation operation = new CacheLegacyMergeOperation(cacheName, key, entryView, mergePolicy);
                try {
                    int partitionId = CacheSplitBrainHandlerService.this.partitionService.getPartitionId(key);
                    CacheSplitBrainHandlerService.this.operationService.invokeOnPartition("hz:impl:cacheService", operation, partitionId).andThen(mergeCallback);
                }
                catch (Throwable t) {
                    throw ExceptionUtil.rethrow(t);
                }
            }
            return recordCount;
        }

        private void sendBatch(String name, List<Integer> memberPartitions, List<MergingEntryHolder<Data, Data>>[] entriesPerPartition, SplitBrainMergePolicy mergePolicy) {
            int size = memberPartitions.size();
            int[] partitions = new int[size];
            int index = 0;
            for (Integer partitionId : memberPartitions) {
                if (entriesPerPartition[partitionId] == null) continue;
                partitions[index++] = partitionId;
            }
            if (index == 0) {
                return;
            }
            if (index < size) {
                partitions = Arrays.copyOf(partitions, index);
                size = index;
            }
            List[] entries = new List[size];
            index = 0;
            int totalSize = 0;
            for (int partitionId : partitions) {
                int batchSize = entriesPerPartition[partitionId].size();
                entries[index++] = entriesPerPartition[partitionId];
                totalSize += batchSize;
                entriesPerPartition[partitionId] = null;
            }
            if (totalSize == 0) {
                return;
            }
            this.invokeMergeOperationFactory(name, mergePolicy, partitions, entries, totalSize);
        }

        private void invokeMergeOperationFactory(String name, SplitBrainMergePolicy mergePolicy, int[] partitions, List<MergingEntryHolder<Data, Data>>[] entries, int totalSize) {
            try {
                CacheConfig cacheConfig = CacheSplitBrainHandlerService.this.cacheService.getCacheConfig(name);
                CacheOperationProvider operationProvider = CacheSplitBrainHandlerService.this.cacheService.getCacheOperationProvider(name, cacheConfig.getInMemoryFormat());
                OperationFactory factory = operationProvider.createMergeOperationFactory(name, partitions, entries, mergePolicy);
                CacheSplitBrainHandlerService.this.operationService.invokeOnPartitions("hz:impl:cacheService", factory, partitions);
            }
            catch (Throwable t) {
                this.logger.warning("Error while running cache merge operation: " + t.getMessage());
                throw ExceptionUtil.rethrow(t);
            }
            finally {
                this.semaphore.release(totalSize);
            }
        }

        @Override
        public void dispose() {
            for (int partitionId = 0; partitionId < CacheSplitBrainHandlerService.this.partitionCount; ++partitionId) {
                CacheSplitBrainHandlerService.this.destroySegment(CacheSplitBrainHandlerService.this.segments[partitionId]);
            }
        }
    }
}

