/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.containers;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.Services;
import io.pravega.common.util.BufferView;
import io.pravega.common.util.Retry;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadAttributeUpdateException;
import io.pravega.segmentstore.contracts.MergeStreamSegmentResult;
import io.pravega.segmentstore.contracts.ReadResult;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentMergedException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.server.AttributeIterator;
import io.pravega.segmentstore.server.ContainerOfflineException;
import io.pravega.segmentstore.server.DirectSegmentAccess;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.OperationLog;
import io.pravega.segmentstore.server.OperationLogFactory;
import io.pravega.segmentstore.server.ReadIndex;
import io.pravega.segmentstore.server.ReadIndexFactory;
import io.pravega.segmentstore.server.SegmentContainer;
import io.pravega.segmentstore.server.SegmentContainerExtension;
import io.pravega.segmentstore.server.SegmentContainerFactory;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.SegmentOperation;
import io.pravega.segmentstore.server.SegmentStoreMetrics;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.Writer;
import io.pravega.segmentstore.server.WriterFactory;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.attributes.AttributeIndexFactory;
import io.pravega.segmentstore.server.attributes.ContainerAttributeIndex;
import io.pravega.segmentstore.server.containers.ContainerConfig;
import io.pravega.segmentstore.server.containers.MetadataCleaner;
import io.pravega.segmentstore.server.containers.MetadataStore;
import io.pravega.segmentstore.server.containers.SegmentAttributeIterator;
import io.pravega.segmentstore.server.containers.StreamSegmentContainerMetadata;
import io.pravega.segmentstore.server.containers.TableMetadataStore;
import io.pravega.segmentstore.server.logs.operations.AttributeUpdaterOperation;
import io.pravega.segmentstore.server.logs.operations.DeleteSegmentOperation;
import io.pravega.segmentstore.server.logs.operations.MergeSegmentOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentAppendOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentMapOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentSealOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentTruncateOperation;
import io.pravega.segmentstore.server.logs.operations.UpdateAttributesOperation;
import io.pravega.segmentstore.server.tables.ContainerTableExtension;
import io.pravega.segmentstore.storage.ReadOnlyStorage;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.segmentstore.storage.StorageFactory;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamSegmentContainer
extends AbstractService
implements SegmentContainer {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(StreamSegmentContainer.class);
    // Retry policy used by getAttributesForSegment when caching is requested; only a BadAttributeUpdateException triggers a retry.
    // NOTE(review): the numeric arguments are presumably (initial delay, multiplier, attempts, max delay) - confirm against Retry.withExpBackoff.
    private static final Retry.RetryAndThrowConditionally CACHE_ATTRIBUTES_RETRY = Retry.withExpBackoff((long)50L, (int)2, (int)10, (long)1000L).retryWhen(ex -> ex instanceof BadAttributeUpdateException);
    // Container-level metadata; source of the container id, epoch and per-segment metadata lookups.
    protected final StreamSegmentContainerMetadata metadata;
    // Log prefix of the form "SegmentContainer[<id>]".
    private final String traceObjectId;
    // Operation log that every segment operation in this class is added to.
    private final OperationLog durableLog;
    // Serves reads and is cleaned up when segment metadata is removed.
    private final ReadIndex readIndex;
    // Gives access to per-segment attribute indices (lookups, iteration, deletion, cleanup).
    private final ContainerAttributeIndex attributeIndex;
    // Secondary service started once the durable log is online.
    private final Writer writer;
    // Storage adapter; initialized with the container epoch before the metadata store is initialized.
    private final Storage storage;
    // Resolves segment names to ids and handles segment creation/deletion/info requests.
    private final MetadataStore metadataStore;
    // Executor used for all asynchronous continuations in this class.
    private final ScheduledExecutorService executor;
    // Secondary service started alongside the writer; also invoked on demand via runMetadataCleanup.
    private final MetadataCleaner metadataCleaner;
    // Set exactly once by close(); checked by ensureRunning().
    private final AtomicBoolean closed;
    // Per-container metrics reporter; one call is recorded per public operation.
    private final SegmentStoreMetrics.Container metrics;
    // Extensions created at construction time, keyed by extension class; wrapped as unmodifiable.
    private final Map<Class<? extends SegmentContainerExtension>, ? extends SegmentContainerExtension> extensions;
    // Container configuration (max active segment count, metadata store init timeout, ...).
    private final ContainerConfig config;

    /**
     * Creates a new StreamSegmentContainer and wires up all of its components.
     *
     * All factory arguments are validated before any component is created, so a null argument
     * fails fast instead of surfacing after the storage adapter or durable log already exist.
     *
     * @param streamSegmentContainerId The id of this container.
     * @param config                   Container configuration.
     * @param durableLogFactory        Creates the OperationLog.
     * @param readIndexFactory         Creates the ReadIndex.
     * @param attributeIndexFactory    Creates the ContainerAttributeIndex.
     * @param writerFactory            Creates the Writer.
     * @param storageFactory           Creates the Storage adapter.
     * @param createExtensions         Creates the extensions registered with this container.
     * @param executor                 Executor for asynchronous work.
     * @throws NullPointerException If any of the reference arguments is null.
     */
    StreamSegmentContainer(int streamSegmentContainerId, ContainerConfig config, OperationLogFactory durableLogFactory, ReadIndexFactory readIndexFactory, AttributeIndexFactory attributeIndexFactory, WriterFactory writerFactory, StorageFactory storageFactory, SegmentContainerFactory.CreateExtensions createExtensions, ScheduledExecutorService executor) {
        Preconditions.checkNotNull(config, "config");
        Preconditions.checkNotNull(durableLogFactory, "durableLogFactory");
        Preconditions.checkNotNull(readIndexFactory, "readIndexFactory");
        Preconditions.checkNotNull(attributeIndexFactory, "attributeIndexFactory");
        Preconditions.checkNotNull(writerFactory, "writerFactory");
        Preconditions.checkNotNull(storageFactory, "storageFactory");
        Preconditions.checkNotNull(createExtensions, "createExtensions");
        Preconditions.checkNotNull(executor, "executor");
        this.traceObjectId = String.format("SegmentContainer[%d]", streamSegmentContainerId);
        this.storage = storageFactory.createStorageAdapter();
        this.metadata = new StreamSegmentContainerMetadata(streamSegmentContainerId, config.getMaxActiveSegmentCount());
        this.readIndex = readIndexFactory.createReadIndex(this.metadata, (ReadOnlyStorage) this.storage);
        this.executor = executor;
        this.config = config;
        this.durableLog = durableLogFactory.createDurableLog(this.metadata, this.readIndex);
        this.shutdownWhenStopped(this.durableLog, "DurableLog");
        this.attributeIndex = attributeIndexFactory.createContainerAttributeIndex(this.metadata, this.storage);
        this.writer = writerFactory.createWriter(this.metadata, this.durableLog, this.readIndex, this.attributeIndex, this.storage, this::createWriterProcessors);
        this.shutdownWhenStopped(this.writer, "Writer");
        // Extensions must be assigned before createMetadataStore(), which looks one up via getExtension().
        this.extensions = Collections.unmodifiableMap(createExtensions.apply(this, this.executor));
        this.metadataStore = this.createMetadataStore();
        this.metadataCleaner = new MetadataCleaner(config, this.metadata, this.metadataStore, this::notifyMetadataRemoved, this.executor, this.traceObjectId);
        this.shutdownWhenStopped((Service) this.metadataCleaner, "MetadataCleaner");
        this.metrics = new SegmentStoreMetrics.Container(streamSegmentContainerId);
        this.closed = new AtomicBoolean();
    }

    /**
     * Builds the MetadataStore for this container, backed by the registered ContainerTableExtension.
     *
     * @return A new TableMetadataStore.
     * @throws IllegalArgumentException If no ContainerTableExtension has been registered.
     */
    private MetadataStore createMetadataStore() {
        // The connector exposes this container's callbacks to the store.
        MetadataStore.Connector storeConnector = new MetadataStore.Connector(
                this.metadata,
                this::mapSegmentId,
                this::deleteSegmentImmediate,
                this::deleteSegmentDelayed,
                this::runMetadataCleanup);
        ContainerTableExtension tables = getExtension(ContainerTableExtension.class);
        Preconditions.checkArgument(tables != null, (Object) "ContainerTableExtension required for initialization.");
        return new TableMetadataStore(storeConnector, tables, this.executor);
    }

    /**
     * Collects the WriterSegmentProcessors that every registered extension creates for the given segment.
     *
     * @param segmentMetadata Metadata of the segment to create processors for.
     * @return An immutable collection holding the processors of all extensions (possibly empty).
     */
    private Collection<WriterSegmentProcessor> createWriterProcessors(UpdateableSegmentMetadata segmentMetadata) {
        // Typed builder: avoids the unchecked addAll/build of the raw form.
        ImmutableList.Builder<WriterSegmentProcessor> builder = ImmutableList.builder();
        this.extensions.values().forEach(p -> builder.addAll(p.createWriterSegmentProcessors(segmentMetadata)));
        return builder.build();
    }

    /**
     * Closes this container and all of its components. Only the first invocation has any effect.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            // Someone else already closed (or is closing) this container.
            return;
        }

        this.metadataStore.close();
        for (SegmentContainerExtension extension : this.extensions.values()) {
            extension.close();
        }

        // Block until this service has stopped before closing the underlying components.
        Futures.await(Services.stopAsync(this, this.executor));
        this.metadataCleaner.close();
        this.writer.close();
        this.durableLog.close();
        this.readIndex.close();
        this.storage.close();
        this.metrics.close();
        log.info("{}: Closed.", this.traceObjectId);
    }

    /**
     * Starts the DurableLog, then the remaining startup sequence (see startWhenDurableLogOnline).
     * Reports started on success; otherwise initiates a stop with the failure as cause.
     */
    @Override
    protected void doStart() {
        log.info("{}: Starting.", this.traceObjectId);
        Services.startAsync(this.durableLog, this.executor)
                .thenComposeAsync(v -> startWhenDurableLogOnline(), this.executor)
                .whenComplete((v, ex) -> {
                    if (ex != null) {
                        doStop(ex);
                        return;
                    }
                    notifyStarted();
                });
    }

    /**
     * Initializes and starts the secondary services, waiting for the DurableLog to come online if needed.
     *
     * @return A future indicating when this container may be reported as started. If the DurableLog is
     * offline it is already complete and the secondary services start later in the background; otherwise
     * it completes when secondary-service initialization completes.
     */
    private CompletableFuture<Void> startWhenDurableLogOnline() {
        CompletableFuture<Void> readySignal;
        CompletionStage<Void> initialization;
        if (!this.durableLog.isOffline()) {
            // Online: readiness is tied to the initialization itself.
            readySignal = this.initializeSecondaryServices();
            initialization = readySignal;
        } else {
            log.info("{}: DurableLog is OFFLINE. Not starting secondary services yet.", this.traceObjectId);
            readySignal = CompletableFuture.completedFuture(null);
            initialization = this.durableLog.awaitOnline()
                    .thenComposeAsync(v -> this.initializeSecondaryServices(), this.executor);
        }

        initialization
                .thenComposeAsync(v -> this.startSecondaryServicesAsync(), this.executor)
                .whenComplete((v, ex) -> {
                    if (ex == null) {
                        log.info("{}: Started.", this.traceObjectId);
                        return;
                    }
                    // An ObjectClosedException while already terminating is ignored; anything else stops us.
                    boolean closedWhileTerminating = Exceptions.unwrap(ex) instanceof ObjectClosedException
                            && Services.isTerminating(this.state());
                    if (!closedWhileTerminating) {
                        this.doStop(ex);
                    }
                });
        return readySignal;
    }

    /**
     * Initializes Storage with the current container epoch, then initializes the MetadataStore.
     *
     * @return A future that completes when the MetadataStore has been initialized.
     */
    private CompletableFuture<Void> initializeSecondaryServices() {
        long containerEpoch = this.metadata.getContainerEpoch();
        this.storage.initialize(containerEpoch);
        return this.metadataStore.initialize(this.config.getMetadataStoreInitTimeout());
    }

    /**
     * Starts the MetadataCleaner and the Writer.
     *
     * @return A future that completes when both services have started.
     */
    private CompletableFuture<Void> startSecondaryServicesAsync() {
        CompletableFuture<Void> cleanerStarted = Services.startAsync(this.metadataCleaner, this.executor);
        CompletableFuture<Void> writerStarted = Services.startAsync(this.writer, this.executor);
        return CompletableFuture.allOf(cleanerStarted, writerStarted);
    }

    /**
     * Stops this container without an externally supplied failure cause.
     */
    @Override
    protected void doStop() {
        doStop((Throwable) null);
    }

    /**
     * Stops the MetadataCleaner, Writer and DurableLog, then reports this service as stopped or failed.
     *
     * @param cause An optional failure that triggered the stop. If a component also failed, the
     *              component's failure is reported and {@code cause} is attached as suppressed.
     */
    private void doStop(Throwable cause) {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "doStop");
        log.info("{}: Stopping.", this.traceObjectId);
        CompletableFuture<Void> cleanerStopped = Services.stopAsync(this.metadataCleaner, this.executor);
        CompletableFuture<Void> writerStopped = Services.stopAsync(this.writer, this.executor);
        CompletableFuture<Void> logStopped = Services.stopAsync(this.durableLog, this.executor);
        CompletableFuture.allOf(cleanerStopped, writerStopped, logStopped)
                .whenCompleteAsync((r, ex) -> {
                    Throwable failure = this.getFailureCause(this.durableLog, this.writer, this.metadataCleaner);
                    if (failure == null) {
                        failure = cause;
                    } else if (cause != null && failure != cause) {
                        failure.addSuppressed(cause);
                    }

                    boolean cleanStop = failure == null;
                    if (cleanStop) {
                        log.info("{}: Stopped.", this.traceObjectId);
                    } else {
                        log.warn("{}: Failed due to component failure.", this.traceObjectId);
                    }
                    LoggerHelpers.traceLeave(log, this.traceObjectId, "doStop", traceId);
                    if (cleanStop) {
                        this.notifyStopped();
                    } else {
                        this.notifyFailed(failure);
                    }
                }, this.executor)
                .exceptionally(ex -> {
                    // The completion callback itself failed; surface that as the service failure.
                    this.notifyFailed(ex);
                    return null;
                });
    }

    /**
     * Determines the failure cause among the given services.
     *
     * @param services The services to inspect, in priority order.
     * @return The unwrapped failure cause of the first service in FAILED state, with the causes of any
     * other failed services attached as suppressed exceptions; null if no service has failed.
     */
    private Throwable getFailureCause(Service ... services) {
        Throwable result = null;
        for (Service s : services) {
            if (s.state() != Service.State.FAILED) {
                continue;
            }
            Throwable realEx = Exceptions.unwrap(s.failureCause());
            if (result == null) {
                result = realEx;
            } else if (realEx != null && realEx != result) {
                // Throwable.addSuppressed rejects self-suppression (IllegalArgumentException) and null
                // (NullPointerException); services failing with the same instance must not abort the stop.
                result.addSuppressed(realEx);
            }
        }
        return result;
    }

    /**
     * @return The id of this container, as recorded in its metadata.
     */
    @Override
    public int getId() {
        final int containerId = this.metadata.getContainerId();
        return containerId;
    }

    /**
     * @return True if the underlying DurableLog reports itself as offline.
     */
    @Override
    public boolean isOffline() {
        final boolean logOffline = this.durableLog.isOffline();
        return logOffline;
    }

    /**
     * Appends data (and optional attribute updates) to the given segment.
     *
     * @param streamSegmentName Name of the segment to append to.
     * @param data              The data to append.
     * @param attributeUpdates  Attribute updates to apply together with the append.
     * @param timeout           Timeout for the whole operation.
     * @return A future that completes with the append operation's last segment offset.
     */
    public CompletableFuture<Long> append(String streamSegmentName, BufferView data, Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
        ensureRunning();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        logRequest("append", streamSegmentName, data.getLength());
        this.metrics.append();
        return this.metadataStore.getOrAssignSegmentId(streamSegmentName, timer.getRemaining(), segmentId -> {
            StreamSegmentAppendOperation appendOp = new StreamSegmentAppendOperation((long) segmentId, data, attributeUpdates);
            CompletableFuture<Void> processed = processAppend(appendOp, timer);
            return processed.thenApply(ignored -> appendOp.getLastStreamSegmentOffset());
        });
    }

    /**
     * Appends data (and optional attribute updates) to the given segment, passing an explicit offset
     * to the append operation.
     *
     * @param streamSegmentName Name of the segment to append to.
     * @param offset            The offset handed to the append operation.
     * @param data              The data to append.
     * @param attributeUpdates  Attribute updates to apply together with the append.
     * @param timeout           Timeout for the whole operation.
     * @return A future that completes with the append operation's last segment offset.
     */
    public CompletableFuture<Long> append(String streamSegmentName, long offset, BufferView data, Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
        ensureRunning();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        logRequest("appendWithOffset", streamSegmentName, data.getLength());
        this.metrics.appendWithOffset();
        return this.metadataStore.getOrAssignSegmentId(streamSegmentName, timer.getRemaining(), segmentId -> {
            StreamSegmentAppendOperation appendOp = new StreamSegmentAppendOperation((long) segmentId, offset, data, attributeUpdates);
            CompletableFuture<Void> processed = processAppend(appendOp, timer);
            return processed.thenApply(ignored -> appendOp.getLastStreamSegmentOffset());
        });
    }

    /**
     * Applies the given attribute updates to a segment.
     *
     * @param streamSegmentName Name of the segment.
     * @param attributeUpdates  The updates to apply.
     * @param timeout           Timeout for the whole operation.
     * @return A future that completes when the updates have been processed.
     */
    public CompletableFuture<Void> updateAttributes(String streamSegmentName, Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
        ensureRunning();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        logRequest("updateAttributes", streamSegmentName, attributeUpdates);
        this.metrics.updateAttributes();
        return this.metadataStore.getOrAssignSegmentId(
                streamSegmentName,
                timer.getRemaining(),
                segmentId -> updateAttributesForSegment((long) segmentId, attributeUpdates, timer.getRemaining()));
    }

    /**
     * Retrieves the values of the given attributes for a segment.
     *
     * @param streamSegmentName Name of the segment.
     * @param attributeIds      Ids of the attributes to fetch.
     * @param cache             Whether fetched values should also be cached in the segment metadata.
     * @param timeout           Timeout for the whole operation.
     * @return A future that completes with the attribute values, keyed by attribute id.
     */
    public CompletableFuture<Map<UUID, Long>> getAttributes(String streamSegmentName, Collection<UUID> attributeIds, boolean cache, Duration timeout) {
        ensureRunning();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        logRequest("getAttributes", streamSegmentName, attributeIds);
        this.metrics.getAttributes();
        return this.metadataStore.getOrAssignSegmentId(
                streamSegmentName,
                timer.getRemaining(),
                segmentId -> getAttributesForSegment((long) segmentId, attributeIds, cache, timer));
    }

    /**
     * Begins a read on the given segment.
     *
     * @param streamSegmentName Name of the segment to read from.
     * @param offset            Offset to start reading at.
     * @param maxLength         Maximum number of bytes to read.
     * @param timeout           Timeout for the operation.
     * @return A future that completes with the ReadResult, or fails with StreamSegmentNotExistsException
     * if the ReadIndex reports that the segment does not exist.
     */
    public CompletableFuture<ReadResult> read(String streamSegmentName, long offset, int maxLength, Duration timeout) {
        ensureRunning();
        logRequest("read", streamSegmentName, offset, maxLength);
        this.metrics.read();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        return this.metadataStore.getOrAssignSegmentId(streamSegmentName, timer.getRemaining(), segmentId -> {
            ReadResult readResult;
            try {
                readResult = this.readIndex.read((long) segmentId, offset, maxLength, timer.getRemaining());
            } catch (StreamSegmentNotExistsException ex) {
                // Convert the checked exception into a failed future.
                return Futures.failedFuture(ex);
            }
            return CompletableFuture.completedFuture(readResult);
        });
    }

    /**
     * Retrieves information about a segment from the MetadataStore.
     *
     * @param streamSegmentName Name of the segment.
     * @param timeout           Timeout for the operation.
     * @return A future that completes with the segment's properties.
     */
    public CompletableFuture<SegmentProperties> getStreamSegmentInfo(String streamSegmentName, Duration timeout) {
        ensureRunning();
        logRequest("getStreamSegmentInfo", streamSegmentName);
        this.metrics.getInfo();
        CompletableFuture<SegmentProperties> info = this.metadataStore.getSegmentInfo(streamSegmentName, timeout);
        return info;
    }

    /**
     * Creates a new segment via the MetadataStore.
     *
     * @param streamSegmentName Name of the segment to create.
     * @param attributes        Initial attributes for the segment.
     * @param timeout           Timeout for the operation.
     * @return A future that completes when the segment has been created.
     */
    public CompletableFuture<Void> createStreamSegment(String streamSegmentName, Collection<AttributeUpdate> attributes, Duration timeout) {
        ensureRunning();
        logRequest("createStreamSegment", streamSegmentName);
        this.metrics.createSegment();
        CompletableFuture<Void> created = this.metadataStore.createSegment(streamSegmentName, attributes, timeout);
        return created;
    }

    /**
     * Deletes a segment.
     *
     * @param streamSegmentName Name of the segment to delete.
     * @param timeout           Timeout for the operation.
     * @return A future that completes when the segment has been deleted, or fails with
     * StreamSegmentNotExistsException if the MetadataStore reports that nothing was deleted.
     */
    public CompletableFuture<Void> deleteStreamSegment(String streamSegmentName, Duration timeout) {
        ensureRunning();
        logRequest("deleteStreamSegment", streamSegmentName);
        this.metrics.deleteSegment();
        final TimeoutTimer timer = new TimeoutTimer(timeout);

        // Look up the in-memory metadata (if any) before issuing the delete; it is used for cleanup afterwards.
        final long knownSegmentId = this.metadata.getStreamSegmentId(streamSegmentName, false);
        final UpdateableSegmentMetadata knownMetadata = this.metadata.getStreamSegmentMetadata(knownSegmentId);
        return this.metadataStore
                .deleteSegment(streamSegmentName, timer.getRemaining())
                .thenAccept(deleted -> {
                    if (!deleted.booleanValue()) {
                        throw new CompletionException(new StreamSegmentNotExistsException(streamSegmentName));
                    }
                    if (knownMetadata == null) {
                        return;
                    }
                    notifyMetadataRemoved(Collections.singleton(knownMetadata));
                });
    }

    /**
     * Truncates a segment at the given offset.
     *
     * @param streamSegmentName Name of the segment to truncate.
     * @param offset            The offset to truncate at.
     * @param timeout           Timeout for the operation.
     * @return A future that completes when the truncation has been processed.
     */
    public CompletableFuture<Void> truncateStreamSegment(String streamSegmentName, long offset, Duration timeout) {
        ensureRunning();
        logRequest("truncateStreamSegment", streamSegmentName);
        this.metrics.truncate();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        return this.metadataStore.getOrAssignSegmentId(
                streamSegmentName,
                timer.getRemaining(),
                segmentId -> truncate((long) segmentId, offset, timer.getRemaining()));
    }

    /**
     * Merges a source segment into a target segment.
     *
     * @param targetStreamSegment Name of the segment to merge into.
     * @param sourceStreamSegment Name of the segment to merge.
     * @param timeout             Timeout for the operation.
     * @return A future that completes with the result of the merge, or fails with the original failure
     * wrapped in a CompletionException.
     */
    public CompletableFuture<MergeStreamSegmentResult> mergeStreamSegment(String targetStreamSegment, String sourceStreamSegment, Duration timeout) {
        ensureRunning();
        logRequest("mergeStreamSegment", targetStreamSegment, sourceStreamSegment);
        this.metrics.mergeSegment();
        final TimeoutTimer timer = new TimeoutTimer(timeout);

        // Resolve the target id first, then the source id, then run the merge.
        CompletableFuture<MergeStreamSegmentResult> merge = this.metadataStore.getOrAssignSegmentId(
                targetStreamSegment,
                timer.getRemaining(),
                targetSegmentId -> this.metadataStore.getOrAssignSegmentId(
                        sourceStreamSegment,
                        timer.getRemaining(),
                        sourceSegmentId -> mergeStreamSegment((long) targetSegmentId, (long) sourceSegmentId, timer)));

        return merge.handleAsync((msr, ex) -> {
            // Clear the source's segment info on success, or if the source was reported as already merged.
            boolean sourceGone = ex == null || Exceptions.unwrap(ex) instanceof StreamSegmentMergedException;
            if (sourceGone) {
                this.metadataStore.clearSegmentInfo(sourceStreamSegment, timer.getRemaining());
            }
            if (ex != null) {
                throw new CompletionException(ex);
            }
            return msr;
        }, this.executor);
    }

    /**
     * Seals the source segment (if not already sealed) and merges it into the target segment.
     * An empty source is deleted instead of merged.
     *
     * @param targetSegmentId Id of the segment to merge into.
     * @param sourceSegmentId Id of the segment to merge.
     * @param timer           Timer tracking the remaining time for the whole operation.
     * @return A future that completes with the merge result.
     */
    private CompletableFuture<MergeStreamSegmentResult> mergeStreamSegment(long targetSegmentId, long sourceSegmentId, TimeoutTimer timer) {
        // Look up the source metadata up front; the callbacks below reuse this reference.
        UpdateableSegmentMetadata sourceMetadata = this.metadata.getStreamSegmentMetadata(sourceSegmentId);
        CompletableFuture<Void> sealResult = this.trySealStreamSegment(sourceMetadata, timer.getRemaining());
        if (sourceMetadata.getLength() == 0L) {
            // Source looks empty: wait for the seal to finish, then re-check the length before deciding.
            return sealResult.thenComposeAsync(v -> {
                if (sourceMetadata.getLength() == 0L) {
                    // Still empty after sealing: delete the source and report the target's current length.
                    log.debug("{}: Deleting empty source segment instead of merging {}.", (Object)this.traceObjectId, (Object)sourceMetadata.getName());
                    return this.deleteStreamSegment(sourceMetadata.getName(), timer.getRemaining()).thenApply(v2 -> new MergeStreamSegmentResult(this.metadata.getStreamSegmentMetadata(targetSegmentId).getLength(), sourceMetadata.getLength(), sourceMetadata.getAttributes()));
                }
                // Source is no longer empty: perform a regular merge.
                MergeSegmentOperation operation = new MergeSegmentOperation(targetSegmentId, sourceSegmentId);
                return this.durableLog.add(operation, timer.getRemaining()).thenApply(v2 -> new MergeStreamSegmentResult(operation.getStreamSegmentOffset() + operation.getLength(), operation.getLength(), sourceMetadata.getAttributes()));
            }, (Executor)this.executor);
        }
        // Source is not empty: queue the merge without waiting for the seal to complete; the result waits for both.
        MergeSegmentOperation operation = new MergeSegmentOperation(targetSegmentId, sourceSegmentId);
        return CompletableFuture.allOf(sealResult, this.durableLog.add(operation, timer.getRemaining())).thenApply(v2 -> new MergeStreamSegmentResult(operation.getStreamSegmentOffset() + operation.getLength(), operation.getLength(), sourceMetadata.getAttributes()));
    }

    /**
     * Seals a segment.
     *
     * @param streamSegmentName Name of the segment to seal.
     * @param timeout           Timeout for the operation.
     * @return A future that completes with the segment offset recorded by the seal operation.
     */
    public CompletableFuture<Long> sealStreamSegment(String streamSegmentName, Duration timeout) {
        ensureRunning();
        logRequest("seal", streamSegmentName);
        this.metrics.seal();
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        return this.metadataStore.getOrAssignSegmentId(
                streamSegmentName,
                timer.getRemaining(),
                segmentId -> seal((long) segmentId, timer.getRemaining()));
    }

    /**
     * Obtains direct access to a segment, bound to its resolved segment id.
     *
     * @param streamSegmentName Name of the segment.
     * @param timeout           Timeout for resolving the segment id.
     * @return A future that completes with a DirectSegmentAccess for the segment.
     */
    @Override
    public CompletableFuture<DirectSegmentAccess> forSegment(String streamSegmentName, Duration timeout) {
        ensureRunning();
        logRequest("forSegment", streamSegmentName);
        return this.metadataStore.getOrAssignSegmentId(streamSegmentName, timeout, segmentId -> {
            DirectSegmentAccess access = new DirectSegmentWrapper((long) segmentId);
            return CompletableFuture.completedFuture(access);
        });
    }

    /**
     * Returns a snapshot of every segment currently present in the container metadata.
     *
     * @return The properties of all segments whose metadata could be retrieved.
     */
    @Override
    public Collection<SegmentProperties> getActiveSegments() {
        ensureRunning();
        logRequest("getActiveSegments");
        List<SegmentProperties> snapshots = new ArrayList<>();
        for (long segmentId : this.metadata.getAllStreamSegmentIds()) {
            SegmentMetadata segmentMetadata = this.metadata.getStreamSegmentMetadata(segmentId);
            if (segmentMetadata == null) {
                // No metadata was returned for this id; skip it.
                continue;
            }
            snapshots.add(segmentMetadata.getSnapshot());
        }
        return snapshots;
    }

    /**
     * Looks up a registered extension by its class.
     *
     * @param extensionClass The class the extension was registered under.
     * @param <T>            The extension type.
     * @return The registered extension, or null if none is registered for that class.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T extends SegmentContainerExtension> T getExtension(Class<T> extensionClass) {
        // A missing entry yields null, which passes through the cast unchanged.
        return (T) this.extensions.get(extensionClass);
    }

    /**
     * Submits an UpdateAttributesOperation for the given segment.
     *
     * @param segmentId        Id of the segment.
     * @param attributeUpdates The updates to apply.
     * @param timeout          Timeout for the operation.
     * @return A future that completes when the operation has been processed.
     */
    private CompletableFuture<Void> updateAttributesForSegment(long segmentId, Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
        final UpdateAttributesOperation updateOp = new UpdateAttributesOperation(segmentId, attributeUpdates);
        final TimeoutTimer timer = new TimeoutTimer(timeout);
        return processAttributeUpdaterOperation(updateOp, timer);
    }

    /**
     * Fetches attribute values for a segment, retrying via CACHE_ATTRIBUTES_RETRY when caching is requested.
     *
     * @param segmentId    Id of the segment.
     * @param attributeIds Ids of the attributes to fetch.
     * @param cache        Whether fetched values should be cached in the segment metadata.
     * @param timer        Timer tracking the remaining time.
     * @return A future that completes with the attribute values.
     */
    private CompletableFuture<Map<UUID, Long>> getAttributesForSegment(long segmentId, Collection<UUID> attributeIds, boolean cache, TimeoutTimer timer) {
        final UpdateableSegmentMetadata segmentMetadata = this.metadata.getStreamSegmentMetadata(segmentId);
        if (!cache) {
            // No caching, so no retry wrapper.
            return getAndCacheAttributes(segmentMetadata, attributeIds, false, timer);
        }
        return CACHE_ATTRIBUTES_RETRY.runAsync(
                () -> getAndCacheAttributes(segmentMetadata, attributeIds, true, timer),
                this.executor);
    }

    /**
     * Adds a seal operation for the given segment to the DurableLog.
     *
     * @param segmentId Id of the segment to seal.
     * @param timeout   Timeout for the operation.
     * @return A future that completes with the segment offset recorded on the seal operation.
     */
    private CompletableFuture<Long> seal(long segmentId, Duration timeout) {
        final StreamSegmentSealOperation sealOp = new StreamSegmentSealOperation(segmentId);
        return this.durableLog
                .add(sealOp, timeout)
                .thenApply(ignored -> sealOp.getStreamSegmentOffset());
    }

    /**
     * Adds a truncate operation for the given segment to the DurableLog.
     *
     * @param segmentId Id of the segment to truncate.
     * @param offset    Offset to truncate at.
     * @param timeout   Timeout for the operation.
     * @return A future that completes when the operation has been added.
     */
    private CompletableFuture<Void> truncate(long segmentId, long offset, Duration timeout) {
        return this.durableLog.add(new StreamSegmentTruncateOperation(segmentId, offset), timeout);
    }

    /**
     * Seals the given segment unless its metadata already reports it as sealed. A
     * StreamSegmentSealedException raised by the seal operation is treated as success.
     *
     * @param metadata Metadata of the segment to seal.
     * @param timeout  Timeout for the operation.
     * @return A future that completes when the segment is sealed.
     */
    private CompletableFuture<Void> trySealStreamSegment(SegmentMetadata metadata, Duration timeout) {
        if (!metadata.isSealed()) {
            CompletableFuture<Void> sealed = this.durableLog.add(new StreamSegmentSealOperation(metadata.getId()), timeout);
            return Futures.exceptionallyExpecting(sealed, ex -> ex instanceof StreamSegmentSealedException, null);
        }
        // Already sealed; nothing to do.
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Processes an append operation and closes the operation if processing fails.
     *
     * @param appendOperation The append to process.
     * @param timer           Timer tracking the remaining time.
     * @return A future that completes when the append has been processed.
     */
    private CompletableFuture<Void> processAppend(StreamSegmentAppendOperation appendOperation, TimeoutTimer timer) {
        final CompletableFuture<Void> processed = processAttributeUpdaterOperation(appendOperation, timer);
        // On failure, close the operation.
        Futures.exceptionListener(processed, failure -> appendOperation.close());
        return processed;
    }

    /**
     * Adds an attribute-updating operation to the DurableLog. If it fails with a
     * BadAttributeUpdateException flagged as "previous value missing", the non-core attributes it
     * touches are fetched and cached, and the operation is added once more.
     *
     * @param operation The operation to process; it is cast to AttributeUpdaterOperation and SegmentOperation.
     * @param timer     Timer tracking the remaining time.
     * @param <T>       Type of the operation.
     * @return A future that completes when the operation has been processed.
     */
    private <T extends Operation> CompletableFuture<Void> processAttributeUpdaterOperation(T operation, TimeoutTimer timer) {
        final Collection<AttributeUpdate> updates = ((AttributeUpdaterOperation) ((Object) operation)).getAttributeUpdates();
        final CompletableFuture<Void> firstAttempt = this.durableLog.add(operation, timer.getRemaining());
        if (updates == null || updates.isEmpty()) {
            // No attribute updates, so there is nothing a retry could fix.
            return firstAttempt;
        }

        return Futures.exceptionallyCompose(firstAttempt, ex -> {
            final Throwable cause = Exceptions.unwrap(ex);
            final boolean previousValueMissing = cause instanceof BadAttributeUpdateException
                    && ((BadAttributeUpdateException) cause).isPreviousValueMissing();
            if (!previousValueMissing) {
                return Futures.failedFuture(cause);
            }

            final long segmentId = ((SegmentOperation) ((Object) operation)).getStreamSegmentId();
            final UpdateableSegmentMetadata segmentMetadata = this.metadata.getStreamSegmentMetadata(segmentId);
            final Collection<UUID> extendedIds = updates.stream()
                    .map(AttributeUpdate::getAttributeId)
                    .filter(id -> !Attributes.isCoreAttribute(id))
                    .collect(Collectors.toList());
            if (extendedIds.isEmpty()) {
                // Only core attributes were involved; there is nothing to load.
                return Futures.failedFuture(cause);
            }

            // Load and cache the missing values, then submit the operation again.
            return this.getAndCacheAttributes(segmentMetadata, extendedIds, true, timer)
                    .thenComposeAsync(attributes -> this.durableLog.add(operation, timer.getRemaining()), this.executor);
        });
    }

    /**
     * Collects the requested attribute values for a segment, first from the segment metadata and then,
     * for non-core attributes not found there, from the attribute index. Attributes the index does
     * not return are reported with the value Long.MIN_VALUE.
     *
     * @param segmentMetadata Metadata of the segment to query.
     * @param attributeIds    Ids of the attributes to fetch.
     * @param cache           If true and the segment is not sealed, the values fetched from the index are
     *                        also written to the DurableLog as AttributeUpdateType.None updates.
     * @param timer           Timer tracking the remaining time.
     * @return A future that completes with the attribute values, keyed by attribute id. Core attributes
     * that are absent from the metadata are omitted.
     */
    private CompletableFuture<Map<UUID, Long>> getAndCacheAttributes(SegmentMetadata segmentMetadata, Collection<UUID> attributeIds, boolean cache, TimeoutTimer timer) {
        // Values already present in the metadata are used directly.
        Map<UUID, Long> result = new HashMap<>();
        Map<UUID, Long> metadataAttributes = segmentMetadata.getAttributes();
        List<UUID> extendedAttributeIds = new ArrayList<>();
        attributeIds.forEach(attributeId -> {
            Long v = metadataAttributes.get(attributeId);
            if (v != null) {
                result.put(attributeId, v);
            } else if (!Attributes.isCoreAttribute(attributeId)) {
                extendedAttributeIds.add(attributeId);
            }
        });
        if (extendedAttributeIds.isEmpty()) {
            // Nothing to look up in the attribute index.
            return CompletableFuture.completedFuture(result);
        }

        // Fetch the remaining extended attributes from the index.
        CompletableFuture<Map<UUID, Long>> r = this.attributeIndex
                .forSegment(segmentMetadata.getId(), timer.getRemaining())
                .thenComposeAsync(idx -> idx.get(extendedAttributeIds, timer.getRemaining()), this.executor)
                .thenApplyAsync(extendedAttributes -> {
                    if (extendedAttributeIds.size() == extendedAttributes.size()) {
                        // Every requested id has a value.
                        return extendedAttributes;
                    }
                    // Fill in Long.MIN_VALUE for every id the index did not return.
                    Map<UUID, Long> allValues = new HashMap<>(extendedAttributes);
                    extendedAttributeIds.stream()
                            .filter(id -> !extendedAttributes.containsKey(id))
                            .forEach(id -> allValues.put(id, Long.MIN_VALUE));
                    return allValues;
                }, this.executor);

        if (cache && !segmentMetadata.isSealed()) {
            // Record the fetched values through the DurableLog before returning them.
            r = r.thenComposeAsync(extendedAttributes -> {
                List<AttributeUpdate> updates = extendedAttributes.entrySet().stream()
                        .map(e -> new AttributeUpdate(e.getKey(), AttributeUpdateType.None, e.getValue().longValue()))
                        .collect(Collectors.toList());
                return this.durableLog
                        .add(new UpdateAttributesOperation(segmentMetadata.getId(), updates), timer.getRemaining())
                        .thenApply(v -> extendedAttributes);
            }, this.executor);
        }

        return r.thenApply(extendedAttributes -> {
            result.putAll(extendedAttributes);
            return result;
        });
    }

    /**
     * Creates an iterator over a segment's attributes in the given id range, wrapping the attribute
     * index's iterator in a SegmentAttributeIterator together with the segment's metadata.
     *
     * @param segmentId Id of the segment.
     * @param fromId    First attribute id of the range.
     * @param toId      Last attribute id of the range.
     * @param timeout   Timeout, used both to obtain the index and for the index iterator.
     * @return A future that completes with the iterator.
     */
    private CompletableFuture<AttributeIterator> attributeIterator(long segmentId, UUID fromId, UUID toId, Duration timeout) {
        return this.attributeIndex
                .forSegment(segmentId, timeout)
                .thenApplyAsync(segmentIndex -> {
                    AttributeIterator fromIndex = segmentIndex.iterator(fromId, toId, timeout);
                    return new SegmentAttributeIterator(fromIndex, this.metadata.getStreamSegmentMetadata(segmentId), fromId, toId);
                }, this.executor);
    }

    /**
     * Cleans up the ReadIndex and the attribute index for segments whose metadata has been removed.
     *
     * @param segments Metadata of the removed segments; an empty collection is a no-op.
     */
    protected void notifyMetadataRemoved(Collection<SegmentMetadata> segments) {
        if (segments.isEmpty()) {
            return;
        }
        // Typed id collection: avoids handing a raw Collection to the cleanup calls.
        Collection<Long> segmentIds = segments.stream().map(SegmentMetadata::getId).collect(Collectors.toList());
        this.readIndex.cleanup(segmentIds);
        this.attributeIndex.cleanup(segmentIds);
    }

    /**
     * Verifies that this container can accept requests.
     *
     * @throws IllegalContainerStateException If the service is not in the RUNNING state.
     * @throws ContainerOfflineException      If the container is offline.
     */
    private void ensureRunning() {
        // Fails if close() has already been invoked.
        Exceptions.checkNotClosed(this.closed.get(), this);
        if (state() != Service.State.RUNNING) {
            throw new IllegalContainerStateException(getId(), state(), Service.State.RUNNING);
        }
        if (isOffline()) {
            throw new ContainerOfflineException(getId());
        }
    }

    /**
     * Logs an incoming request at debug level, prefixed with this container's trace id.
     *
     * @param requestName Name of the request.
     * @param args        Request arguments; logged as a single array argument.
     */
    private void logRequest(String requestName, Object ... args) {
        final Object[] logArgs = {this.traceObjectId, requestName, args};
        log.debug("{}: {} {}", logArgs);
    }

    /**
     * Registers handlers that shut this container down when the given component stops or fails.
     *
     * @param component     The component to watch.
     * @param componentName Name of the component, used in log messages.
     */
    private void shutdownWhenStopped(Service component, String componentName) {
        // A clean stop is unexpected while this container is still starting or running.
        Runnable onStopped = () -> {
            boolean active = this.state() == Service.State.STARTING || this.state() == Service.State.RUNNING;
            if (active) {
                log.warn("{}: {} stopped unexpectedly (no error) but StreamSegmentContainer was not currently stopping. Shutting down StreamSegmentContainer.", this.traceObjectId, componentName);
                this.stopAsync();
            }
        };
        // A failure stops a running container, or fails one that is still starting.
        Consumer<Throwable> onFailed = cause -> {
            log.warn("{}: {} failed. Shutting down StreamSegmentContainer.", this.traceObjectId, componentName, cause);
            if (this.state() == Service.State.RUNNING) {
                this.stopAsync();
            } else if (this.state() == Service.State.STARTING) {
                this.notifyFailed(cause);
            }
        };
        Services.onStop(component, onStopped, onFailed, this.executor);
    }

    /**
     * Triggers a single run of the MetadataCleaner.
     *
     * @return The future returned by the cleaner's runOnce().
     */
    private CompletableFuture<Void> runMetadataCleanup() {
        final CompletableFuture<Void> cleanupRun = this.metadataCleaner.runOnce();
        return cleanupRun;
    }

    /**
     * Adds a map operation for a segment to the DurableLog.
     *
     * @param segmentId         The id to assign; Long.MIN_VALUE leaves the id unset on the operation.
     * @param segmentProperties Properties of the segment to map.
     * @param pin               Whether to mark the operation as pinned.
     * @param timeout           Timeout for the operation.
     * @return A future that completes with the segment id carried by the operation.
     */
    private CompletableFuture<Long> mapSegmentId(long segmentId, SegmentProperties segmentProperties, boolean pin, Duration timeout) {
        final StreamSegmentMapOperation mapOp = new StreamSegmentMapOperation(segmentProperties);
        final boolean hasExplicitId = segmentId != Long.MIN_VALUE;
        if (hasExplicitId) {
            mapOp.setStreamSegmentId(segmentId);
        }
        if (pin) {
            mapOp.markPinned();
        }
        return this.durableLog
                .add(mapOp, timeout)
                .thenApply(ignored -> mapOp.getStreamSegmentId());
    }

    /**
     * Deletes a segment directly from storage and deletes its attribute index, without going through the durable log.
     *
     * The storage deletion (open-for-write, then delete on this container's executor) and the attribute index
     * deletion are both initiated before either one is awaited.
     *
     * @param segmentName Name of the segment to delete.
     * @param timeout     Timeout passed to both the storage delete and the attribute index delete.
     * @return A future that completes once both deletions have completed.
     */
    private CompletableFuture<Void> deleteSegmentImmediate(String segmentName, Duration timeout) {
        // Kick off the storage deletion first, exactly as the combined expression would.
        CompletableFuture<?> storageDeletion = this.storage
                .openWrite(segmentName)
                .thenComposeAsync(handle -> this.storage.delete(handle, timeout), this.executor);
        CompletableFuture<?> attributeDeletion = this.attributeIndex.delete(segmentName, timeout);
        return CompletableFuture.allOf(storageDeletion, attributeDeletion);
    }

    /**
     * Requests the deletion of a segment by adding a DeleteSegmentOperation to the durable log.
     *
     * @param segmentId Id of the segment to delete.
     * @param timeout   Timeout passed to the durable log.
     * @return The future returned by the durable log for the added operation.
     */
    private CompletableFuture<Void> deleteSegmentDelayed(long segmentId, Duration timeout) {
        DeleteSegmentOperation deleteOperation = new DeleteSegmentOperation(segmentId);
        return this.durableLog.add(deleteOperation, timeout);
    }

    /**
     * DirectSegmentAccess implementation bound to a single segment id. Every operation first verifies that the
     * enclosing container is running and then delegates to the enclosing StreamSegmentContainer, passing the
     * bound segment id.
     *
     * Calls into the enclosing container are explicitly qualified because several of the targets (seal, truncate,
     * attributeIterator) share their names with methods declared on this class.
     */
    private class DirectSegmentWrapper
    implements DirectSegmentAccess {
        /** Id of the segment all operations on this wrapper apply to. */
        private final long segmentId;

        /**
         * Appends data (with optional attribute updates) to the segment.
         *
         * @param data             The data to append.
         * @param attributeUpdates Attribute updates to attach to the append operation.
         * @param timeout          Timeout for the operation.
         * @return A future that, when the append has been processed, yields the offset recorded on the append
         * operation.
         */
        @Override
        public CompletableFuture<Long> append(BufferView data, Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("append", this.segmentId, data.getLength());
            StreamSegmentAppendOperation operation = new StreamSegmentAppendOperation(this.segmentId, data, attributeUpdates);
            return StreamSegmentContainer.this.processAppend(operation, new TimeoutTimer(timeout)).thenApply(v -> operation.getStreamSegmentOffset());
        }

        /**
         * Applies the given attribute updates to the segment.
         *
         * @param attributeUpdates The updates to apply.
         * @param timeout          Timeout for the operation.
         * @return The future returned by the container's attribute update for this segment.
         */
        @Override
        public CompletableFuture<Void> updateAttributes(Collection<AttributeUpdate> attributeUpdates, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("updateAttributes", this.segmentId, attributeUpdates);
            return StreamSegmentContainer.this.updateAttributesForSegment(this.segmentId, attributeUpdates, timeout);
        }

        /**
         * Retrieves the values of the requested attributes for the segment.
         *
         * @param attributeIds Ids of the attributes to fetch.
         * @param cache        Forwarded as-is to the container's attribute lookup.
         * @param timeout      Timeout for the operation.
         * @return A future yielding the attribute values keyed by attribute id.
         */
        @Override
        public CompletableFuture<Map<UUID, Long>> getAttributes(Collection<UUID> attributeIds, boolean cache, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("getAttributes", this.segmentId, attributeIds);
            return StreamSegmentContainer.this.getAttributesForSegment(this.segmentId, attributeIds, cache, new TimeoutTimer(timeout));
        }

        /**
         * Initiates a read from the segment through the container's read index.
         *
         * @param offset    Offset to read from.
         * @param maxLength Maximum number of bytes to read.
         * @param timeout   Timeout for the operation.
         * @return The ReadResult produced by the read index.
         */
        @Override
        public ReadResult read(long offset, int maxLength, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("read", this.segmentId, offset, maxLength);
            return StreamSegmentContainer.this.readIndex.read(this.segmentId, offset, maxLength, timeout);
        }

        /**
         * Gets the segment's information from the container metadata.
         *
         * @return The container metadata's entry for this segment id.
         */
        @Override
        public SegmentProperties getInfo() {
            StreamSegmentContainer.this.ensureRunning();
            return StreamSegmentContainer.this.metadata.getStreamSegmentMetadata(this.segmentId);
        }

        /**
         * Seals the segment.
         *
         * @param timeout Timeout for the operation.
         * @return The future returned by the container's seal for this segment.
         */
        @Override
        public CompletableFuture<Long> seal(Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("seal", this.segmentId);
            return StreamSegmentContainer.this.seal(this.segmentId, timeout);
        }

        /**
         * Truncates the segment at the given offset.
         *
         * @param offset  The offset to truncate at.
         * @param timeout Timeout for the operation.
         * @return The future returned by the container's truncate for this segment.
         */
        @Override
        public CompletableFuture<Void> truncate(long offset, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            // Include the offset in the trace entry (as read() does); without it the entry cannot tell
            // one truncation request from another.
            StreamSegmentContainer.this.logRequest("truncateStreamSegment", this.segmentId, offset);
            return StreamSegmentContainer.this.truncate(this.segmentId, offset, timeout);
        }

        /**
         * Creates an iterator over the segment's attributes within the given id range.
         *
         * @param fromId  Start of the attribute id range.
         * @param toId    End of the attribute id range.
         * @param timeout Timeout for the operation.
         * @return A future yielding the AttributeIterator.
         */
        @Override
        public CompletableFuture<AttributeIterator> attributeIterator(UUID fromId, UUID toId, Duration timeout) {
            StreamSegmentContainer.this.ensureRunning();
            StreamSegmentContainer.this.logRequest("attributeIterator", this.segmentId, fromId, toId);
            return StreamSegmentContainer.this.attributeIterator(this.segmentId, fromId, toId, timeout);
        }

        /**
         * Creates a wrapper bound to the given segment id.
         *
         * @param segmentId Id of the segment to operate on.
         */
        @ConstructorProperties(value={"segmentId"})
        @SuppressFBWarnings(justification="generated code")
        public DirectSegmentWrapper(long segmentId) {
            this.segmentId = segmentId;
        }

        /**
         * Gets the id of the segment this wrapper is bound to.
         *
         * @return The segment id.
         */
        @Override
        @SuppressFBWarnings(justification="generated code")
        public long getSegmentId() {
            return this.segmentId;
        }
    }
}

