/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderNotInReaderGroupException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.CheckpointFailedException;
import io.pravega.client.stream.impl.PositionInternal;
import io.pravega.client.stream.impl.ReaderGroupImpl;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.HashHelper;
import io.pravega.shared.NameUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderGroupStateManager {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReaderGroupStateManager.class);
    // One second; the rank-based release, acquire and lag-update delays are multiples of this value.
    static final Duration TIME_UNIT = Duration.ofMillis(1000L);
    // Thirty seconds; applied to both the release and the acquire timer once a release/acquire decision is taken.
    static final Duration UPDATE_WINDOW = Duration.ofMillis(30000L);
    // Ten seconds; the delay re-armed at the end of each updateConfigIfNeeded() pass that ran.
    static final Duration UPDATE_CONFIG_WINDOW = Duration.ofMillis(10000L);
    // Probability (5%) and byte threshold (512 KiB) for compacting the shared state.
    private static final double COMPACTION_PROBABILITY = 0.05;
    private static final int MIN_BYTES_BETWEEN_COMPACTIONS = 524288;
    // Monitor held while deciding whether to release or to acquire a segment.
    private final Object decisionLock = new Object();
    // Hash helper seeded with the reader id; used to choose which owned segment to release.
    private final HashHelper hashHelper;
    // Id of the reader this manager acts on behalf of.
    private final String readerId;
    // Scope and name of the reader group.
    private final String scope;
    private final String groupName;
    // Synchronizer through which the shared ReaderGroupState is read and updated.
    private final StateSynchronizer<ReaderGroupState> sync;
    // Controller client (successor lookup, reader group config, subscriber stream cuts).
    private final Controller controller;
    // While this timer has time remaining, no segment release is proposed.
    private final TimeoutTimer releaseTimer;
    // While this timer has time remaining, no segment acquisition is attempted.
    private final TimeoutTimer acquireTimer;
    // While this timer has time remaining, fetchUpdatesIfNeeded() does nothing.
    private final TimeoutTimer fetchStateTimer;
    // While this timer has time remaining, getCheckpoint() does not start an automatic checkpoint.
    private final TimeoutTimer checkpointTimer;
    // While this timer has time remaining, updateLagIfNeeded() does nothing.
    private final TimeoutTimer lagUpdateTimer;
    // While this timer has time remaining, updateConfigIfNeeded() does nothing.
    private final TimeoutTimer updateConfigTimer;

    ReaderGroupStateManager(String scope, String groupName, String readerId, StateSynchronizer<ReaderGroupState> sync, Controller controller, Supplier<Long> nanoClock) {
        Preconditions.checkNotNull((Object)readerId);
        Preconditions.checkNotNull(sync);
        Preconditions.checkNotNull((Object)controller);
        this.readerId = readerId;
        this.scope = scope;
        this.groupName = groupName;
        this.hashHelper = HashHelper.seededWith((String)readerId);
        this.sync = sync;
        this.controller = controller;
        if (nanoClock == null) {
            nanoClock = System::nanoTime;
        }
        this.releaseTimer = new TimeoutTimer(TIME_UNIT, nanoClock);
        this.acquireTimer = new TimeoutTimer(Duration.ZERO, nanoClock);
        this.fetchStateTimer = new TimeoutTimer(Duration.ZERO, nanoClock);
        this.checkpointTimer = new TimeoutTimer(TIME_UNIT, nanoClock);
        this.lagUpdateTimer = new TimeoutTimer(TIME_UNIT, nanoClock);
        this.updateConfigTimer = new TimeoutTimer(TIME_UNIT, nanoClock);
    }

    /**
     * Adds this reader to the reader group state and arms the acquire timer.
     *
     * <p>The acquire timer is set to {@code initialAllocationDelay} plus a random fraction of
     * {@code min(initialAllocationDelay, groupRefreshTimeMillis)}.
     *
     * @param initialAllocationDelay base delay, in milliseconds, applied to the acquire timer
     * @throws IllegalStateException if the state already holds a segment set for this reader id
     */
    void initializeReader(long initialAllocationDelay) {
        boolean alreadyAdded = (Boolean)this.sync.updateState((state, updates) -> {
            if (state.getSegments(this.readerId) != null) {
                // The reader id is already present in the state: report it and add nothing.
                return true;
            }
            log.debug("Adding reader {} to reader group. CurrentState is: {}", this.readerId, state);
            updates.add(new ReaderGroupState.AddReader(this.readerId));
            return false;
        });
        if (alreadyAdded) {
            throw new IllegalStateException("The requested reader: " + this.readerId + " cannot be added to the group because it is already in the group. Perhaps close() was not called?");
        }
        long jitterBound = Math.min(initialAllocationDelay, this.sync.getState().getConfig().getGroupRefreshTimeMillis());
        long randomDelay = (long)(Math.random() * (double)jitterBound);
        Duration firstAcquireDelay = Duration.ofMillis(initialAllocationDelay + randomDelay);
        this.acquireTimer.reset(firstAcquireDelay);
    }

    /**
     * Removes this manager's reader from the reader group by delegating to the static
     * {@link #readerShutdown(String, Position, StateSynchronizer)} with this instance's
     * reader id and synchronizer.
     *
     * @param lastPosition last position of the reader; may be {@code null}
     */
    void readerShutdown(Position lastPosition) {
        readerShutdown(this.readerId, lastPosition, this.sync);
    }

    /**
     * Removes the given reader from the reader group state, if the state knows it.
     *
     * <p>Fetches the latest updates first. When the state has no segment set for {@code readerId}
     * nothing is changed; otherwise a {@code RemoveReader} update is added, carrying the owned
     * segments and offsets from {@code lastPosition}, or an empty map when it is {@code null}.
     *
     * @param readerId     id of the reader to remove
     * @param lastPosition last position of the reader; may be {@code null}
     * @param sync         synchronizer holding the reader group state
     */
    static void readerShutdown(String readerId, Position lastPosition, StateSynchronizer<ReaderGroupState> sync) {
        sync.fetchUpdates();
        sync.updateState((state, updates) -> {
            if (state.getSegments(readerId) != null) {
                log.debug("Removing reader {} from reader group. CurrentState is: {}. Position is: {}.", readerId, state, lastPosition);
                updates.add(new ReaderGroupState.RemoveReader(readerId, lastPosition == null ? Collections.emptyMap() : lastPosition.asImpl().getOwnedSegmentsWithOffsets()));
            }
        });
    }

    /**
     * Closes the underlying state synchronizer. Does not remove the reader from the group;
     * see {@link #readerShutdown(Position)} for that.
     */
    void close() {
        StateSynchronizer<ReaderGroupState> synchronizer = this.sync;
        synchronizer.close();
    }

    /**
     * Records that this reader has read the given segment to its end.
     *
     * <p>If the segment is one of the configured end segments, no successors are looked up;
     * otherwise the successors are fetched from the controller. A {@code SegmentCompleted}
     * update is then added, but only when no checkpoint is pending for this reader. On a
     * normal return the acquire timer is zeroed.
     *
     * @param segmentCompleted the segment that was read to completion
     * @return {@code true} if the completion was recorded; {@code false} if a checkpoint is
     *         pending for this reader and nothing was recorded
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline or does
     *         not own the segment
     */
    boolean handleEndOfSegment(SegmentWithRange segmentCompleted) throws ReaderNotInReaderGroupException {
        // NOTE(review): Map<Object, Object> looks like a decompiler artefact; confirm the real key/value
        // types against StreamSegmentsWithPredecessors.getSegmentToPredecessor().
        Map<Object, Object> segmentToPredecessor;
        if (this.sync.getState().getEndSegments().containsKey(segmentCompleted.getSegment())) {
            segmentToPredecessor = Collections.emptyMap();
        } else {
            StreamSegmentsWithPredecessors successors = (StreamSegmentsWithPredecessors)Futures.getAndHandleExceptions(this.controller.getSuccessors(segmentCompleted.getSegment()), RuntimeException::new);
            segmentToPredecessor = successors.getSegmentToPredecessor();
        }
        AtomicBoolean reinitRequired = new AtomicBoolean(false);
        boolean result = (Boolean)this.sync.updateState((state, updates) -> {
            if (!state.isReaderOnline(this.readerId)) {
                log.error("Reader {} is offline according to the state but is attempting to update it.", this.readerId);
                reinitRequired.set(true);
                return false;
            }
            if (!state.getSegments(this.readerId).contains(segmentCompleted.getSegment())) {
                log.error("Reader {} does not own the segment {} but is attempting to release it.", this.readerId, segmentCompleted);
                reinitRequired.set(true);
                return false;
            }
            log.debug("Marking segment {} as completed in reader group. CurrentState is: {}", segmentCompleted, state);
            // Clear the flag explicitly on this path, since an earlier pass of this lambda may have set it.
            reinitRequired.set(false);
            if (state.getCheckpointForReader(this.readerId) == null) {
                updates.add(new ReaderGroupState.SegmentCompleted(this.readerId, segmentCompleted, segmentToPredecessor));
                return true;
            }
            // A checkpoint is pending for this reader: do not record the completion now.
            return false;
        });
        if (reinitRequired.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        this.acquireTimer.zero();
        return result;
    }

    /**
     * Decides, under {@code decisionLock}, whether this reader should give up one of its segments.
     *
     * <p>A segment is proposed only when the release timer has expired, no checkpoint is pending
     * for this reader, and {@link #doesReaderOwnTooManySegments(ReaderGroupState)} holds. When a
     * segment is chosen, both the release and acquire timers are reset to {@code UPDATE_WINDOW}.
     *
     * @return the segment to release, or {@code null} if none should be released now
     */
    Segment findSegmentToReleaseIfRequired() {
        this.fetchUpdatesIfNeeded();
        synchronized (this.decisionLock) {
            if (this.releaseTimer.hasRemaining()) {
                return null;
            }
            if (this.sync.getState().getCheckpointForReader(this.readerId) != null) {
                return null;
            }
            if (!this.doesReaderOwnTooManySegments(this.sync.getState())) {
                return null;
            }
            Segment candidate = this.findSegmentToRelease();
            if (candidate != null) {
                this.releaseTimer.reset(UPDATE_WINDOW);
                this.acquireTimer.reset(UPDATE_WINDOW);
            }
            return candidate;
        }
    }

    /**
     * Tells whether this reader's relative size exceeds the smallest relative size in the group
     * by more than {@code max(1, numberOfUnassignedSegments)}.
     *
     * @param state the reader group state to evaluate
     * @return {@code false} when there are no relative sizes, the reader has no segment set, or
     *         it owns at most one segment; otherwise the result of the comparison above
     */
    private boolean doesReaderOwnTooManySegments(ReaderGroupState state) {
        Map<String, Double> relativeSizes = state.getRelativeSizes();
        Set<Segment> owned = state.getSegments(this.readerId);
        boolean nothingToCompare = relativeSizes.isEmpty() || owned == null || owned.size() <= 1;
        if (nothingToCompare) {
            return false;
        }
        double smallest = Collections.min(relativeSizes.values());
        double threshold = smallest + (double)Math.max(1, state.getNumberOfUnassignedSegments());
        return relativeSizes.get(this.readerId) > threshold;
    }

    /**
     * Picks, among the segments currently assigned to this reader, the one whose scoped name
     * hashes to the largest value under this reader's {@code hashHelper}.
     *
     * @return the chosen segment, or {@code null} if the reader owns no segments
     */
    private Segment findSegmentToRelease() {
        Segment chosen = null;
        double chosenHash = 0.0;
        for (Segment candidate : this.sync.getState().getSegments(this.readerId)) {
            double candidateHash = this.hashHelper.hashToRange(candidate.getScopedName());
            // Strictly-greater comparison keeps the first segment seen on a tie.
            if (chosen == null || Double.compare(chosenHash, candidateHash) < 0) {
                chosen = candidate;
                chosenHash = candidateHash;
            }
        }
        return chosen;
    }

    /**
     * Looks up the end offset recorded for a segment in the state's end-segment map.
     *
     * @param segment the segment to look up
     * @return the recorded end offset, or {@link Long#MAX_VALUE} when the map has no value for it
     */
    long getEndOffsetForSegment(Segment segment) {
        Long endOffset = this.sync.getState().getEndSegments().get(segment);
        if (endOffset == null) {
            return Long.MAX_VALUE;
        }
        return endOffset;
    }

    /**
     * Refreshes the local state if the fetch timer has expired, then reports the state's
     * end-of-data flag.
     *
     * @return the value of {@code isEndOfData()} on the current state
     */
    boolean reachedEndOfStream() {
        this.fetchUpdatesIfNeeded();
        ReaderGroupState current = this.sync.getState();
        return current.isEndOfData();
    }

    /**
     * Attempts to release a segment owned by this reader.
     *
     * <p>The release (together with a distance-to-tail update) is added only if the reader still
     * owns the segment, no checkpoint is pending for it, and it still owns too many segments.
     * Afterwards the release, acquire and lag-update timers are re-armed from the resulting state.
     *
     * @param segment    the segment to release
     * @param lastOffset the offset recorded with the release
     * @param timeLag    the lag value recorded in the distance-to-tail update
     * @param position   the reader's position, recorded in the distance-to-tail update
     * @return {@code true} if, after the update, the segment is not assigned to this reader
     * @throws ReaderNotInReaderGroupException if the resulting state says this reader is offline
     */
    boolean releaseSegment(Segment segment, long lastOffset, long timeLag, Position position) throws ReaderNotInReaderGroupException {
        this.sync.updateState((state, updates) -> {
            Set<Segment> owned = state.getSegments(this.readerId);
            if (owned == null || !owned.contains(segment)) {
                return;
            }
            if (state.getCheckpointForReader(this.readerId) != null) {
                return;
            }
            if (!this.doesReaderOwnTooManySegments(state)) {
                return;
            }
            updates.add(new ReaderGroupState.ReleaseSegment(this.readerId, segment, lastOffset));
            updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, timeLag, position.asImpl().getOwnedSegmentRangesWithOffsets()));
        });

        ReaderGroupState afterUpdate = this.sync.getState();
        // Timers are re-armed before the online check, so they are updated even when we throw below.
        this.releaseTimer.reset(ReaderGroupStateManager.calculateReleaseTime(this.readerId, afterUpdate));
        this.acquireTimer.reset(ReaderGroupStateManager.calculateAcquireTime(this.readerId, afterUpdate));
        this.resetLagUpdateTimer();
        if (!afterUpdate.isReaderOnline(this.readerId)) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        boolean stillOwned = afterUpdate.getSegments(this.readerId).contains(segment);
        return !stillOwned;
    }

    /**
     * Computes the release-timer duration for a reader: {@code TIME_UNIT * (1 + ranking)}.
     *
     * @param readerId id of the reader
     * @param state    state from which the reader's ranking is read
     * @return the duration to arm the release timer with
     */
    @VisibleForTesting
    static Duration calculateReleaseTime(String readerId, ReaderGroupState state) {
        int ranking = state.getRanking(readerId);
        return TIME_UNIT.multipliedBy(ranking + 1);
    }

    /**
     * Refreshes the local state if due and, when {@link #shouldAcquireSegment()} agrees,
     * acquires unassigned segments for this reader.
     *
     * @param timeLag  lag value passed on to the acquisition
     * @param position the reader's position, passed on to the acquisition
     * @return the newly acquired segments with their offsets, or an empty map if nothing was acquired
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline
     */
    Map<SegmentWithRange, Long> acquireNewSegmentsIfNeeded(long timeLag, Position position) throws ReaderNotInReaderGroupException {
        this.fetchUpdatesIfNeeded();
        if (!this.shouldAcquireSegment()) {
            return Collections.emptyMap();
        }
        return this.acquireSegment(timeLag, position);
    }

    /**
     * Reports whether the fetch-state timer has expired.
     *
     * @return {@code true} when the fetch-state timer has no time remaining
     */
    boolean canUpdateLagIfNeeded() {
        boolean stillWaiting = this.fetchStateTimer.hasRemaining();
        return !stillWaiting;
    }

    /**
     * Publishes this reader's distance to tail if the lag-update timer has expired.
     *
     * <p>When it runs, the lag-update timer is re-armed, the update is written unconditionally,
     * the fetch-state timer is re-armed and the latest updates are fetched.
     *
     * @param timeLag  lag value to publish
     * @param position the reader's position, whose owned segment ranges and offsets are published
     * @return {@code true} if an update was published, {@code false} if the timer had not expired
     */
    boolean updateLagIfNeeded(long timeLag, Position position) {
        if (this.lagUpdateTimer.hasRemaining()) {
            return false;
        }
        log.debug("Update lag for reader {}", this.readerId);
        this.resetLagUpdateTimer();
        this.sync.updateStateUnconditionally(new ReaderGroupState.UpdateDistanceToTail(this.readerId, timeLag, position.asImpl().getOwnedSegmentRangesWithOffsets()));
        this.resetFetchUpdateTimer();
        this.sync.fetchUpdates();
        return true;
    }

    /**
     * Re-arms the fetch-state timer with the group refresh time (milliseconds) taken from the
     * current state's config.
     */
    private void resetFetchUpdateTimer() {
        Duration refreshInterval = Duration.ofMillis(this.sync.getState().getConfig().getGroupRefreshTimeMillis());
        this.fetchStateTimer.reset(refreshInterval);
    }

    /**
     * Re-arms the lag-update timer with {@code TIME_UNIT * (onlineReaders + 1)}, where
     * {@code onlineReaders} is the number of online readers in the current state.
     */
    private void resetLagUpdateTimer() {
        int onlineReaders = this.sync.getState().getOnlineReaders().size();
        Duration nextLagUpdate = TIME_UNIT.multipliedBy(onlineReaders + 1);
        this.lagUpdateTimer.reset(nextLagUpdate);
    }

    /**
     * Fetches the latest state updates when the fetch-state timer has expired, then re-arms
     * that timer and gives compaction a chance to run. Does nothing while the timer is running.
     */
    private void fetchUpdatesIfNeeded() {
        if (this.fetchStateTimer.hasRemaining()) {
            return;
        }
        log.debug("Update group state for reader {}", this.readerId);
        this.sync.fetchUpdates();
        this.resetFetchUpdateTimer();
        this.compactIfNeeded();
    }

    /**
     * Applies a pending reader group config change, at most once per {@code UPDATE_CONFIG_WINDOW}.
     *
     * <p>When the update-config timer has expired and the state reports that a config update is
     * in progress, the config is read from the controller and the state is re-initialised with
     * it, provided the state's config generation is lower than the controller's. The timer is
     * re-armed whether or not a config update was in progress.
     */
    void updateConfigIfNeeded() {
        if (this.updateConfigTimer.hasRemaining()) {
            return;
        }
        this.fetchUpdatesIfNeeded();
        ReaderGroupState current = this.sync.getState();
        if (current.isUpdatingConfig()) {
            ReaderGroupConfig newConfig = (ReaderGroupConfig)Futures.getThrowingException(this.controller.getReaderGroupConfig(this.scope, this.groupName));
            log.debug("Updating the readergroup {} with the new config {} obtained from the controller", this.groupName, newConfig);
            Map<SegmentWithRange, Long> startSegments = ReaderGroupImpl.getSegmentsForStreams(this.controller, newConfig);
            this.sync.updateState((s, updates) -> {
                // Only re-initialise when the state is still behind the controller's generation.
                if (s.getConfig().getGeneration() < newConfig.getGeneration()) {
                    updates.add(new ReaderGroupState.ReaderGroupStateInit(newConfig, startSegments, ReaderGroupImpl.getEndSegmentsForStreams(newConfig), false));
                }
            });
        }
        this.updateConfigTimer.reset(UPDATE_CONFIG_WINDOW);
    }

    /**
     * Compacts the shared state occasionally.
     *
     * <p>Compaction runs only when more than {@code MIN_BYTES_BETWEEN_COMPACTIONS} bytes have
     * been written since the last compaction, and then only with probability
     * {@code COMPACTION_PROBABILITY} per call.
     */
    private void compactIfNeeded() {
        // Use the named constants rather than repeating their values as literals.
        if (this.sync.bytesWrittenSinceCompaction() > MIN_BYTES_BETWEEN_COMPACTIONS && Math.random() < COMPACTION_PROBABILITY) {
            log.debug("Compacting reader group state {}", this.sync.getState());
            this.sync.compact(ReaderGroupState.CompactReaderGroupState::new);
        }
    }

    /**
     * Reports whether the acquire timer has expired.
     *
     * @return {@code true} when the acquire timer has no time remaining
     */
    boolean canAcquireSegmentIfNeeded() {
        boolean stillWaiting = this.acquireTimer.hasRemaining();
        return !stillWaiting;
    }

    /**
     * Decides, under {@code decisionLock}, whether this reader should try to acquire segments now.
     *
     * <p>Returns {@code false} while the acquire timer is running or a checkpoint is pending for
     * this reader. When there are no unassigned segments the acquire timer is re-armed from the
     * reader's ranking and {@code false} is returned. Otherwise both the acquire and release
     * timers are set to {@code UPDATE_WINDOW} and {@code true} is returned.
     *
     * @return {@code true} if an acquisition should be attempted
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline
     */
    boolean shouldAcquireSegment() throws ReaderNotInReaderGroupException {
        synchronized (this.decisionLock) {
            if (this.acquireTimer.hasRemaining()) {
                return false;
            }
            ReaderGroupState current = this.sync.getState();
            if (!current.isReaderOnline(this.readerId)) {
                throw new ReaderNotInReaderGroupException(this.readerId);
            }
            if (current.getCheckpointForReader(this.readerId) != null) {
                return false;
            }
            boolean segmentsAvailable = current.getNumberOfUnassignedSegments() != 0;
            if (segmentsAvailable) {
                this.acquireTimer.reset(UPDATE_WINDOW);
                this.releaseTimer.reset(UPDATE_WINDOW);
            } else {
                this.acquireTimer.reset(ReaderGroupStateManager.calculateAcquireTime(this.readerId, current));
            }
            return segmentsAvailable;
        }
    }

    /**
     * Assigns unassigned segments to this reader.
     *
     * <p>Nothing is acquired when a checkpoint is pending for this reader or when
     * {@link #calculateNumSegmentsToAcquire(ReaderGroupState)} returns zero. Otherwise that many
     * entries are taken from the unassigned segments in iteration order, an {@code AcquireSegment}
     * update is added for each, followed by one distance-to-tail update. After the update the
     * release, acquire and lag-update timers are re-armed.
     *
     * @param timeLag  lag value recorded in the distance-to-tail update
     * @param position the reader's position, recorded in the distance-to-tail update
     * @return the acquired segments with their offsets; empty if nothing was acquired
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline
     */
    private Map<SegmentWithRange, Long> acquireSegment(long timeLag, Position position) throws ReaderNotInReaderGroupException {
        AtomicBoolean reinitRequired = new AtomicBoolean();
        Map result = (Map)this.sync.updateState((state, updates) -> {
            boolean offline = !state.isReaderOnline(this.readerId);
            reinitRequired.set(offline);
            if (offline) {
                return Collections.emptyMap();
            }
            if (state.getCheckpointForReader(this.readerId) != null) {
                return Collections.emptyMap();
            }
            int toAcquire = this.calculateNumSegmentsToAcquire(state);
            if (toAcquire == 0) {
                return Collections.emptyMap();
            }
            HashMap<SegmentWithRange, Long> acquired = new HashMap<SegmentWithRange, Long>(toAcquire);
            Iterator<Map.Entry<SegmentWithRange, Long>> candidates = state.getUnassignedSegments().entrySet().iterator();
            int remaining = toAcquire;
            while (remaining > 0) {
                assert (candidates.hasNext());
                Map.Entry<SegmentWithRange, Long> next = candidates.next();
                acquired.put(next.getKey(), next.getValue());
                updates.add(new ReaderGroupState.AcquireSegment(this.readerId, next.getKey().getSegment()));
                --remaining;
            }
            updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, timeLag, position.asImpl().getOwnedSegmentRangesWithOffsets()));
            return acquired;
        });
        if (reinitRequired.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        this.releaseTimer.reset(ReaderGroupStateManager.calculateReleaseTime(this.readerId, this.sync.getState()));
        this.acquireTimer.reset(ReaderGroupStateManager.calculateAcquireTime(this.readerId, this.sync.getState()));
        this.resetLagUpdateTimer();
        return result;
    }

    /**
     * Computes how many unassigned segments this reader should take.
     *
     * <p>The result is the larger of an even split of the unassigned segments across readers
     * and the shortfall of this reader against a rounded per-reader share of all segments
     * (capped at the number of unassigned segments), and never less than one.
     *
     * @param state the reader group state to evaluate
     * @return {@code 0} if there are no unassigned segments, otherwise a value of at least {@code 1}
     */
    private int calculateNumSegmentsToAcquire(ReaderGroupState state) {
        int unassigned = state.getNumberOfUnassignedSegments();
        if (unassigned == 0) {
            return 0;
        }
        int totalSegments = state.getNumberOfSegments();
        int owned = state.getSegments(this.readerId).size();
        int readers = state.getNumberOfReaders();

        int evenSplit = unassigned / readers;
        int perReaderShare = Math.round((float)totalSegments / (float)readers);
        int shortfall = Math.min(unassigned, perReaderShare - owned);
        int larger = Math.max(evenSplit, shortfall);
        return Math.max(larger, 1);
    }

    /**
     * Computes the acquire-timer duration for a reader:
     * {@code TIME_UNIT * (numberOfReaders - ranking)}.
     *
     * @param readerId id of the reader
     * @param state    state from which the reader count and ranking are read
     * @return the duration to arm the acquire timer with
     * @throws IllegalArgumentException if {@code numberOfReaders - ranking} is less than one
     */
    @VisibleForTesting
    static Duration calculateAcquireTime(String readerId, ReaderGroupState state) {
        int multiplier = state.getNumberOfReaders() - state.getRanking(readerId);
        boolean valid = multiplier >= 1;
        Preconditions.checkArgument(valid, (Object)"Invalid acquire timer multiplier");
        return TIME_UNIT.multipliedBy(multiplier);
    }

    /**
     * Returns the checkpoint this reader is expected to take next, starting an automatic
     * checkpoint first when one is due.
     *
     * <p>If the state already names a checkpoint for this reader, it is returned and the
     * checkpoint timer is re-armed. Otherwise a new checkpoint is created only when automatic
     * checkpoints are enabled (interval greater than zero), the checkpoint timer has expired and
     * no checkpoint is ongoing.
     *
     * @return the checkpoint name for this reader, or {@code null} if there is none
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline
     */
    String getCheckpoint() throws ReaderNotInReaderGroupException {
        this.fetchUpdatesIfNeeded();
        ReaderGroupState state = this.sync.getState();
        long automaticCpInterval = state.getConfig().getAutomaticCheckpointIntervalMillis();
        if (!state.isReaderOnline(this.readerId)) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        String checkpoint = state.getCheckpointForReader(this.readerId);
        if (checkpoint != null) {
            this.checkpointTimer.reset(Duration.ofMillis(automaticCpInterval));
            return checkpoint;
        }
        if (automaticCpInterval <= 0L || this.checkpointTimer.hasRemaining() || state.hasOngoingCheckpoint()) {
            return null;
        }
        this.sync.updateState((s, u) -> {
            // Re-check inside the update: another reader may have started a checkpoint meanwhile.
            if (!s.hasOngoingCheckpoint()) {
                ReaderGroupState.CreateCheckpoint newCp = new ReaderGroupState.CreateCheckpoint();
                u.add(newCp);
                u.add(new ReaderGroupState.ClearCheckpointsBefore(newCp.getCheckpointId()));
                log.debug("Created new checkpoint: {} currentState is: {}", newCp, s);
            }
        });
        this.checkpointTimer.reset(Duration.ofMillis(automaticCpInterval));
        // Read the checkpoint from the synchronizer's current state instead of the reference taken
        // before the update: that reference already returned null above, so it can only reflect the
        // new checkpoint if the synchronizer happened to update that very object in place.
        return this.sync.getState().getCheckpointForReader(this.readerId);
    }

    /**
     * Records this reader's position for the named checkpoint.
     *
     * <p>The position is recorded only when {@code checkpointName} equals the checkpoint the
     * state currently expects from this reader; otherwise a warning is logged and nothing is
     * recorded.
     *
     * @param checkpointName name of the checkpoint being taken
     * @param lastPosition   the reader's position, whose owned segments and offsets are recorded
     * @throws ReaderNotInReaderGroupException if the state says this reader is offline
     */
    void checkpoint(String checkpointName, PositionInternal lastPosition) throws ReaderNotInReaderGroupException {
        AtomicBoolean reinitRequired = new AtomicBoolean(false);
        this.sync.updateState((state, updates) -> {
            boolean offline = !state.isReaderOnline(this.readerId);
            reinitRequired.set(offline);
            if (offline) {
                return;
            }
            String expected = state.getCheckpointForReader(this.readerId);
            if (!checkpointName.equals(expected)) {
                log.warn("{} was asked to checkpoint for {} but the state says its next checkpoint should be {}", this.readerId, checkpointName, expected);
                return;
            }
            updates.add(new ReaderGroupState.CheckpointReader(checkpointName, this.readerId, lastPosition.getOwnedSegmentsWithOffsets()));
        });
        if (reinitRequired.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
    }

    /**
     * Asks the current state's checkpoint state whether the given checkpoint is silent.
     *
     * @param atCheckpoint name of the checkpoint to query
     * @return the result of {@code isCheckpointSilent(atCheckpoint)} on the checkpoint state
     */
    boolean isCheckpointSilent(String atCheckpoint) {
        ReaderGroupState current = this.sync.getState();
        return current.getCheckpointState().isCheckpointSilent(atCheckpoint);
    }

    /**
     * Publishes the positions of the last completed checkpoint as subscriber stream cuts.
     *
     * <p>Runs only when the group's retention type is
     * {@code AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT} and the last checkpoint has not been published
     * yet. For every stream in the last completed checkpoint the controller is asked to update
     * the subscriber stream cut; the results of those calls are not inspected here. The state is
     * then marked as published.
     *
     * @throws CheckpointFailedException if the state has no positions for the last completed checkpoint
     */
    void updateTruncationStreamCutIfNeeded() {
        ReaderGroupState state = this.sync.getState();
        boolean autoRelease = state.getConfig().getRetentionType().equals((Object)ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT);
        if (!autoRelease || state.getCheckpointState().isLastCheckpointPublished()) {
            return;
        }
        Optional<Map<Stream, Map<Segment, Long>>> cuts = state.getPositionsForLastCompletedCheckpoint();
        Map<Stream, Map<Segment, Long>> positions = cuts.orElseThrow(() -> new CheckpointFailedException("Could not get positions for last checkpoint."));
        for (Map.Entry<Stream, Map<Segment, Long>> entry : positions.entrySet()) {
            Stream stream = entry.getKey();
            this.controller.updateSubscriberStreamCut(stream.getScope(), stream.getStreamName(), NameUtils.getScopedReaderGroupName(this.scope, this.groupName), state.getConfig().getReaderGroupId(), state.getConfig().getGeneration(), new StreamCutImpl(stream, entry.getValue()));
        }
        this.sync.updateStateUnconditionally(new ReaderGroupState.UpdateCheckpointPublished(true));
    }

    /**
     * Returns the streams that key the starting stream cuts of the current config.
     *
     * @return an unmodifiable view of those streams
     */
    Set<Stream> getStreams() {
        Set<? extends Stream> configured = this.sync.getState().getConfig().getStartingStreamCuts().keySet();
        return Collections.unmodifiableSet(configured);
    }

    /**
     * Returns the last read positions the current state holds for a stream.
     *
     * @param stream the stream to query
     * @return the result of {@code getLastReadPositions(stream)} on the current state
     */
    Map<SegmentWithRange, Long> getLastReadpositions(Stream stream) {
        ReaderGroupState current = this.sync.getState();
        return current.getLastReadPositions(stream);
    }

    /**
     * @return the id of the reader this manager acts for
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getReaderId() {
        return readerId;
    }

    /**
     * @return the scope passed to the constructor
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getScope() {
        return scope;
    }

    /**
     * @return the reader group name passed to the constructor
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getGroupName() {
        return groupName;
    }
}

