/*
 * Decompiled with CFR 0.152.
 */
package com.apple.foundationdb.record.provider.foundationdb.indexing;

import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.IndexBuildProto;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.IndexingSubspaces;
import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@API(value=API.Status.INTERNAL)
public class IndexingHeartbeat {
    /** Logger used for reporting unparsable heartbeat entries. */
    @Nonnull
    private static final Logger logger = LoggerFactory.getLogger(IndexingHeartbeat.class);
    /** Placeholder info string reported by {@link #getIndexingHeartbeats} for entries whose value cannot be parsed. */
    public static final String INVALID_HEARTBEAT_INFO = "<< Invalid Heartbeat >>";
    /** Identifier of this indexer; it determines the key under which this heartbeat is stored. */
    final UUID indexerId;
    /** Free-form description that is written into every heartbeat value. */
    final String info;
    /** Wall-clock time (milliseconds) at which this object was constructed; written into every heartbeat value. */
    final long createTimeMilliseconds;
    /** Age limit in milliseconds: another indexer's heartbeat younger than this is treated as an active session. */
    final long leaseLength;
    /** When {@code true}, heartbeats of other indexers are not checked before this one is written. */
    final boolean allowMutual;

    /**
     * Create a heartbeat handle for a single indexer. The creation time is sampled from the
     * wall clock at construction and stays fixed for the lifetime of this object.
     *
     * @param indexerId identifier of the indexer that owns this heartbeat
     * @param info free-form description stored with every heartbeat
     * @param leaseLength age (in milliseconds) below which another indexer's heartbeat counts as active
     * @param allowMutual if {@code true}, other indexers' heartbeats are not checked before writing
     */
    public IndexingHeartbeat(UUID indexerId, String info, long leaseLength, boolean allowMutual) {
        this.createTimeMilliseconds = nowMilliseconds();
        this.indexerId = indexerId;
        this.info = info;
        this.allowMutual = allowMutual;
        this.leaseLength = leaseLength;
    }

    /**
     * Get the identifier of the indexer that owns this heartbeat.
     *
     * @return the indexer id given at construction
     */
    public UUID getIndexerId() {
        return indexerId;
    }

