/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.replicatedmap.impl.record;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.config.ReplicatedMapConfig;
import com.hazelcast.core.Member;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.monitor.impl.LocalReplicatedMapStatsImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.replicatedmap.impl.PreReplicationHook;
import com.hazelcast.replicatedmap.impl.ReplicationChannel;
import com.hazelcast.replicatedmap.impl.messages.MultiReplicationMessage;
import com.hazelcast.replicatedmap.impl.messages.ReplicationMessage;
import com.hazelcast.replicatedmap.impl.operation.ReplicatedMapClearOperation;
import com.hazelcast.replicatedmap.impl.operation.ReplicatedMapPostJoinOperation;
import com.hazelcast.replicatedmap.impl.record.AbstractBaseReplicatedRecordStore;
import com.hazelcast.replicatedmap.impl.record.InternalReplicatedMapStorage;
import com.hazelcast.replicatedmap.impl.record.RemoteProvisionTask;
import com.hazelcast.replicatedmap.impl.record.ReplicatedRecord;
import com.hazelcast.replicatedmap.impl.record.ReplicationCachedSenderTask;
import com.hazelcast.replicatedmap.impl.record.VectorClockTimestamp;
import com.hazelcast.replicatedmap.impl.record.WrappedExecutorService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationService;
import com.hazelcast.util.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Sends replication messages of one replicated map to the other cluster members and
 * applies the replication messages received from them to the local storage.
 *
 * <p>Outgoing messages are either published immediately (replication delay of zero) or
 * collected in a lock-guarded cache that is flushed as a single
 * {@link MultiReplicationMessage}. Incoming updates are reconciled with the local record
 * by comparing {@link VectorClockTimestamp}s.
 *
 * @param <K> key type of the backing record store
 * @param <V> value type of the backing record store
 */
public class ReplicationPublisher<K, V> implements ReplicationChannel {
    private static final ILogger LOGGER = Logger.getLogger(ReplicationPublisher.class);
    private static final String SERVICE_NAME = "hz:impl:replicatedMapService";
    private static final String EVENT_TOPIC_NAME = "hz:impl:replicatedMapService.replication";
    private static final String EXECUTOR_NAME = "hz:replicated-map";
    /** Key carried by an update message that asks the receiver to clear its storage. */
    private static final String CLEAR_MESSAGE_KEY = "hz:impl:replicatedMapService$CLEAR$MESSAGE$";
    /** Number of cached messages above which the cache is flushed without waiting for the scheduled task. */
    private static final int MAX_MESSAGE_CACHE_SIZE = 1000;
    /** Maximum number of rounds in which a remote clear is attempted on members that failed so far. */
    private static final int MAX_CLEAR_EXECUTION_RETRY = 5;
    /** Chunk size passed to the post-join operation created by {@link #sendPreProvisionRequest(List)}. */
    private static final int DEFAULT_PROVISION_CHUNK_SIZE = 100;
    /** A tombstone whose update time lies further back than this many milliseconds counts as old. */
    private static final long OLD_TOMBSTONE_AGE_MILLIS = 150000L;

    private final List<ReplicationMessage> replicationMessageCache = new ArrayList<ReplicationMessage>();
    private final Lock replicationMessageCacheLock = new ReentrantLock();
    private final Random memberRandomizer = new Random();
    private final ScheduledExecutorService executorService;
    private final ExecutionService executionService;
    private final OperationService operationService;
    private final ClusterService clusterService;
    private final EventService eventService;
    private final NodeEngine nodeEngine;
    private final AbstractBaseReplicatedRecordStore<K, V> replicatedRecordStore;
    private final InternalReplicatedMapStorage<K, V> storage;
    private final ReplicatedMapConfig replicatedMapConfig;
    private final LocalReplicatedMapStatsImpl mapStats;
    private final Member localMember;
    private final String name;
    private final boolean allowReplicationHooks;
    private volatile PreReplicationHook preReplicationHook;

