/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.map.impl;

import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MergePolicyConfig;
import com.hazelcast.core.EntryView;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.EntryViews;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.operation.MapOperation;
import com.hazelcast.map.impl.operation.MapOperationProvider;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.merge.IgnoreMergingEntryMapMergePolicy;
import com.hazelcast.map.merge.MapMergePolicy;
import com.hazelcast.map.merge.MergePolicyProvider;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Disposable;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.SplitBrainHandlerService;
import com.hazelcast.spi.SplitBrainMergePolicy;
import com.hazelcast.spi.impl.merge.MergingHolders;
import com.hazelcast.spi.merge.DiscardMergePolicy;
import com.hazelcast.spi.merge.MergingEntryHolder;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.MapUtil;
import com.hazelcast.util.MutableLong;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class MapSplitBrainHandlerService
implements SplitBrainHandlerService {
    protected static final long TIMEOUT_FACTOR = 500L;
    protected final int partitionCount;
    protected final ILogger logger;
    protected final NodeEngine nodeEngine;
    protected final OperationService operationService;
    protected final IPartitionService partitionService;
    protected final MapServiceContext mapServiceContext;
    protected final MergePolicyProvider mergePolicyProvider;

    MapSplitBrainHandlerService(MapServiceContext mapServiceContext) {
        this.mapServiceContext = mapServiceContext;
        this.nodeEngine = mapServiceContext.getNodeEngine();
        this.logger = this.nodeEngine.getLogger(this.getClass());
        this.partitionService = this.nodeEngine.getPartitionService();
        this.partitionCount = this.partitionService.getPartitionCount();
        this.mergePolicyProvider = mapServiceContext.getMergePolicyProvider();
        this.operationService = this.nodeEngine.getOperationService();
    }

    @Override
    public Runnable prepareMergeRunnable() {
        long now = Clock.currentTimeMillis();
        Map<String, MapContainer> mapContainers = this.mapServiceContext.getMapContainers();
        Map<MapContainer, Collection<Record>> recordMap = MapUtil.createHashMap(mapContainers.size());
        for (MapContainer mapContainer : mapContainers.values()) {
            MapConfig mapConfig = mapContainer.getMapConfig();
            InMemoryFormat inMemoryFormat = mapConfig.getInMemoryFormat();
            if (inMemoryFormat == InMemoryFormat.NATIVE && this.nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                this.logger.warning("Split-brain recovery can not be applied NATIVE in-memory-formatted map [" + mapContainer.name + ']');
                continue;
            }
            Object mergePolicy = this.getMergePolicy(mapConfig.getMergePolicyConfig());
            boolean mergePartitionData = !(mergePolicy instanceof IgnoreMergingEntryMapMergePolicy) && !(mergePolicy instanceof DiscardMergePolicy);
            for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
                RecordStore<Record> recordStore = this.getOrNullRecordStore(mapContainer.name, inMemoryFormat, partitionId);
                if (recordStore == null || !mergePartitionData || !this.partitionService.isPartitionOwner(partitionId)) continue;
                Collection<Record> records = recordMap.get(mapContainer);
                if (records == null) {
                    records = new LinkedList<Record>();
                    recordMap.put(mapContainer, records);
                }
                Iterator<Record> iterator = recordStore.iterator(now, false);
                while (iterator.hasNext()) {
                    records.add(iterator.next());
                }
            }
        }
        return new Merger(recordMap);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void destroyRecordStores(Collection<RecordStore> recordStores) {
        Iterator<RecordStore> iterator = recordStores.iterator();
        while (iterator.hasNext()) {
            RecordStore recordStore = iterator.next();
            try {
                recordStore.getMapContainer().getIndexes(recordStore.getPartitionId()).clearIndexes();
                recordStore.destroy();
            }
            finally {
                iterator.remove();
            }
        }
    }

    protected RecordStore<Record> getOrNullRecordStore(String mapName, InMemoryFormat inMemoryFormat, int partitionId) {
        PartitionContainer partitionContainer = this.mapServiceContext.getPartitionContainer(partitionId);
        RecordStore recordStore = partitionContainer.getExistingRecordStore(mapName);
        if (recordStore == null) {
            return null;
        }
        return recordStore;
    }

    private Object getMergePolicy(MergePolicyConfig config) {
        return this.mergePolicyProvider.getMergePolicy(config.getPolicy());
    }

    private class Merger
    implements Runnable,
    Disposable {
        private final Semaphore semaphore = new Semaphore(0);
        private final ILogger logger;
        private final Map<MapContainer, Collection<Record>> recordMap;

        Merger(Map<MapContainer, Collection<Record>> recordMap) {
            this.logger = MapSplitBrainHandlerService.this.nodeEngine.getLogger(MapSplitBrainHandlerService.class);
            this.recordMap = recordMap;
        }

        @Override
        public void run() {
            int recordCount = 0;
            for (Map.Entry<MapContainer, Collection<Record>> recordMapEntry : this.recordMap.entrySet()) {
                MapContainer mapContainer = recordMapEntry.getKey();
                Collection<Record> recordList = recordMapEntry.getValue();
                String mapName = mapContainer.getName();
                MergePolicyConfig mergePolicyConfig = mapContainer.getMapConfig().getMergePolicyConfig();
                Object mergePolicy = MapSplitBrainHandlerService.this.getMergePolicy(mergePolicyConfig);
                if (mergePolicy instanceof SplitBrainMergePolicy) {
                    if (MapSplitBrainHandlerService.this.nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_10)) {
                        this.logger.info("Cannot merge map '" + mapName + "' with merge policy '" + mergePolicyConfig.getPolicy() + "' until cluster is running version " + Versions.V3_10);
                        continue;
                    }
                    int batchSize = mergePolicyConfig.getBatchSize();
                    recordCount += this.handleMerge(mapName, recordList, (SplitBrainMergePolicy)mergePolicy, batchSize);
                    continue;
                }
                recordCount += this.handleMerge(mapName, recordList, (MapMergePolicy)mergePolicy);
            }
            this.recordMap.clear();
            try {
                if (!this.semaphore.tryAcquire(recordCount, (long)recordCount * 500L, TimeUnit.MILLISECONDS)) {
                    this.logger.warning("Split-brain healing for maps didn't finish within the timeout...");
                }
            }
            catch (InterruptedException e) {
                this.logger.finest("Interrupted while waiting for split-brain healing of maps...");
                Thread.currentThread().interrupt();
            }
        }

        private int handleMerge(String name, Collection<Record> recordList, SplitBrainMergePolicy mergePolicy, int batchSize) {
            Map<Address, List<Integer>> memberPartitionsMap = MapSplitBrainHandlerService.this.partitionService.getMemberPartitionsMap();
            MutableLong[] counterPerMember = new MutableLong[MapSplitBrainHandlerService.this.partitionCount];
            Address[] addresses = new Address[MapSplitBrainHandlerService.this.partitionCount];
            for (Map.Entry<Address, List<Integer>> addressListEntry : memberPartitionsMap.entrySet()) {
                MutableLong counter = new MutableLong();
                Address address = addressListEntry.getKey();
                for (int partitionId : addressListEntry.getValue()) {
                    counterPerMember[partitionId] = counter;
                    addresses[partitionId] = address;
                }
            }
            List[] entriesPerPartition = new List[MapSplitBrainHandlerService.this.partitionCount];
            int recordCount = 0;
            for (Record record : recordList) {
                long currentSize;
                ++recordCount;
                int partitionId = MapSplitBrainHandlerService.this.partitionService.getPartitionId(record.getKey());
                LinkedList<MergingEntryHolder<Data, Data>> entries = entriesPerPartition[partitionId];
                if (entries == null) {
                    entriesPerPartition[partitionId] = entries = new LinkedList<MergingEntryHolder<Data, Data>>();
                }
                Data dataValue = MapSplitBrainHandlerService.this.mapServiceContext.toData(record.getValue());
                MergingEntryHolder<Data, Data> mergingEntry = MergingHolders.createMergeHolder(record, dataValue);
                entries.add(mergingEntry);
                if ((currentSize = ++counterPerMember[partitionId].value) % (long)batchSize != 0L) continue;
                List<Integer> partitions = memberPartitionsMap.get(addresses[partitionId]);
                this.sendBatch(name, partitions, entriesPerPartition, mergePolicy);
            }
            for (Map.Entry entry : memberPartitionsMap.entrySet()) {
                this.sendBatch(name, (List)entry.getValue(), entriesPerPartition, mergePolicy);
            }
            return recordCount;
        }

        private int handleMerge(String name, Collection<Record> recordList, MapMergePolicy mergePolicy) {
            ExecutionCallback<Object> mergeCallback = new ExecutionCallback<Object>(){

                @Override
                public void onResponse(Object response) {
                    Merger.this.semaphore.release(1);
                }

                @Override
                public void onFailure(Throwable t) {
                    Merger.this.logger.warning("Error while running map merge operation: " + t.getMessage());
                    Merger.this.semaphore.release(1);
                }
            };
            MapOperationProvider operationProvider = MapSplitBrainHandlerService.this.mapServiceContext.getMapOperationProvider(name);
            int recordCount = 0;
            for (Record record : recordList) {
                ++recordCount;
                Data key = record.getKey();
                Data value = MapSplitBrainHandlerService.this.mapServiceContext.toData(record.getValue());
                EntryView<Data, Data> entryView = EntryViews.createSimpleEntryView(key, value, record);
                MapOperation operation = operationProvider.createLegacyMergeOperation(name, entryView, mergePolicy, false);
                try {
                    int partitionId = MapSplitBrainHandlerService.this.partitionService.getPartitionId(key);
                    MapSplitBrainHandlerService.this.operationService.invokeOnPartition("hz:impl:mapService", operation, partitionId).andThen(mergeCallback);
                }
                catch (Throwable t) {
                    throw ExceptionUtil.rethrow(t);
                }
            }
            return recordCount;
        }

        private void sendBatch(String name, List<Integer> memberPartitions, List<MergingEntryHolder<Data, Data>>[] entriesPerPartition, SplitBrainMergePolicy mergePolicy) {
            int size = memberPartitions.size();
            int[] partitions = new int[size];
            int index = 0;
            for (Integer partitionId : memberPartitions) {
                if (entriesPerPartition[partitionId] == null) continue;
                partitions[index++] = partitionId;
            }
            if (index == 0) {
                return;
            }
            if (index < size) {
                partitions = Arrays.copyOf(partitions, index);
                size = index;
            }
            List[] entries = new List[size];
            index = 0;
            int totalSize = 0;
            for (int partitionId : partitions) {
                int batchSize = entriesPerPartition[partitionId].size();
                entries[index++] = entriesPerPartition[partitionId];
                totalSize += batchSize;
                entriesPerPartition[partitionId] = null;
            }
            if (totalSize == 0) {
                return;
            }
            this.invokeMergeOperationFactory(name, mergePolicy, partitions, entries, totalSize);
        }

        private void invokeMergeOperationFactory(String name, SplitBrainMergePolicy mergePolicy, int[] partitions, List<MergingEntryHolder<Data, Data>>[] entries, int totalSize) {
            try {
                MapOperationProvider operationProvider = MapSplitBrainHandlerService.this.mapServiceContext.getMapOperationProvider(name);
                OperationFactory factory = operationProvider.createMergeOperationFactory(name, partitions, entries, mergePolicy);
                MapSplitBrainHandlerService.this.operationService.invokeOnPartitions("hz:impl:mapService", factory, partitions);
            }
            catch (Throwable t) {
                this.logger.warning("Error while running map merge operation: " + t.getMessage());
                throw ExceptionUtil.rethrow(t);
            }
            finally {
                this.semaphore.release(totalSize);
            }
        }

        @Override
        public void dispose() {
            for (int partitionId = 0; partitionId < MapSplitBrainHandlerService.this.partitionCount; ++partitionId) {
                PartitionContainer partitionContainer = MapSplitBrainHandlerService.this.mapServiceContext.getPartitionContainer(partitionId);
                Collection<RecordStore> recordStores = partitionContainer.getAllRecordStores();
                MapSplitBrainHandlerService.this.destroyRecordStores(recordStores);
            }
        }
    }
}

