/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.map.writebehind;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.MapStore;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.MapService;
import com.hazelcast.map.PartitionContainer;
import com.hazelcast.map.RecordStore;
import com.hazelcast.map.writebehind.DelayedEntry;
import com.hazelcast.map.writebehind.WriteBehindManager;
import com.hazelcast.map.writebehind.WriteBehindQueue;
import com.hazelcast.map.writebehind.store.MapStoreManager;
import com.hazelcast.map.writebehind.store.MapStoreManagers;
import com.hazelcast.map.writebehind.store.StoreListener;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class WriteBehindQueueManager
implements WriteBehindManager {
    private static final String EXECUTOR_NAME_PREFIX = "hz:scheduled:mapstore:";
    private static final int EXECUTOR_DEFAULT_QUEUE_CAPACITY = 10000;
    private static final Comparator<DelayedEntry> DELAYED_ENTRY_COMPARATOR = new Comparator<DelayedEntry>(){

        @Override
        public int compare(DelayedEntry o1, DelayedEntry o2) {
            long s2;
            long s1 = o1.getStoreTime();
            return s1 < (s2 = o2.getStoreTime()) ? -1 : (s1 == s2 ? 0 : 1);
        }
    };
    private final ScheduledExecutorService scheduledExecutor;
    private final StoreProcessor processor;
    private final MapService mapService;
    private final MapStoreManager<DelayedEntry> mapStoreManager;
    private final List<StoreListener> listeners;
    private final ILogger logger;

    WriteBehindQueueManager(String mapName, MapService mapService, MapStore mapStore, MapStoreConfig mapStoreConfig) {
        this.scheduledExecutor = this.getScheduledExecutorService(mapName, mapService);
        this.mapService = mapService;
        this.logger = mapService.getNodeEngine().getLogger(WriteBehindQueueManager.class);
        this.listeners = new ArrayList<StoreListener>(2);
        this.mapStoreManager = MapStoreManagers.newMapStoreManager(mapService, mapStore, this.listeners);
        this.processor = new StoreProcessor(mapName, mapService, this.mapStoreManager, mapStoreConfig);
    }

    @Override
    public void start() {
        this.scheduledExecutor.scheduleAtFixedRate(this.processor, 1L, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        this.scheduledExecutor.shutdown();
    }

    @Override
    public void addStoreListener(StoreListener storeListener) {
        this.listeners.add(storeListener);
    }

    @Override
    public Collection<Data> flush(WriteBehindQueue<DelayedEntry> queue) {
        if (queue.size() == 0) {
            return Collections.emptyList();
        }
        List<DelayedEntry> sortedDelayedEntries = queue.removeAll();
        return this.flush0(sortedDelayedEntries);
    }

    private Collection<Data> flush0(List<DelayedEntry> delayedEntries) {
        Collections.sort(delayedEntries, DELAYED_ENTRY_COMPARATOR);
        Map<Integer, List<DelayedEntry>> failedStoreOpPerPartition = this.mapStoreManager.process(delayedEntries);
        if (failedStoreOpPerPartition.size() > 0) {
            this.printErrorLog(failedStoreOpPerPartition);
        }
        return this.getDataKeys(delayedEntries);
    }

    private List<Data> getDataKeys(List<DelayedEntry> sortedDelayedEntries) {
        if (sortedDelayedEntries == null || sortedDelayedEntries.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<Data> keys = new ArrayList<Data>(sortedDelayedEntries.size());
        for (DelayedEntry entry : sortedDelayedEntries) {
            keys.add(this.mapService.toData(entry.getKey()));
        }
        return keys;
    }

    @Override
    public ScheduledExecutorService getScheduler() {
        return this.scheduledExecutor;
    }

    private void printErrorLog(Map<Integer, List<DelayedEntry>> failsPerPartition) {
        int size = 0;
        Collection<List<DelayedEntry>> values = failsPerPartition.values();
        for (Collection collection : values) {
            size += collection.size();
        }
        String logMessage = String.format("Map store flush operation can not be done for %d entries", size);
        this.logger.severe(logMessage);
    }

    private static List<DelayedEntry> filterItemsLessThanOrEqualToTime(WriteBehindQueue<DelayedEntry> queue, long now) {
        DelayedEntry e;
        if (queue == null || queue.size() == 0) {
            return Collections.emptyList();
        }
        List<DelayedEntry> delayedEntries = Collections.emptyList();
        int i = 0;
        while ((e = queue.get(i)) != null) {
            if (i == 0) {
                delayedEntries = new ArrayList<DelayedEntry>();
            }
            if (e.getStoreTime() <= now) {
                delayedEntries.add(e);
            }
            ++i;
        }
        return delayedEntries;
    }

    private ScheduledExecutorService getScheduledExecutorService(String mapName, MapService mapService) {
        NodeEngine nodeEngine = mapService.getNodeEngine();
        ExecutionService executionService = nodeEngine.getExecutionService();
        String executorName = EXECUTOR_NAME_PREFIX + mapName;
        executionService.register(executorName, 1, 10000, ExecutorType.CACHED);
        return executionService.getScheduledExecutor(executorName);
    }

    private static final class StoreProcessor
    implements Runnable {
        private final String mapName;
        private final MapService mapService;
        private final MapStoreManager mapStoreManager;
        private final long backupRunIntervalTime;
        private long lastRunTime;
        private final int writeBatchSize;

        private StoreProcessor(String mapName, MapService mapService, MapStoreManager mapStoreManager, MapStoreConfig mapStoreConfig) {
            this.mapName = mapName;
            this.mapService = mapService;
            this.mapStoreManager = mapStoreManager;
            this.backupRunIntervalTime = this.getReplicaWaitTime();
            this.lastRunTime = Clock.currentTimeMillis();
            this.writeBatchSize = mapStoreConfig.getWriteBatchSize();
        }

        private long getReplicaWaitTime() {
            return TimeUnit.SECONDS.toMillis(this.mapService.getNodeEngine().getGroupProperties().MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS.getInteger());
        }

        @Override
        public void run() {
            MapService mapService = this.mapService;
            long now = Clock.currentTimeMillis();
            NodeEngine nodeEngine = mapService.getNodeEngine();
            ClusterService clusterService = nodeEngine.getClusterService();
            InternalPartitionService partitionService = nodeEngine.getPartitionService();
            Address thisAddress = clusterService.getThisAddress();
            int partitionCount = partitionService.getPartitionCount();
            Map<Integer, Integer> partitionToEntryCountHolder = Collections.emptyMap();
            List<DelayedEntry> sortedDelayedEntries = Collections.emptyList();
            boolean createLazy = true;
            for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
                WriteBehindQueue<DelayedEntry> queue;
                List delayedEntries;
                RecordStore recordStore;
                InternalPartition partition = partitionService.getPartition(partitionId);
                Address owner = partition.getOwnerOrNull();
                if (owner == null || (recordStore = this.getRecordStoreOrNull(this.mapName, partitionId)) == null || (delayedEntries = WriteBehindQueueManager.filterItemsLessThanOrEqualToTime(queue = recordStore.getWriteBehindQueue(), now)).isEmpty()) continue;
                if (!owner.equals(thisAddress)) {
                    if (now >= this.lastRunTime + this.backupRunIntervalTime) continue;
                    this.doInBackup(queue, delayedEntries, partitionId);
                    continue;
                }
                if (createLazy) {
                    partitionToEntryCountHolder = new HashMap<Integer, Integer>();
                    sortedDelayedEntries = new ArrayList<DelayedEntry>();
                    createLazy = false;
                }
                partitionToEntryCountHolder.put(partitionId, delayedEntries.size());
                sortedDelayedEntries.addAll(delayedEntries);
            }
            if (sortedDelayedEntries.isEmpty()) {
                return;
            }
            Collections.sort(sortedDelayedEntries, DELAYED_ENTRY_COMPARATOR);
            if (this.writeBatchSize > 1) {
                this.doStoreUsingBatchSize(sortedDelayedEntries, partitionToEntryCountHolder);
            } else {
                this.doStore(sortedDelayedEntries, partitionToEntryCountHolder);
            }
            this.lastRunTime = now;
        }

        private void doStoreUsingBatchSize(List<DelayedEntry> sortedDelayedEntries, Map<Integer, Integer> countHolder) {
            List<DelayedEntry> delayedEntryList;
            int page = 0;
            while ((delayedEntryList = this.getBatchChunk(sortedDelayedEntries, this.writeBatchSize, page++)) != null) {
                this.doStore(delayedEntryList, countHolder);
            }
        }

        private void doStore(List<DelayedEntry> sortedDelayedEntries, Map<Integer, Integer> countHolder) {
            Map<Integer, List<DelayedEntry>> failsPerPartition = this.mapStoreManager.process(sortedDelayedEntries);
            this.removeProcessedEntries(this.mapName, countHolder);
            this.addFailsToQueue(this.mapName, failsPerPartition);
        }

        private List<DelayedEntry> getBatchChunk(List<DelayedEntry> list, int batchSize, int chunkNumber) {
            if (list == null || list.isEmpty()) {
                return null;
            }
            int start = chunkNumber * batchSize;
            int end = Math.min(start + batchSize, list.size());
            if (start >= end) {
                return null;
            }
            return list.subList(start, end);
        }

        private void doInBackup(WriteBehindQueue queue, List<DelayedEntry> delayedEntries, int partitionId) {
            NodeEngine nodeEngine = this.mapService.getNodeEngine();
            ClusterService clusterService = nodeEngine.getClusterService();
            InternalPartitionService partitionService = nodeEngine.getPartitionService();
            Address thisAddress = clusterService.getThisAddress();
            InternalPartition partition = partitionService.getPartition(partitionId);
            Address owner = partition.getOwnerOrNull();
            if (owner != null && !owner.equals(thisAddress)) {
                this.mapStoreManager.callBeforeStoreListeners(delayedEntries);
                this.removeProcessed(queue, delayedEntries.size());
                this.mapStoreManager.callAfterStoreListeners(delayedEntries);
            }
        }

        private void removeProcessedEntries(String mapName, Map<Integer, Integer> partitionToEntryCountHolder) {
            for (Map.Entry<Integer, Integer> entry : partitionToEntryCountHolder.entrySet()) {
                Integer partitionId = entry.getKey();
                RecordStore recordStore = this.getRecordStoreOrNull(mapName, partitionId);
                if (recordStore == null) continue;
                WriteBehindQueue<DelayedEntry> queue = recordStore.getWriteBehindQueue();
                this.removeProcessed(queue, partitionToEntryCountHolder.get(partitionId));
            }
        }

        private RecordStore getRecordStoreOrNull(String mapName, int partitionId) {
            PartitionContainer partitionContainer = this.mapService.getPartitionContainer(partitionId);
            return partitionContainer.getExistingRecordStore(mapName);
        }

        private void addFailsToQueue(String mapName, Map<Integer, List<DelayedEntry>> failsPerPartition) {
            if (failsPerPartition.isEmpty()) {
                return;
            }
            for (Map.Entry<Integer, List<DelayedEntry>> entry : failsPerPartition.entrySet()) {
                RecordStore recordStore;
                Integer partitionId = entry.getKey();
                List<DelayedEntry> fails = failsPerPartition.get(partitionId);
                if (fails == null || fails.isEmpty() || (recordStore = this.getRecordStoreOrNull(mapName, partitionId)) == null) continue;
                WriteBehindQueue<DelayedEntry> queue = recordStore.getWriteBehindQueue();
                Collections.sort(fails, DELAYED_ENTRY_COMPARATOR);
                queue.addFront(fails);
            }
        }

        private void removeProcessed(WriteBehindQueue<DelayedEntry> queue, int numberOfEntriesProcessed) {
            if (queue == null || queue.size() == 0 || numberOfEntriesProcessed < 1) {
                return;
            }
            for (int j = 0; j < numberOfEntriesProcessed; ++j) {
                queue.removeFirst();
            }
        }
    }
}