    /**
     * Creates a publisher bound to the given record store.
     *
     * <p>Storage, statistics, local member, configuration and map name are taken from the
     * record store; the services are obtained from the node engine. Replication hooks are
     * only honoured when the system property {@code hazelcast.repmap.hooks.allowed} parses
     * to {@code true} (it defaults to {@code "false"}).
     *
     * @param replicatedRecordStore record store whose changes are replicated
     * @param nodeEngine            node engine providing the cluster, event, execution and operation services
     */
    ReplicationPublisher(AbstractBaseReplicatedRecordStore<K, V> replicatedRecordStore, NodeEngine nodeEngine) {
        this.replicatedRecordStore = replicatedRecordStore;
        this.nodeEngine = nodeEngine;
        this.name = replicatedRecordStore.getName();
        this.storage = replicatedRecordStore.storage;
        this.mapStats = replicatedRecordStore.mapStats;
        this.eventService = nodeEngine.getEventService();
        this.localMember = replicatedRecordStore.localMember;
        this.clusterService = nodeEngine.getClusterService();
        this.executionService = nodeEngine.getExecutionService();
        this.operationService = nodeEngine.getOperationService();
        this.replicatedMapConfig = replicatedRecordStore.replicatedMapConfig;
        this.executorService = this.getExecutorService(nodeEngine, this.replicatedMapConfig);
        this.allowReplicationHooks = Boolean.parseBoolean(System.getProperty("hazelcast.repmap.hooks.allowed", "false"));
    }

    /**
     * Publishes the batch to the remote subscribers, bypassing any pre-replication hook.
     *
     * @param message batch of replication messages to publish
     */
    @Override
    public void replicate(MultiReplicationMessage message) {
        this.distributeReplicationMessage(message, true);
    }

    /**
     * Publishes the message to the remote subscribers, bypassing any pre-replication hook.
     *
     * @param message replication message to publish
     */
    @Override
    public void replicate(ReplicationMessage message) {
        this.distributeReplicationMessage(message, true);
    }

    /**
     * Installs the hook that is handed outgoing messages before they are replicated.
     * The hook is only consulted when replication hooks are allowed (see the constructor).
     *
     * @param preReplicationHook hook to install; {@code null} removes the current hook
     */
    public void setPreReplicationHook(PreReplicationHook preReplicationHook) {
        this.preReplicationHook = preReplicationHook;
    }

    /**
     * Publishes a locally created replication message.
     *
     * <p>With a configured replication delay of zero the message is distributed right away.
     * Otherwise it is appended to the message cache: the first cached message schedules a
     * {@link ReplicationCachedSenderTask} after the configured delay, and once the cache
     * holds more than {@link #MAX_MESSAGE_CACHE_SIZE} messages it is flushed immediately.
     *
     * @param message replication message to publish
     */
    public void publishReplicatedMessage(ReplicationMessage message) {
        long replicationDelayMillis = this.replicatedMapConfig.getReplicationDelayMillis();
        if (replicationDelayMillis == 0L) {
            this.distributeReplicationMessage(message, false);
            return;
        }
        this.replicationMessageCacheLock.lock();
        try {
            this.replicationMessageCache.add(message);
            int cachedMessages = this.replicationMessageCache.size();
            if (cachedMessages == 1) {
                ReplicationCachedSenderTask task = new ReplicationCachedSenderTask(this);
                this.executorService.schedule(task, replicationDelayMillis, TimeUnit.MILLISECONDS);
            } else if (cachedMessages > MAX_MESSAGE_CACHE_SIZE) {
                // Called while this thread holds the cache lock; processMessageCache()
                // acquires the same ReentrantLock again, which is permitted for the owner.
                this.processMessageCache();
            }
        } finally {
            this.replicationMessageCacheLock.unlock();
        }
    }

    /**
     * Queues a received update for asynchronous processing on this publisher's executor.
     * Updates whose origin is the local member are dropped.
     *
     * @param update received replication message
     */
    public void queueUpdateMessage(final ReplicationMessage update) {
        if (this.localMember.equals(update.getOrigin())) {
            return;
        }
        this.executorService.execute(new Runnable() {
            @Override
            public void run() {
                ReplicationPublisher.this.processUpdateMessage(update);
            }
        });
    }

