/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ReaderGroupConfigRejectedException;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupMetrics;
import io.pravega.client.stream.ReaderSegmentDistribution;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.CheckpointFailedException;
import io.pravega.client.stream.impl.CheckpointImpl;
import io.pravega.client.stream.impl.CheckpointState;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.impl.ReaderGroupStateManager;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.client.stream.notifications.EndOfDataNotification;
import io.pravega.client.stream.notifications.NotificationSystem;
import io.pravega.client.stream.notifications.NotifierFactory;
import io.pravega.client.stream.notifications.Observable;
import io.pravega.client.stream.notifications.SegmentNotification;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.SequentialProcessor;
import io.pravega.shared.NameUtils;
import io.pravega.shared.security.auth.AccessOperation;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ReaderGroupImpl
implements ReaderGroup,
ReaderGroupMetrics {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReaderGroupImpl.class);
    static final String SILENT = "_SILENT_";
    private final String scope;
    private final String groupName;
    private final Controller controller;
    private final SegmentMetadataClientFactory metaFactory;
    private final StateSynchronizer<ReaderGroupState> synchronizer;
    private final NotifierFactory notifierFactory;
    @VisibleForTesting
    private final ConnectionPool connectionPool;
    @VisibleForTesting
    private final SequentialProcessor sequentialProcessor;

    public ReaderGroupImpl(String scope, String groupName, SynchronizerConfig synchronizerConfig, Serializer<InitialUpdate<ReaderGroupState>> initSerializer, Serializer<Update<ReaderGroupState>> updateSerializer, SynchronizerClientFactory clientFactory, Controller controller, ConnectionPool connectionPool) {
        Preconditions.checkNotNull((Object)synchronizerConfig);
        Preconditions.checkNotNull(initSerializer);
        Preconditions.checkNotNull(updateSerializer);
        Preconditions.checkNotNull((Object)clientFactory);
        this.connectionPool = (ConnectionPool)Preconditions.checkNotNull((Object)connectionPool);
        this.sequentialProcessor = new SequentialProcessor((Executor)connectionPool.getInternalExecutor());
        this.scope = (String)Preconditions.checkNotNull((Object)scope);
        this.groupName = (String)Preconditions.checkNotNull((Object)groupName);
        this.controller = (Controller)Preconditions.checkNotNull((Object)controller);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, connectionPool);
        this.synchronizer = clientFactory.createStateSynchronizer(NameUtils.getStreamForReaderGroup((String)groupName), updateSerializer, initSerializer, synchronizerConfig);
        this.notifierFactory = new NotifierFactory(new NotificationSystem(), this.synchronizer);
    }

    @Override
    public void updateRetentionStreamCut(Map<Stream, StreamCut> streamCuts) {
        this.synchronizer.fetchUpdates();
        if (this.synchronizer.getState().getConfig().getRetentionType().equals((Object)ReaderGroupConfig.StreamDataRetention.MANUAL_RELEASE_AT_USER_STREAMCUT)) {
            streamCuts.forEach((stream, cut) -> Futures.getThrowingException(this.controller.updateSubscriberStreamCut(stream.getScope(), stream.getStreamName(), NameUtils.getScopedReaderGroupName((String)this.scope, (String)this.groupName), this.synchronizer.getState().getConfig().getReaderGroupId(), this.synchronizer.getState().getConfig().getGeneration(), (StreamCut)cut)));
            return;
        }
        throw new UnsupportedOperationException("Operation not allowed when ReaderGroup retentionConfig is set to " + this.synchronizer.getState().getConfig().getRetentionType().toString());
    }

    @Override
    public void readerOffline(String readerId, Position lastPosition) {
        ReaderGroupStateManager.readerShutdown(readerId, lastPosition, this.synchronizer);
    }

    @Override
    public Set<String> getOnlineReaders() {
        this.synchronizer.fetchUpdates();
        return this.synchronizer.getState().getOnlineReaders();
    }

    @Override
    public Set<String> getStreamNames() {
        this.synchronizer.fetchUpdates();
        return this.synchronizer.getState().getStreamNames();
    }

    @Override
    public CompletableFuture<Checkpoint> initiateCheckpoint(String checkpointName, ScheduledExecutorService backgroundExecutor) {
        String rejectMessage = "rejecting checkpoint request since pending checkpoint reaches max allowed limit";
        boolean canPerformCheckpoint = (Boolean)this.synchronizer.updateState((state, updates) -> {
            ReaderGroupConfig config = state.getConfig();
            CheckpointState checkpointState = state.getCheckpointState();
            int maxOutstandingCheckpointRequest = config.getMaxOutstandingCheckpointRequest();
            List<String> outstandingCheckpoints = checkpointState.getOutstandingCheckpoints();
            int currentOutstandingCheckpointRequest = outstandingCheckpoints.size();
            if (currentOutstandingCheckpointRequest >= maxOutstandingCheckpointRequest) {
                log.warn("Current outstanding checkpoints are : {}, maxOutstandingCheckpointRequest: {}, currentOutstandingCheckpointRequest: {}, errorMessage: {} {}", new Object[]{outstandingCheckpoints, maxOutstandingCheckpointRequest, currentOutstandingCheckpointRequest, rejectMessage, maxOutstandingCheckpointRequest});
                return false;
            }
            updates.add(new ReaderGroupState.CreateCheckpoint(checkpointName));
            return true;
        });
        if (!canPerformCheckpoint) {
            return Futures.failedFuture((Throwable)new MaxNumberOfCheckpointsExceededException(rejectMessage));
        }
        return ((CompletableFuture)this.waitForCheckpointComplete(checkpointName, backgroundExecutor).thenApply(v -> this.completeCheckpoint(checkpointName))).thenApply(checkpoint -> checkpoint);
    }

    @Override
    public CompletableFuture<Checkpoint> initiateCheckpoint(String checkpointName) {
        return this.initiateCheckpoint(checkpointName, this.connectionPool.getInternalExecutor());
    }

    private CompletableFuture<Void> waitForCheckpointComplete(String checkpointName, ScheduledExecutorService backgroundExecutor) {
        AtomicBoolean checkpointPending = new AtomicBoolean(true);
        return Futures.loop(checkpointPending::get, () -> Futures.delayedTask(() -> {
            this.sequentialProcessor.add(() -> {
                this.synchronizer.fetchUpdates();
                checkpointPending.set(!this.synchronizer.getState().isCheckpointComplete(checkpointName));
                if (checkpointPending.get()) {
                    log.info("Waiting on checkpoint: {} currentState is: {}", (Object)checkpointName, (Object)this.synchronizer.getState());
                }
                return CompletableFuture.completedFuture(null);
            });
            return null;
        }, (Duration)Duration.ofMillis(500L), (ScheduledExecutorService)backgroundExecutor), (Executor)backgroundExecutor);
    }

    private Checkpoint completeCheckpoint(String checkpointName) {
        ReaderGroupState state = this.synchronizer.getState();
        Map<Segment, Long> map = state.getPositionsForCompletedCheckpoint(checkpointName);
        this.synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(checkpointName));
        if (map == null) {
            throw new CheckpointFailedException("Checkpoint was cleared before results could be read.");
        }
        if (map.isEmpty()) {
            log.info("All the events between start and end of stream cuts are already read by {}, nothing more to read", (Object)this.getGroupName());
        }
        return new CheckpointImpl(checkpointName, map);
    }

    @Override
    public void resetReaderGroup() {
        ReaderGroupConfig latestCheckpointConfig;
        log.info("Reset ReaderGroup {} to successfully last completed checkpoint", (Object)this.getGroupName());
        this.synchronizer.fetchUpdates();
        ReaderGroupConfig config = latestCheckpointConfig = this.synchronizer.getState().getConfig();
        Optional<Map<Stream, Map<Segment, Long>>> lastCheckPointPositions = this.synchronizer.getState().getPositionsForLastCompletedCheckpoint();
        if (lastCheckPointPositions.isPresent()) {
            HashMap<Stream, StreamCut> streamCuts = new HashMap<Stream, StreamCut>();
            for (Map.Entry<Stream, Map<Segment, Long>> streamPosition : lastCheckPointPositions.get().entrySet()) {
                streamCuts.put(streamPosition.getKey(), new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue()));
            }
            config = latestCheckpointConfig.toBuilder().startingStreamCuts(streamCuts).build();
        } else {
            log.info("Reset reader group to last completed checkpoint is not successful as there is no checkpoint available, so resetting to start of stream cut. ");
        }
        this.resetReaderGroup(config);
    }

    @Override
    public void resetReaderGroup(ReaderGroupConfig config) {
        block3: {
            long newGen;
            ReaderGroupConfig newConfig;
            log.info("Reset ReaderGroup {} to {}", (Object)this.getGroupName(), (Object)config);
            this.synchronizer.fetchUpdates();
            while (true) {
                ReaderGroupConfig currentConfig;
                if (!this.stateTransition(currentConfig = this.synchronizer.getState().getConfig(), new ReaderGroupState.UpdatingConfig(true))) {
                    continue;
                }
                if (currentConfig.getReaderGroupId() == ReaderGroupConfig.DEFAULT_UUID && currentConfig.getGeneration() == -1L) {
                    ReaderGroupConfig updateConfig = ReaderGroupConfig.cloneConfig(config, UUID.randomUUID(), 0L);
                    long nextGen = (Long)Futures.getThrowingException((Future)((Object)this.controller.createReaderGroup(this.scope, this.getGroupName(), updateConfig).thenCompose(conf -> {
                        if (!conf.getReaderGroupId().equals(updateConfig.getReaderGroupId())) {
                            return this.controller.updateReaderGroup(this.scope, this.groupName, ReaderGroupConfig.cloneConfig(updateConfig, conf.getReaderGroupId(), conf.getGeneration()));
                        }
                        return CompletableFuture.completedFuture(conf.getGeneration());
                    })));
                    this.updateConfigInStateSynchronizer(updateConfig, nextGen);
                    break block3;
                }
                newConfig = ReaderGroupConfig.cloneConfig(config, currentConfig.getReaderGroupId(), currentConfig.getGeneration());
                newGen = (Long)Futures.exceptionallyExpecting(this.controller.updateReaderGroup(this.scope, this.groupName, newConfig), e -> Exceptions.unwrap((Throwable)e) instanceof ReaderGroupConfigRejectedException, (Object)-1L).join();
                if (newGen != -1L) break;
                log.debug("Synchronize reader group with the one present on controller.");
                this.synchronizeReaderGroupConfig();
            }
            this.updateConfigInStateSynchronizer(newConfig, newGen);
        }
    }

    private void updateConfigInStateSynchronizer(ReaderGroupConfig config, long newGen) {
        Map<SegmentWithRange, Long> segments = ReaderGroupImpl.getSegmentsForStreams(this.controller, config);
        this.synchronizer.updateState((s, updates) -> updates.add(new ReaderGroupState.ReaderGroupStateInit(ReaderGroupConfig.cloneConfig(config, config.getReaderGroupId(), newGen), segments, ReaderGroupImpl.getEndSegmentsForStreams(config), false)));
    }

    private void synchronizeReaderGroupConfig() {
        ReaderGroupConfig controllerConfig = (ReaderGroupConfig)Futures.getThrowingException(this.controller.getReaderGroupConfig(this.scope, this.groupName));
        Map<SegmentWithRange, Long> segments = ReaderGroupImpl.getSegmentsForStreams(this.controller, controllerConfig);
        this.synchronizer.updateState((s, updates) -> {
            if (s.getConfig().getGeneration() < controllerConfig.getGeneration()) {
                updates.add(new ReaderGroupState.ReaderGroupStateInit(controllerConfig, segments, ReaderGroupImpl.getEndSegmentsForStreams(controllerConfig), false));
            }
        });
    }

    private boolean stateTransition(ReaderGroupConfig config, ReaderGroupState.ReaderGroupStateUpdate update) {
        AtomicBoolean successfullyUpdated = new AtomicBoolean(true);
        this.synchronizer.updateState((state, updates) -> {
            boolean updated = state.getConfig().equals(config);
            successfullyUpdated.set(updated);
            if (updated) {
                updates.add(update);
            }
        });
        return successfullyUpdated.get();
    }

    @Override
    public ReaderSegmentDistribution getReaderSegmentDistribution() {
        this.synchronizer.fetchUpdates();
        ReaderGroupState state = this.synchronizer.getState();
        ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
        state.getOnlineReaders().forEach(reader -> {
            Map<SegmentWithRange, Long> assigned = state.getAssignedSegments((String)reader);
            int size = assigned != null ? assigned.size() : 0;
            mapBuilder.put(reader, (Object)size);
        });
        int unassigned = state.getNumberOfUnassignedSegments();
        ImmutableMap readerDistribution = mapBuilder.build();
        log.info("ReaderGroup {} has unassigned segments count = {} and segment distribution as {}", new Object[]{this.getGroupName(), unassigned, readerDistribution});
        return ReaderSegmentDistribution.builder().readerSegmentDistribution((Map<String, Integer>)readerDistribution).unassignedSegments(unassigned).build();
    }

    @VisibleForTesting
    public static Map<SegmentWithRange, Long> getSegmentsForStreams(Controller controller, ReaderGroupConfig config) {
        Map<Stream, StreamCut> streamToStreamCuts = config.getStartingStreamCuts();
        ArrayList futures = new ArrayList(streamToStreamCuts.size());
        streamToStreamCuts.entrySet().forEach(e -> {
            if (((StreamCut)e.getValue()).equals(StreamCut.UNBOUNDED)) {
                futures.add(controller.getSegmentsAtTime((Stream)e.getKey(), 0L));
            } else {
                futures.add(CompletableFuture.completedFuture(((StreamCut)e.getValue()).asImpl().getPositions()));
            }
        });
        return (Map)Futures.getAndHandleExceptions((Future)((Object)Futures.allOfWithResults(futures).thenApply(listOfMaps -> listOfMaps.stream().flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(e -> new SegmentWithRange((Segment)e.getKey(), null), e -> (Long)e.getValue())))), InvalidStreamException::new);
    }

    public static Map<Segment, Long> getEndSegmentsForStreams(ReaderGroupConfig config) {
        List listOfMaps = config.getEndingStreamCuts().entrySet().stream().filter(e -> !((StreamCut)e.getValue()).equals(StreamCut.UNBOUNDED)).map(e -> ((StreamCut)e.getValue()).asImpl().getPositions()).collect(Collectors.toList());
        return listOfMaps.stream().flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, entry -> (Long)entry.getValue() == -1L ? Long.valueOf(Long.MAX_VALUE) : (Long)entry.getValue()));
    }

    @Override
    public ReaderGroupMetrics getMetrics() {
        return this;
    }

    @Override
    public long unreadBytes() {
        this.synchronizer.fetchUpdates();
        Optional<Map<Stream, Map<Segment, Long>>> checkPointedPositions = this.synchronizer.getState().getPositionsForLastCompletedCheckpoint();
        if (checkPointedPositions.isPresent()) {
            log.debug("Computing unread bytes based on the last checkPoint position");
            return this.getUnreadBytes(checkPointedPositions.get(), this.synchronizer.getState().getEndSegments());
        }
        log.info("No checkpoints found, using the last known offset to compute unread bytes");
        return this.getUnreadBytesIgnoringRange(this.synchronizer.getState().getPositions(), this.synchronizer.getState().getEndSegments());
    }

    private long getUnreadBytes(Map<Stream, Map<Segment, Long>> positions, Map<Segment, Long> endSegments) {
        log.debug("Compute unread bytes from position {}", positions);
        ArrayList<CompletableFuture<Long>> futures = new ArrayList<CompletableFuture<Long>>(positions.size());
        for (Map.Entry<Stream, Map<Segment, Long>> streamPosition : positions.entrySet()) {
            StreamCutImpl fromStreamCut = new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue());
            StreamCut toStreamCut = this.computeEndStreamCut(streamPosition.getKey(), endSegments);
            futures.add(this.getRemainingBytes(streamPosition.getKey(), fromStreamCut, toStreamCut));
        }
        return (Long)Futures.getAndHandleExceptions((Future)((Object)Futures.allOfWithResults(futures).thenApply(listOfLong -> listOfLong.stream().mapToLong(i -> i).sum())), RuntimeException::new);
    }

    private long getUnreadBytesIgnoringRange(Map<Stream, Map<SegmentWithRange, Long>> positions, Map<Segment, Long> endSegments) {
        log.debug("Compute unread bytes from position {}", positions);
        long totalLength = 0L;
        for (Map.Entry<Stream, Map<SegmentWithRange, Long>> streamPosition : positions.entrySet()) {
            StreamCutImpl fromStreamCut = new StreamCutImpl(streamPosition.getKey(), this.dropRange(streamPosition.getValue()));
            StreamCut toStreamCut = this.computeEndStreamCut(streamPosition.getKey(), endSegments);
            totalLength += ((Long)Futures.getAndHandleExceptions(this.getRemainingBytes(streamPosition.getKey(), fromStreamCut, toStreamCut), RuntimeException::new)).longValue();
        }
        return totalLength;
    }

    private Map<Segment, Long> dropRange(Map<SegmentWithRange, Long> in) {
        return in.entrySet().stream().collect(Collectors.toMap(e -> ((SegmentWithRange)e.getKey()).getSegment(), e -> (Long)e.getValue()));
    }

    private StreamCut computeEndStreamCut(Stream stream, Map<Segment, Long> endSegments) {
        Map<Segment, Long> toPositions = endSegments.entrySet().stream().filter(e -> ((Segment)e.getKey()).getStream().equals(stream)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return toPositions.isEmpty() ? StreamCut.UNBOUNDED : new StreamCutImpl(stream, toPositions);
    }

    private CompletableFuture<Long> getRemainingBytes(Stream stream, StreamCut fromStreamCut, StreamCut toStreamCut) {
        Map<Object, Object> endPositions;
        CompletableFuture<StreamSegmentSuccessors> unread;
        if (toStreamCut.equals(StreamCut.UNBOUNDED)) {
            unread = this.controller.getSuccessors(fromStreamCut);
            endPositions = Collections.emptyMap();
        } else {
            unread = this.controller.getSegments(fromStreamCut, toStreamCut);
            endPositions = toStreamCut.asImpl().getPositions();
        }
        return ((CompletableFuture)unread.thenCompose(unreadVal -> {
            DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(this.controller, stream.getScope(), stream.getStreamName(), AccessOperation.READ);
            return Futures.allOfWithResults(unreadVal.getSegments().stream().map(s -> {
                if (endPositions.containsKey(s)) {
                    return CompletableFuture.completedFuture((Long)endPositions.get(s));
                }
                SegmentMetadataClient metadataClient = this.metaFactory.createSegmentMetadataClient((Segment)s, tokenProvider);
                CompletableFuture<Long> result = metadataClient.fetchCurrentSegmentLength();
                result.whenComplete((r, e) -> metadataClient.close());
                return result;
            }).collect(Collectors.toList()));
        })).thenApply(sizes -> {
            long totalLength = 0L;
            Iterator<Object> iterator = sizes.iterator();
            while (iterator.hasNext()) {
                long bytesRemaining = (Long)iterator.next();
                totalLength += bytesRemaining;
            }
            iterator = fromStreamCut.asImpl().getPositions().values().iterator();
            while (iterator.hasNext()) {
                long bytesRead = (Long)iterator.next();
                totalLength -= bytesRead;
            }
            log.debug("Remaining bytes from position: {} to position: {} is {}", new Object[]{fromStreamCut, toStreamCut, totalLength});
            return totalLength;
        });
    }

    @Override
    public Observable<SegmentNotification> getSegmentNotifier(ScheduledExecutorService executor) {
        Preconditions.checkNotNull((Object)executor, (Object)"executor");
        return this.notifierFactory.getSegmentNotifier(executor);
    }

    @Override
    public Observable<EndOfDataNotification> getEndOfDataNotifier(ScheduledExecutorService executor) {
        Preconditions.checkNotNull((Object)executor, (Object)"executor");
        return this.notifierFactory.getEndOfDataNotifier(executor);
    }

    @Override
    @VisibleForTesting
    public Map<Stream, StreamCut> getStreamCuts() {
        this.synchronizer.fetchUpdates();
        ReaderGroupState state = this.synchronizer.getState();
        Map<Stream, Map<SegmentWithRange, Long>> positions = state.getPositions();
        HashMap<Stream, StreamCut> cuts = new HashMap<Stream, StreamCut>();
        for (Map.Entry<Stream, Map<SegmentWithRange, Long>> streamPosition : positions.entrySet()) {
            StreamCutImpl position = new StreamCutImpl(streamPosition.getKey(), this.dropRange(streamPosition.getValue()));
            cuts.put(streamPosition.getKey(), position);
        }
        return cuts;
    }

    @Override
    public CompletableFuture<Map<Stream, StreamCut>> generateStreamCuts(ScheduledExecutorService backgroundExecutor) {
        String checkpointId = this.generateSilentCheckpointId();
        log.debug("Fetching the current StreamCut using id {}", (Object)checkpointId);
        this.synchronizer.updateStateUnconditionally(new ReaderGroupState.CreateCheckpoint(checkpointId));
        return this.waitForCheckpointComplete(checkpointId, backgroundExecutor).thenApply(v -> this.completeCheckpointAndFetchStreamCut(checkpointId));
    }

    private String generateSilentCheckpointId() {
        byte[] randomBytes = new byte[32];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        return Base64.getEncoder().encodeToString(randomBytes) + SILENT;
    }

    private Map<Stream, StreamCut> completeCheckpointAndFetchStreamCut(String checkPointId) {
        ReaderGroupState state = this.synchronizer.getState();
        Optional<Map<Stream, StreamCut>> cuts = state.getStreamCutsForCompletedCheckpoint(checkPointId);
        this.synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(checkPointId));
        return cuts.orElseThrow(() -> new CheckpointFailedException("Internal CheckPoint was cleared before results could be read."));
    }

    @Override
    public void close() {
        this.synchronizer.close();
        this.sequentialProcessor.close();
    }

    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getScope() {
        return this.scope;
    }

    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getGroupName() {
        return this.groupName;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public Controller getController() {
        return this.controller;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public SegmentMetadataClientFactory getMetaFactory() {
        return this.metaFactory;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public StateSynchronizer<ReaderGroupState> getSynchronizer() {
        return this.synchronizer;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public NotifierFactory getNotifierFactory() {
        return this.notifierFactory;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public ConnectionPool getConnectionPool() {
        return this.connectionPool;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public SequentialProcessor getSequentialProcessor() {
        return this.sequentialProcessor;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof ReaderGroupImpl)) {
            return false;
        }
        ReaderGroupImpl other = (ReaderGroupImpl)o;
        String this$scope = this.getScope();
        String other$scope = other.getScope();
        if (this$scope == null ? other$scope != null : !this$scope.equals(other$scope)) {
            return false;
        }
        String this$groupName = this.getGroupName();
        String other$groupName = other.getGroupName();
        if (this$groupName == null ? other$groupName != null : !this$groupName.equals(other$groupName)) {
            return false;
        }
        Controller this$controller = this.getController();
        Controller other$controller = other.getController();
        if (this$controller == null ? other$controller != null : !this$controller.equals(other$controller)) {
            return false;
        }
        SegmentMetadataClientFactory this$metaFactory = this.getMetaFactory();
        SegmentMetadataClientFactory other$metaFactory = other.getMetaFactory();
        if (this$metaFactory == null ? other$metaFactory != null : !this$metaFactory.equals(other$metaFactory)) {
            return false;
        }
        StateSynchronizer<ReaderGroupState> this$synchronizer = this.getSynchronizer();
        StateSynchronizer<ReaderGroupState> other$synchronizer = other.getSynchronizer();
        if (this$synchronizer == null ? other$synchronizer != null : !this$synchronizer.equals(other$synchronizer)) {
            return false;
        }
        NotifierFactory this$notifierFactory = this.getNotifierFactory();
        NotifierFactory other$notifierFactory = other.getNotifierFactory();
        if (this$notifierFactory == null ? other$notifierFactory != null : !this$notifierFactory.equals(other$notifierFactory)) {
            return false;
        }
        ConnectionPool this$connectionPool = this.getConnectionPool();
        ConnectionPool other$connectionPool = other.getConnectionPool();
        if (this$connectionPool == null ? other$connectionPool != null : !this$connectionPool.equals(other$connectionPool)) {
            return false;
        }
        SequentialProcessor this$sequentialProcessor = this.getSequentialProcessor();
        SequentialProcessor other$sequentialProcessor = other.getSequentialProcessor();
        return !(this$sequentialProcessor == null ? other$sequentialProcessor != null : !this$sequentialProcessor.equals(other$sequentialProcessor));
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        String $scope = this.getScope();
        result = result * 59 + ($scope == null ? 43 : $scope.hashCode());
        String $groupName = this.getGroupName();
        result = result * 59 + ($groupName == null ? 43 : $groupName.hashCode());
        Controller $controller = this.getController();
        result = result * 59 + ($controller == null ? 43 : $controller.hashCode());
        SegmentMetadataClientFactory $metaFactory = this.getMetaFactory();
        result = result * 59 + ($metaFactory == null ? 43 : $metaFactory.hashCode());
        StateSynchronizer<ReaderGroupState> $synchronizer = this.getSynchronizer();
        result = result * 59 + ($synchronizer == null ? 43 : $synchronizer.hashCode());
        NotifierFactory $notifierFactory = this.getNotifierFactory();
        result = result * 59 + ($notifierFactory == null ? 43 : $notifierFactory.hashCode());
        ConnectionPool $connectionPool = this.getConnectionPool();
        result = result * 59 + ($connectionPool == null ? 43 : $connectionPool.hashCode());
        SequentialProcessor $sequentialProcessor = this.getSequentialProcessor();
        result = result * 59 + ($sequentialProcessor == null ? 43 : $sequentialProcessor.hashCode());
        return result;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String toString() {
        return "ReaderGroupImpl(scope=" + this.getScope() + ", groupName=" + this.getGroupName() + ", controller=" + this.getController() + ", metaFactory=" + this.getMetaFactory() + ", synchronizer=" + this.getSynchronizer() + ", notifierFactory=" + this.getNotifierFactory() + ", connectionPool=" + this.getConnectionPool() + ", sequentialProcessor=" + this.getSequentialProcessor() + ")";
    }
}