    /**
     * Write this indexer's heartbeat into the store's current transaction, replacing any
     * previous value under the same key. The heartbeat time is the wall-clock time of this call;
     * the creation time is the one captured when this object was constructed.
     *
     * @param store the record store whose context is used for the write
     * @param index the index whose heartbeat subspace is written to
     */
    public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
        final byte[] heartbeatKey = IndexingSubspaces.indexHeartbeatSubspaceBytes(store, index, indexerId);
        final IndexBuildProto.IndexBuildHeartbeat heartbeat = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
                .setInfo(info)
                .setCreateTimeMilliseconds(createTimeMilliseconds)
                .setHeartbeatTimeMilliseconds(nowMilliseconds())
                .build();
        final byte[] heartbeatValue = heartbeat.toByteArray();
        store.ensureContextActive().set(heartbeatKey, heartbeatValue);
    }

    /**
     * Verify that no other indexer holds an active heartbeat on the index, then write this
     * indexer's heartbeat. When mutual indexing is allowed, the verification is skipped and the
     * heartbeat is written immediately.
     *
     * @param store the record store to read from and write to
     * @param index the index whose heartbeats are examined
     * @return a future that completes after the heartbeat has been written; a
     *     {@link SynchronizedSessionLockedException} raised while checking another indexer's
     *     heartbeat is presumably surfaced through this future (depends on {@code AsyncUtil})
     */
    public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
        if (!allowMutual) {
            final AsyncIterator<KeyValue> existingHeartbeats = heartbeatsIterator(store, index);
            // One timestamp is shared by all comparisons so every entry is judged against the same instant.
            final long checkTime = nowMilliseconds();
            return AsyncUtil.forEachRemaining(
                            existingHeartbeats,
                            entry -> checkSingleHeartbeat(store, index, entry, checkTime),
                            store.getExecutor())
                    .thenRun(() -> updateHeartbeat(store, index));
        }
        updateHeartbeat(store, index);
        return AsyncUtil.DONE;
    }

    /**
     * Examine one stored heartbeat entry and fail if it shows another indexer to be active.
     * This indexer's own entry is skipped. An entry whose value cannot be parsed is logged
     * (with the parse failure attached) and otherwise ignored.
     *
     * @param store the record store the entry was read from
     * @param index the index the entry belongs to
     * @param kv the raw heartbeat key/value pair
     * @param now the reference time, in milliseconds, against which the heartbeat's age is computed
     * @throws SynchronizedSessionLockedException if the entry belongs to a different indexer and
     *     its age is below the lease length (and not more than one day in the future)
     */
    private void checkSingleHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index, KeyValue kv, long now) {
        final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
        if (otherIndexerId.equals(indexerId)) {
            // Our own heartbeat never blocks us.
            return;
        }
        final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat;
        try {
            otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
        } catch (InvalidProtocolBufferException e) {
            if (logger.isWarnEnabled()) {
                // Keep the exception in the log record so the cause of the parse failure is not lost.
                logger.warn(KeyValueLogMessage.of("Bad indexing heartbeat item",
                        LogMessageKeys.KEY, kv.getKey(),
                        LogMessageKeys.VALUE, kv.getValue()), e);
            }
            return;
        }
        final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
        // A negative age means the other heartbeat is timestamped in the future; it is still
        // honored as long as it is less than one day ahead of "now".
        if (age > TimeUnit.DAYS.toMillis(-1L) && age < leaseLength) {
            throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
                    .addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
                    .addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
                    .addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age)
                    .addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength);
        }
    }

    /**
     * Remove this indexer's heartbeat entry in the store's current transaction.
     *
     * @param store the record store whose context is used for the clear
     * @param index the index whose heartbeat subspace holds the entry
     */
    public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
        store.ensureContextActive()
                .clear(IndexingSubspaces.indexHeartbeatSubspaceBytes(store, index, indexerId));
    }

    /**
     * Remove every heartbeat entry of the given index, regardless of owner or age, by clearing
     * the whole heartbeat subspace range in the store's current transaction.
     *
     * @param store the record store whose context is used for the clear
     * @param index the index whose heartbeat subspace is cleared
     */
    public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) {
        store.ensureContextActive()
                .clear(IndexingSubspaces.indexHeartbeatSubspace(store, index).range());
    }

    /**
     * Read the heartbeats currently stored for an index.
     * An entry whose value cannot be parsed is still reported, using a placeholder heartbeat
     * whose info is {@link #INVALID_HEARTBEAT_INFO} and whose timestamps are zero.
     *
     * @param store the record store to read from
     * @param index the index whose heartbeats are read
     * @param maxCount maximum number of entries to read; zero or a negative value means no limit
     * @return a future holding a map from indexer id to that indexer's heartbeat
     */
    public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
        // Properly parameterized (the raw HashMap previously used here forced unchecked conversions).
        final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> heartbeats = new HashMap<>();
        final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
        final AtomicInteger iterationCount = new AtomicInteger(0);
        return AsyncUtil.whileTrue(() -> iterator.onHasNext().thenApply(hasNext -> {
            if (!hasNext) {
                return false;
            }
            // The counter is only advanced when a limit is in effect.
            if (maxCount > 0 && maxCount < iterationCount.incrementAndGet()) {
                return false;
            }
            final KeyValue kv = iterator.next();
            final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
            try {
                heartbeats.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue()));
            } catch (InvalidProtocolBufferException e) {
                // Report the unparsable entry with a recognizable placeholder instead of dropping it.
                heartbeats.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
                        .setInfo(INVALID_HEARTBEAT_INFO)
                        .setCreateTimeMilliseconds(0L)
                        .setHeartbeatTimeMilliseconds(0L)
                        .build());
            }
            return true;
        }), store.getExecutor()).thenApply(ignore -> heartbeats);
    }

    /**
     * Remove old heartbeat entries of an index. An entry is removed when its heartbeat time plus
     * {@code minAgeMilliseconds} is not later than the time this method was called, or when its
     * value cannot be parsed.
     *
     * @param store the record store to read from and clear in
     * @param index the index whose heartbeats are examined
     * @param minAgeMilliseconds minimum age, in milliseconds, for an entry to be removed
     * @param maxIteration maximum number of entries to examine; zero or a negative value means no limit
     * @return a future holding the number of entries that were removed
     */
    public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index, long minAgeMilliseconds, int maxIteration) {
        final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
        final AtomicInteger removed = new AtomicInteger(0);
        final AtomicInteger examined = new AtomicInteger(0);
        final long now = nowMilliseconds();
        return AsyncUtil.whileTrue(() -> iterator.onHasNext().thenApply(hasNext -> {
            if (!hasNext) {
                return false;
            }
            // The counter is only advanced when a limit is in effect.
            if (maxIteration > 0 && maxIteration < examined.incrementAndGet()) {
                return false;
            }
            final KeyValue entry = iterator.next();
            boolean stale = true; // an unparsable entry is always removed
            try {
                final long heartbeatTime = IndexBuildProto.IndexBuildHeartbeat.parseFrom(entry.getValue()).getHeartbeatTimeMilliseconds();
                // NOTE: the addition below is not overflow-checked.
                stale = now >= heartbeatTime + minAgeMilliseconds;
            } catch (InvalidProtocolBufferException e) {
                // keep stale == true
            }
            if (stale) {
                store.ensureContextActive().clear(entry.getKey());
                removed.incrementAndGet();
            }
            return true;
        }), store.getExecutor()).thenApply(ignore -> removed.get());
    }

    /**
     * Open an asynchronous iterator over all heartbeat entries stored for the given index.
     *
     * @param store the record store whose context is used for the range read
     * @param index the index whose heartbeat subspace is scanned
     * @return an iterator over the raw key/value pairs in the index's heartbeat subspace
     */
    private static AsyncIterator<KeyValue> heartbeatsIterator(FDBRecordStore store, Index index) {
        return store.getContext()
                .ensureActive()
                .getRange(IndexingSubspaces.indexHeartbeatSubspace(store, index).range())
                .iterator();
    }

    /**
     * Extract the indexer id from a heartbeat key: the key is unpacked relative to the index's
     * heartbeat subspace and the first tuple element is read as a UUID.
     *
     * @param store the record store the key belongs to
     * @param index the index the key belongs to
     * @param key the raw heartbeat key
     * @return the indexer id encoded in the key
     */
    private static UUID heartbeatKeyToIndexerId(FDBRecordStore store, Index index, byte[] key) {
        return IndexingSubspaces.indexHeartbeatSubspace(store, index)
                .unpack(key)
                .getUUID(0);
    }

    /**
     * Single source of "now" for all heartbeat timestamps in this class.
     *
     * @return the current wall-clock time in milliseconds, as given by {@link System#currentTimeMillis()}
     */
    private static long nowMilliseconds() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }
}