    /**
     * Queues a received batch; its messages are processed one after another, in iteration
     * order, inside a single task on this publisher's executor.
     *
     * @param updates received batch of replication messages
     */
    public void queueUpdateMessages(final MultiReplicationMessage updates) {
        this.executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (ReplicationMessage update : updates.getReplicationMessages()) {
                    ReplicationPublisher.this.processUpdateMessage(update);
                }
            }
        });
    }

    /** Calls {@code shutdownNow()} on this publisher's executor. */
    void destroy() {
        this.executorService.shutdownNow();
    }

    /**
     * Drains the message cache and distributes its content as one
     * {@link MultiReplicationMessage}. Does nothing when the cache is empty.
     *
     * <p>The cache lock is held only while the cache is copied and cleared.
     */
    void processMessageCache() {
        ReplicationMessage[] drained = null;
        this.replicationMessageCacheLock.lock();
        try {
            int size = this.replicationMessageCache.size();
            if (size > 0) {
                drained = this.replicationMessageCache.toArray(new ReplicationMessage[size]);
                this.replicationMessageCache.clear();
            }
        } finally {
            this.replicationMessageCacheLock.unlock();
        }
        if (drained != null) {
            this.distributeReplicationMessage(new MultiReplicationMessage(this.name, drained), false);
        }
    }

    /**
     * Distributes a replication message.
     *
     * <p>If {@code forceSend} is set or no pre-replication hook is active, the message is
     * published to every registration of the replication topic except those subscribed
     * with the local address. Otherwise the message is handed to the hook on the
     * {@code hz:replicated-map} executor, together with this publisher as the channel.
     *
     * @param message   a {@link ReplicationMessage} or a {@link MultiReplicationMessage}
     * @param forceSend {@code true} to publish directly without consulting the hook
     */
    void distributeReplicationMessage(final Object message, boolean forceSend) {
        final PreReplicationHook hook = this.getPreReplicationHook();
        if (forceSend || hook == null) {
            Collection<EventRegistration> allRegistrations = this.eventService.getRegistrations(SERVICE_NAME, EVENT_TOPIC_NAME);
            Collection<EventRegistration> remoteRegistrations = this.filterEventRegistrations(allRegistrations);
            this.eventService.publishEvent(SERVICE_NAME, remoteRegistrations, message, this.name.hashCode());
            return;
        }
        this.executionService.execute(EXECUTOR_NAME, new Runnable() {
            @Override
            public void run() {
                if (message instanceof MultiReplicationMessage) {
                    hook.preReplicateMultiMessage((MultiReplicationMessage) message, ReplicationPublisher.this);
                } else {
                    hook.preReplicateMessage((ReplicationMessage) message, ReplicationPublisher.this);
                }
            }
        });
    }

    /**
     * Submits a {@link RemoteProvisionTask} for the given caller to the
     * {@code hz:replicated-map} executor.
     *
     * @param callerAddress address handed to the provision task
     * @param chunkSize     chunk size handed to the provision task
     */
    public void queuePreProvision(Address callerAddress, int chunkSize) {
        RemoteProvisionTask<K, V> task = new RemoteProvisionTask<K, V>(this.replicatedRecordStore, this.nodeEngine, callerAddress, chunkSize);
        this.executionService.execute(EXECUTOR_NAME, task);
    }

    /**
     * Sends a new pre-provision request to a randomly chosen member other than the given
     * one. Nothing is sent when fewer than two members remain after removing {@code member}.
     *
     * @param member member to exclude from the candidates
     */
    public void retryWithDifferentReplicationNode(Member member) {
        List<MemberImpl> candidates = new ArrayList<MemberImpl>(this.nodeEngine.getClusterService().getMemberList());
        candidates.remove(member);
        if (candidates.size() < 2) {
            return;
        }
        this.sendPreProvisionRequest(candidates);
    }

    /**
     * Executes a clear operation on the other cluster members and waits for the results.
     *
     * @param emptyReplicationQueue flag passed on to each {@link ReplicatedMapClearOperation}
     * @throws OperationTimeoutException if some members still fail after
     *                                   {@link #MAX_CLEAR_EXECUTION_RETRY} rounds
     */
    public void distributeClear(boolean emptyReplicationQueue) {
        this.executeRemoteClear(emptyReplicationQueue);
    }

    /** Discards all cached, not yet distributed replication messages. */
    public void emptyReplicationQueue() {
        this.replicationMessageCacheLock.lock();
        try {
            this.replicationMessageCache.clear();
        } finally {
            this.replicationMessageCacheLock.unlock();
        }
    }

    /**
     * Picks one of the given members at random and sends it a
     * {@link ReplicatedMapPostJoinOperation} for this map. Does nothing for an empty list.
     *
     * @param members candidate members
     */
    void sendPreProvisionRequest(List<MemberImpl> members) {
        if (members.isEmpty()) {
            return;
        }
        MemberImpl target = members.get(this.memberRandomizer.nextInt(members.size()));
        ReplicatedMapPostJoinOperation.MemberMapPair[] memberMapPairs = new ReplicatedMapPostJoinOperation.MemberMapPair[]{
                new ReplicatedMapPostJoinOperation.MemberMapPair(target, this.name)};
        ReplicatedMapPostJoinOperation op = new ReplicatedMapPostJoinOperation(memberMapPairs, DEFAULT_PROVISION_CHUNK_SIZE);
        this.nodeEngine.getOperationService().send(op, target.getAddress());
    }

    /**
     * Invokes the clear operation on all remote members and retries on the members whose
     * invocation failed, for at most {@link #MAX_CLEAR_EXECUTION_RETRY} rounds.
     *
     * <p>A failure of any kind is logged at finest level and marks the member for the next
     * round. If the waiting thread is interrupted, the rounds continue as before and the
     * interrupt status is restored before this method exits.
     *
     * @param emptyReplicationQueue flag passed on to each {@link ReplicatedMapClearOperation}
     * @throws OperationTimeoutException if members are still failing after the last round
     */
    private void executeRemoteClear(boolean emptyReplicationQueue) {
        List<MemberImpl> failedMembers = new ArrayList<MemberImpl>(this.clusterService.getMemberList());
        boolean interrupted = false;
        try {
            for (int attempt = 0; attempt < MAX_CLEAR_EXECUTION_RETRY; ++attempt) {
                Map<MemberImpl, InternalCompletableFuture> futures = this.executeClearOnMembers(failedMembers, emptyReplicationQueue);
                failedMembers.clear();
                for (Map.Entry<MemberImpl, InternalCompletableFuture> pending : futures.entrySet()) {
                    try {
                        pending.getValue().get();
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            // Remember the interrupt; it is re-asserted in the finally block.
                            interrupted = true;
                        }
                        this.nodeEngine.getLogger(ReplicationPublisher.class).finest(e);
                        failedMembers.add(pending.getKey());
                    }
                }
                if (failedMembers.isEmpty()) {
                    return;
                }
            }
            throw new OperationTimeoutException("ReplicatedMap::clear couldn't be finished, failed nodes: " + failedMembers);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Invokes a {@link ReplicatedMapClearOperation} on every given member whose address
     * differs from this node's address.
     *
     * @param members               members to invoke the operation on
     * @param emptyReplicationQueue flag passed on to each operation
     * @return the invocation futures, keyed by the member they were invoked on
     */
    private Map<MemberImpl, InternalCompletableFuture> executeClearOnMembers(Collection<MemberImpl> members, boolean emptyReplicationQueue) {
        Address thisAddress = this.clusterService.getThisAddress();
        Map<MemberImpl, InternalCompletableFuture> futures = new HashMap<MemberImpl, InternalCompletableFuture>(members.size());
        for (MemberImpl member : members) {
            Address address = member.getAddress();
            if (thisAddress.equals(address)) {
                continue;
            }
            ReplicatedMapClearOperation operation = new ReplicatedMapClearOperation(this.name, emptyReplicationQueue);
            InvocationBuilder ib = this.operationService.createInvocationBuilder(SERVICE_NAME, (Operation) operation, address);
            futures.put(member, ib.invoke());
        }
        return futures;
    }

    /**
     * Applies one received update to the local storage.
     *
     * <p>Updates originating from the local member are ignored. An update carrying the
     * clear marker key clears the storage. Any other update creates or updates the local
     * record while holding the record store's mutex for the marshalled key.
     *
     * @param update received replication message
     */
    private void processUpdateMessage(ReplicationMessage update) {
        if (this.localMember.equals(update.getOrigin())) {
            return;
        }
        this.mapStats.incrementReceivedReplicationEvents();
        // String.equals(Object) is false for any non-String key, so no instanceof guard is needed.
        if (CLEAR_MESSAGE_KEY.equals(update.getKey())) {
            this.storage.clear();
            return;
        }
        @SuppressWarnings("unchecked")
        K marshalledKey = (K) this.replicatedRecordStore.marshallKey(update.getKey());
        synchronized (this.replicatedRecordStore.getMutex(marshalledKey)) {
            ReplicatedRecord<K, V> localEntry = this.storage.get(marshalledKey);
            if (localEntry == null) {
                this.createLocalEntry(update, marshalledKey);
            } else {
                this.updateLocalEntry(localEntry, update);
            }
        }
    }

    /**
     * Reconciles a received update with an existing local record.
     *
     * <ul>
     * <li>The local record is an old tombstone, or its vector clock happened before the
     *     update's: the update is applied.</li>
     * <li>The update's vector clock happened before the local one: the update is dropped.</li>
     * <li>Both vector clocks are equal: the update is ignored and a finest-level message is logged.</li>
     * <li>Otherwise the update hashes decide: if the local hash is greater than or equal to
     *     the update's hash the update is applied; if not, the local value is kept, the
     *     update's clock is merged into the local one (and incremented for the local member)
     *     and the local value is re-published with the update's TTL.</li>
     * </ul>
     *
     * @param localEntry record currently stored for the key
     * @param update     received replication message
     */
    private void updateLocalEntry(ReplicatedRecord<K, V> localEntry, ReplicationMessage update) {
        VectorClockTimestamp localClock = localEntry.getVectorClockTimestamp();
        VectorClockTimestamp remoteClock = update.getVectorClockTimestamp();
        if (this.isOldTombstone(localEntry) || VectorClockTimestamp.happenedBefore(localClock, remoteClock)) {
            this.applyTheUpdate(update, localEntry);
            return;
        }
        if (VectorClockTimestamp.happenedBefore(remoteClock, localClock)) {
            return;
        }
        if (remoteClock.equals(localClock)) {
            LOGGER.finest("Received an update with the same state of vector clock I currently have. This can happened during initialization. Ignoring the update.");
            return;
        }
        if (localEntry.getLatestUpdateHash() >= update.getUpdateHash()) {
            this.applyTheUpdate(update, localEntry);
            return;
        }
        VectorClockTimestamp mergedClock = localEntry.applyAndIncrementVectorClock(remoteClock, this.localMember);
        Object key = update.getKey();
        Object value = localEntry.getValueInternal();
        if (value instanceof Data) {
            value = this.nodeEngine.toObject(value);
        }
        long ttlMillis = update.getTtlMillis();
        int latestUpdateHash = localEntry.getLatestUpdateHash();
        ReplicationMessage message = new ReplicationMessage(this.name, key, value, mergedClock, this.localMember, latestUpdateHash, ttlMillis);
        this.distributeReplicationMessage(message, true);
    }

    /**
     * Tells whether the record is a tombstone whose update time lies more than
     * {@link #OLD_TOMBSTONE_AGE_MILLIS} milliseconds in the past.
     *
     * @param localEntry record to inspect
     * @return {@code true} for a tombstone older than the threshold, {@code false} otherwise
     */
    private boolean isOldTombstone(ReplicatedRecord<K, V> localEntry) {
        if (!localEntry.isTombstone()) {
            return false;
        }
        long threshold = localEntry.getUpdateTime() + OLD_TOMBSTONE_AGE_MILLIS;
        return Clock.currentTimeMillis() > threshold;
    }

    /**
     * Stores a new record built from the update, schedules or cancels its TTL entry
     * depending on whether the update's TTL is positive, and fires an entry listener event
     * with a {@code null} old value.
     *
     * @param update        received replication message
     * @param marshalledKey marshalled form of the update's key
     */
    private void createLocalEntry(ReplicationMessage update, K marshalledKey) {
        @SuppressWarnings("unchecked")
        V marshalledValue = (V) this.replicatedRecordStore.marshallValue(update.getValue());
        VectorClockTimestamp timestamp = update.getVectorClockTimestamp();
        int updateHash = update.getUpdateHash();
        long ttlMillis = update.getTtlMillis();
        this.storage.put(marshalledKey, new ReplicatedRecord<K, V>(marshalledKey, marshalledValue, timestamp, updateHash, ttlMillis));
        if (ttlMillis > 0L) {
            this.replicatedRecordStore.scheduleTtlEntry(ttlMillis, marshalledKey, marshalledValue);
        } else {
            this.replicatedRecordStore.cancelTtlEntry(marshalledKey);
        }
        this.replicatedRecordStore.fireEntryListenerEvent(update.getKey(), null, update.getValue());
    }

    /**
     * Overwrites the local record with the update's value, hash, TTL and vector clock.
     *
     * <p>A TTL entry is scheduled when the update has a positive TTL or is a removal;
     * otherwise any TTL entry for the key is cancelled. An entry listener event is fired
     * unless the previous value equals the update's value and the TTL is unchanged.
     *
     * @param update     received replication message
     * @param localEntry record to overwrite
     */
    private void applyTheUpdate(ReplicationMessage<K, V> update, ReplicatedRecord<K, V> localEntry) {
        VectorClockTimestamp remoteClock = update.getVectorClockTimestamp();
        Object marshalledKey = this.replicatedRecordStore.marshallKey(update.getKey());
        @SuppressWarnings("unchecked")
        V marshalledValue = (V) this.replicatedRecordStore.marshallValue(update.getValue());
        long ttlMillis = update.getTtlMillis();
        long oldTtlMillis = localEntry.getTtlMillis();
        Object oldValue = localEntry.setValueInternal(marshalledValue, update.getUpdateHash(), ttlMillis);
        localEntry.applyVectorClock(remoteClock);
        if (ttlMillis > 0L || update.isRemove()) {
            this.replicatedRecordStore.scheduleTtlEntry(ttlMillis, marshalledKey, null);
        } else {
            this.replicatedRecordStore.cancelTtlEntry(marshalledKey);
        }
        Object unmarshalledOldValue = this.replicatedRecordStore.unmarshallValue(oldValue);
        boolean valueChanged = unmarshalledOldValue == null || !unmarshalledOldValue.equals(update.getValue());
        if (valueChanged || update.getTtlMillis() != oldTtlMillis) {
            this.replicatedRecordStore.fireEntryListenerEvent(update.getKey(), unmarshalledOldValue, update.getValue());
        }
    }

    /**
     * Returns a copy of the registrations without those whose subscriber is the local
     * member's address.
     *
     * @param eventRegistrations registrations of the replication topic
     * @return a new, independent collection holding only the non-local registrations
     */
    private Collection<EventRegistration> filterEventRegistrations(Collection<EventRegistration> eventRegistrations) {
        Address localAddress = ((MemberImpl) this.localMember).getAddress();
        List<EventRegistration> remoteRegistrations = new ArrayList<EventRegistration>(eventRegistrations);
        Iterator<EventRegistration> iterator = remoteRegistrations.iterator();
        while (iterator.hasNext()) {
            if (localAddress.equals(iterator.next().getSubscriber())) {
                iterator.remove();
            }
        }
        return remoteRegistrations;
    }

    /**
     * Returns the installed hook, or {@code null} when replication hooks are not allowed
     * or no hook has been set.
     */
    private PreReplicationHook getPreReplicationHook() {
        return this.allowReplicationHooks ? this.preReplicationHook : null;
    }

    /**
     * Wraps the executor used by this publisher: the replicator executor from the map
     * configuration if one is set, otherwise the node's default scheduled executor.
     *
     * @param nodeEngine          node engine providing the fallback executor
     * @param replicatedMapConfig configuration that may carry a dedicated executor
     * @return a {@link WrappedExecutorService} around the chosen executor
     */
    private ScheduledExecutorService getExecutorService(NodeEngine nodeEngine, ReplicatedMapConfig replicatedMapConfig) {
        ScheduledExecutorService delegate = replicatedMapConfig.getReplicatorExecutorService();
        if (delegate == null) {
            delegate = nodeEngine.getExecutionService().getDefaultScheduledExecutor();
        }
        return new WrappedExecutorService(delegate);
    }
}

