/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.segment.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.auth.InvalidTokenException;
import io.pravega.auth.TokenExpiredException;
import io.pravega.client.connection.impl.ClientConnection;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.Flow;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.impl.PendingEvent;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.common.util.Retry;
import io.pravega.common.util.ReusableFutureLatch;
import io.pravega.common.util.ReusableLatch;
import io.pravega.shared.NameUtils;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.FailingReplyProcessor;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SegmentOutputStreamImpl
implements SegmentOutputStream {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SegmentOutputStreamImpl.class);
    private final String segmentName;
    @VisibleForTesting
    private final boolean useConnectionPooling;
    private final Controller controller;
    private final ConnectionPool connectionPool;
    private final UUID writerId;
    private final Consumer<Segment> resendToSuccessorsCallback;
    private final State state = new State();
    private final ResponseProcessor responseProcessor = new ResponseProcessor();
    private final Retry.RetryWithBackoff retrySchedule;
    private final Object writeOrderLock = new Object();
    private final DelegationTokenProvider tokenProvider;
    @VisibleForTesting
    private final long requestId = Flow.create().asLong();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void write(PendingEvent event) {
        Preconditions.checkState((!this.state.isAlreadySealed() || NameUtils.isTransactionSegment((String)this.segmentName) ? 1 : 0) != 0, (String)"Segment: %s is already sealed", (Object)this.segmentName);
        Object object = this.writeOrderLock;
        synchronized (object) {
            ClientConnection connection;
            try {
                connection = (ClientConnection)Futures.getThrowingException(this.getConnection());
            }
            catch (NoSuchSegmentException | SegmentSealedException e) {
                this.state.addToInflight(event);
                return;
            }
            catch (RetriesExhaustedException e) {
                event.getAckFuture().completeExceptionally(e);
                log.error("Failed to write event to Pravega due connectivity error ", (Throwable)e);
                return;
            }
            long eventNumber = this.state.addToInflight(event);
            try {
                Append append = new Append(this.segmentName, this.writerId, eventNumber, event.getEventCount(), event.getData(), null, this.requestId);
                log.trace("Sending append request: {}", (Object)append);
                connection.send(append);
            }
            catch (ConnectionFailedException e) {
                log.warn("Failed writing event through writer " + this.writerId + " due to: ", (Throwable)e);
                this.reconnect();
            }
        }
    }

    CompletableFuture<ClientConnection> getConnection() throws SegmentSealedException {
        if (this.state.isClosed()) {
            throw new IllegalStateException("SegmentOutputStream is already closed", this.state.getException());
        }
        if (this.state.needSuccessors.get()) {
            throw new SegmentSealedException(this.segmentName);
        }
        if (this.state.getConnection() == null) {
            this.reconnect();
        }
        CompletableFuture<ClientConnection> future = new CompletableFuture<ClientConnection>();
        this.state.setupConnection.register(future);
        return future;
    }

    @Override
    public void close() throws SegmentSealedException {
        if (this.state.isClosed()) {
            return;
        }
        log.debug("Closing writer: {}", (Object)this.writerId);
        this.flush();
        this.state.setClosed(true);
        ClientConnection connection = this.state.getConnection();
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void flush() throws SegmentSealedException {
        int numInflight = this.state.getNumInflight();
        log.debug("Flushing writer: {} with {} inflight events", (Object)this.writerId, (Object)numInflight);
        if (numInflight != 0) {
            block9: {
                try {
                    ClientConnection connection = (ClientConnection)Futures.getThrowingException(this.getConnection());
                    connection.send((WireCommand)new WireCommands.KeepAlive());
                }
                catch (NoSuchSegmentException | SegmentSealedException e) {
                    if (NameUtils.isTransactionSegment((String)this.segmentName)) {
                        log.warn("Exception observed during a flush on a transaction segment, this indicates that the transaction is committed/aborted. Details: {}", (Object)e.getMessage());
                        this.failConnection(e);
                    } else {
                        log.info("Exception observed while obtaining connection during flush. Details: {} ", (Object)e.getMessage());
                    }
                }
                catch (Exception e) {
                    this.failConnection(e);
                    if (!(e instanceof RetriesExhaustedException)) break block9;
                    log.error("Flush on segment {} by writer {} failed after all retries", (Object)this.segmentName, (Object)this.writerId);
                    throw Exceptions.sneakyThrow((Throwable)e);
                }
            }
            this.state.waitForInflight();
            Exceptions.checkNotClosed((boolean)this.state.isClosed(), (Object)this);
            if (this.state.needSuccessors.get() || NameUtils.isTransactionSegment((String)this.segmentName) && this.state.isAlreadySealed()) {
                throw new SegmentSealedException(this.segmentName + " sealed for writer " + this.writerId);
            }
        } else if (this.state.exception instanceof RetriesExhaustedException) {
            log.error("Flush on segment {} by writer {} failed after all retries", (Object)this.segmentName, (Object)this.writerId);
            throw Exceptions.sneakyThrow((Throwable)this.state.exception);
        }
    }

    @Override
    public void flushAsync() {
    }

    private void failConnection(Throwable e) {
        if (e instanceof TokenExpiredException) {
            this.tokenProvider.signalTokenExpired();
        }
        log.warn("Failing connection for writer {} with exception {}", (Object)this.writerId, (Object)e.toString());
        this.state.failConnection(Exceptions.unwrap((Throwable)e));
        this.reconnect();
    }

    @VisibleForTesting
    void reconnect() {
        if (this.state.isClosed()) {
            return;
        }
        log.debug("(Re)connect invoked, Segment: {}, writerID: {}", (Object)this.segmentName, (Object)this.writerId);
        this.state.setupConnection.registerAndRunReleaser(() -> this.retrySchedule.retryWhen(t -> t instanceof Exception).runAsync(() -> {
            log.debug("Running reconnect for segment {} writer {}", (Object)this.segmentName, (Object)this.writerId);
            if (this.state.isClosed() || this.state.needSuccessors.get()) {
                return CompletableFuture.completedFuture(null);
            }
            Preconditions.checkState((this.state.getConnection() == null ? 1 : 0) != 0);
            log.info("Fetching endpoint for segment {}, writer {}", (Object)this.segmentName, (Object)this.writerId);
            return ((CompletableFuture)((CompletableFuture)this.controller.getEndpointForSegment(this.segmentName).thenComposeAsync(uri -> {
                log.info("Establishing connection to {} for {}, writerID: {}", new Object[]{uri, this.segmentName, this.writerId});
                return this.establishConnection((PravegaNodeUri)uri);
            }, (Executor)this.connectionPool.getInternalExecutor())).thenCombineAsync(this.tokenProvider.retrieveToken(), AbstractMap.SimpleEntry::new, (Executor)this.connectionPool.getInternalExecutor())).thenComposeAsync(pair -> {
                ClientConnection connection = (ClientConnection)pair.getKey();
                String token = (String)pair.getValue();
                CompletableFuture connectionSetupFuture = this.state.newConnection(connection);
                WireCommands.SetupAppend cmd = new WireCommands.SetupAppend(this.requestId, this.writerId, this.segmentName, token);
                try {
                    connection.send((WireCommand)cmd);
                }
                catch (ConnectionFailedException e1) {
                    this.state.failConnection(e1);
                    throw Exceptions.sneakyThrow((Throwable)e1);
                }
                return connectionSetupFuture.exceptionally(t1 -> {
                    Throwable exception = Exceptions.unwrap((Throwable)t1);
                    if (exception instanceof InvalidTokenException) {
                        log.info("Ending reconnect attempts on writer {} to {} because token verification failed due to invalid token", (Object)this.writerId, (Object)this.segmentName);
                        return null;
                    }
                    if (exception instanceof SegmentSealedException) {
                        log.info("Ending reconnect attempts on writer {} to {} because segment is sealed", (Object)this.writerId, (Object)this.segmentName);
                        return null;
                    }
                    if (exception instanceof NoSuchSegmentException) {
                        log.info("Ending reconnect attempts on writer {} to {} because segment is truncated", (Object)this.writerId, (Object)this.segmentName);
                        return null;
                    }
                    throw Exceptions.sneakyThrow((Throwable)t1);
                });
            }, (Executor)this.connectionPool.getInternalExecutor());
        }, this.connectionPool.getInternalExecutor()).exceptionally(t -> {
            log.error("Error while attempting to establish connection for writer {}", (Object)this.writerId, t);
            this.failAndRemoveUnackedEvents((Throwable)t);
            return null;
        }), new CompletableFuture());
    }

    private CompletableFuture<ClientConnection> establishConnection(PravegaNodeUri uri) {
        if (this.useConnectionPooling) {
            return this.connectionPool.getClientConnection(Flow.from(this.requestId), uri, (ReplyProcessor)this.responseProcessor);
        }
        return this.connectionPool.getClientConnection(uri, (ReplyProcessor)this.responseProcessor);
    }

    private void failAndRemoveUnackedEvents(Throwable t) {
        this.state.getAllInflightEventsAndClear().parallelStream().forEach(event -> event.getAckFuture().completeExceptionally(t));
        this.state.failConnection(t);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PendingEvent> getUnackedEventsOnSeal() {
        log.debug("GetUnackedEventsOnSeal called on {}", (Object)this.writerId);
        Object object = this.writeOrderLock;
        synchronized (object) {
            this.state.failConnection(new SegmentSealedException(this.segmentName));
            return Collections.unmodifiableList(this.state.getAllInflightEvents());
        }
    }

    @Override
    public long getLastObservedWriteOffset() {
        return this.state.getLastSegmentLength();
    }

    @ConstructorProperties(value={"segmentName", "useConnectionPooling", "controller", "connectionPool", "writerId", "resendToSuccessorsCallback", "retrySchedule", "tokenProvider"})
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public SegmentOutputStreamImpl(String segmentName, boolean useConnectionPooling, Controller controller, ConnectionPool connectionPool, UUID writerId, Consumer<Segment> resendToSuccessorsCallback, Retry.RetryWithBackoff retrySchedule, DelegationTokenProvider tokenProvider) {
        this.segmentName = segmentName;
        this.useConnectionPooling = useConnectionPooling;
        this.controller = controller;
        this.connectionPool = connectionPool;
        this.writerId = writerId;
        this.resendToSuccessorsCallback = resendToSuccessorsCallback;
        this.retrySchedule = retrySchedule;
        this.tokenProvider = tokenProvider;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String toString() {
        return "SegmentOutputStreamImpl(segmentName=" + this.getSegmentName() + ", writerId=" + this.writerId + ", state=" + this.state + ")";
    }

    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getSegmentName() {
        return this.segmentName;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public boolean isUseConnectionPooling() {
        return this.useConnectionPooling;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public long getRequestId() {
        return this.requestId;
    }

    private final class ResponseProcessor
    extends FailingReplyProcessor {
        private ResponseProcessor() {
        }

        public void connectionDropped() {
            SegmentOutputStreamImpl.this.failConnection(new ConnectionFailedException("Connection dropped for writer " + SegmentOutputStreamImpl.this.writerId));
        }

        public void wrongHost(WireCommands.WrongHost wrongHost) {
            SegmentOutputStreamImpl.this.failConnection(new ConnectionFailedException(wrongHost.toString()));
        }

        public void segmentIsSealed(WireCommands.SegmentIsSealed segmentIsSealed) {
            log.info("Received SegmentSealed {} on writer {}", (Object)segmentIsSealed, (Object)SegmentOutputStreamImpl.this.writerId);
            this.invokeResendCallBack((WireCommand)segmentIsSealed);
        }

        public void noSuchSegment(WireCommands.NoSuchSegment noSuchSegment) {
            log.info("Received noSuchSegment for writer {}", (Object)SegmentOutputStreamImpl.this.writerId);
            String segment = noSuchSegment.getSegment();
            if (NameUtils.isTransactionSegment((String)segment)) {
                log.info("Transaction Segment: {} no longer exists since the txn is aborted. {}", (Object)noSuchSegment.getSegment(), (Object)noSuchSegment.getServerStackTrace());
                SegmentOutputStreamImpl.this.state.failConnection(new SegmentSealedException(segment));
            } else {
                SegmentOutputStreamImpl.this.state.failConnection(new NoSuchSegmentException(segment));
                log.info("Segment being written to {} by writer {} no longer exists due to Stream Truncation, resending to the newer segment. {}", new Object[]{noSuchSegment.getSegment(), SegmentOutputStreamImpl.this.writerId, noSuchSegment.getServerStackTrace()});
                this.invokeResendCallBack((WireCommand)noSuchSegment);
            }
        }

        public void errorMessage(WireCommands.ErrorMessage errorMessage) {
            log.info("Received an errorMessage containing an unhandled {} on segment {}", (Object)errorMessage.getErrorCode().getExceptionType().getSimpleName(), (Object)errorMessage.getSegment());
            SegmentOutputStreamImpl.this.state.failConnection(errorMessage.getThrowableException());
        }

        public void dataAppended(WireCommands.DataAppended dataAppended) {
            log.trace("Received dataAppended ack: {}", (Object)dataAppended);
            long ackLevel = dataAppended.getEventNumber();
            long previousAckLevel = dataAppended.getPreviousEventNumber();
            try {
                this.checkAckLevels(ackLevel, previousAckLevel);
                SegmentOutputStreamImpl.this.state.noteSegmentLength(dataAppended.getCurrentSegmentWriteOffset());
                this.ackUpTo(ackLevel);
            }
            catch (Exception e) {
                SegmentOutputStreamImpl.this.failConnection(e);
            }
        }

        public void appendSetup(WireCommands.AppendSetup appendSetup) {
            log.info("Received appendSetup {}", (Object)appendSetup);
            long ackLevel = appendSetup.getLastEventNumber();
            this.ackUpTo(ackLevel);
            List<Append> toRetransmit = SegmentOutputStreamImpl.this.state.getAllInflight().stream().map(entry -> new Append(SegmentOutputStreamImpl.this.segmentName, SegmentOutputStreamImpl.this.writerId, ((Long)entry.getKey()).longValue(), ((PendingEvent)entry.getValue()).getEventCount(), ((PendingEvent)entry.getValue()).getData(), null, SegmentOutputStreamImpl.this.requestId)).collect(Collectors.toList());
            ClientConnection connection = SegmentOutputStreamImpl.this.state.getConnection();
            if (connection == null) {
                log.warn("Connection setup could not be completed because connection is already failed for writer {}", (Object)SegmentOutputStreamImpl.this.writerId);
                return;
            }
            if (toRetransmit.isEmpty() || SegmentOutputStreamImpl.this.state.needSuccessors.get()) {
                log.info("Connection setup complete for writer {}", (Object)SegmentOutputStreamImpl.this.writerId);
                SegmentOutputStreamImpl.this.state.connectionSetupComplete(connection);
            } else {
                connection.sendAsync(toRetransmit, e -> {
                    if (e == null) {
                        SegmentOutputStreamImpl.this.state.connectionSetupComplete(connection);
                    } else {
                        SegmentOutputStreamImpl.this.failConnection(e);
                    }
                });
            }
        }

        private void invokeResendCallBack(WireCommand wireCommand) {
            if (SegmentOutputStreamImpl.this.state.needSuccessors.compareAndSet(false, true)) {
                Retry.indefinitelyWithExpBackoff((long)SegmentOutputStreamImpl.this.retrySchedule.getInitialMillis(), (int)SegmentOutputStreamImpl.this.retrySchedule.getMultiplier(), (long)SegmentOutputStreamImpl.this.retrySchedule.getMaxDelay(), t -> log.error(SegmentOutputStreamImpl.this.writerId + " to invoke resendToSuccessors callback: ", t)).runInExecutor(() -> {
                    log.debug("Invoking resendToSuccessors call back for {} on writer {}", (Object)wireCommand, (Object)SegmentOutputStreamImpl.this.writerId);
                    SegmentOutputStreamImpl.this.resendToSuccessorsCallback.accept(Segment.fromScopedName(SegmentOutputStreamImpl.this.getSegmentName()));
                }, SegmentOutputStreamImpl.this.connectionPool.getInternalExecutor()).thenRun(() -> {
                    log.trace("Release inflight latch for writer {}", (Object)SegmentOutputStreamImpl.this.writerId);
                    SegmentOutputStreamImpl.this.state.waitingInflight.release();
                });
            }
        }

        private void ackUpTo(long ackLevel) {
            List pendingEvents = SegmentOutputStreamImpl.this.state.removeInflightBelow(ackLevel);
            SegmentOutputStreamImpl.this.connectionPool.getInternalExecutor().execute(() -> {
                for (PendingEvent toAck : pendingEvents) {
                    if (toAck.getAckFuture() != null) {
                        toAck.getAckFuture().complete(null);
                    }
                    toAck.getData().release();
                }
            });
        }

        private void checkAckLevels(long ackLevel, long previousAckLevel) {
            Preconditions.checkState((previousAckLevel < ackLevel ? 1 : 0) != 0, (String)"Bad ack from server - previousAckLevel = %s, ackLevel = %s", (long)previousAckLevel, (long)ackLevel);
            Long lowest = SegmentOutputStreamImpl.this.state.getLowestInflight();
            Preconditions.checkState((lowest > previousAckLevel ? 1 : 0) != 0, (String)"Missed ack from server - previousAckLevel = %s, ackLevel = %s, inFlightLevel = %s", (Object)previousAckLevel, (Object)ackLevel, (Object)lowest);
        }

        public void processingFailure(Exception error) {
            SegmentOutputStreamImpl.this.failConnection(error);
        }

        public void authTokenCheckFailed(WireCommands.AuthTokenCheckFailed authTokenCheckFailed) {
            if (authTokenCheckFailed.isTokenExpired()) {
                SegmentOutputStreamImpl.this.failConnection((Throwable)new TokenExpiredException(authTokenCheckFailed.getServerStackTrace()));
            } else {
                SegmentOutputStreamImpl.this.failConnection((Throwable)new InvalidTokenException(authTokenCheckFailed.toString()));
            }
        }
    }

    private final class State {
        private final Object lock = new Object();
        @GuardedBy(value="lock")
        private boolean closed = false;
        @GuardedBy(value="lock")
        private ClientConnection connection;
        @GuardedBy(value="lock")
        private CompletableFuture<Void> connectionSetupCompleted;
        @GuardedBy(value="lock")
        private Throwable exception = null;
        @GuardedBy(value="lock")
        private final ArrayDeque<Map.Entry<Long, PendingEvent>> inflight = new ArrayDeque();
        @GuardedBy(value="lock")
        private long eventNumber = 0L;
        @GuardedBy(value="lock")
        private long segmentLength = -1L;
        private final ReusableFutureLatch<ClientConnection> setupConnection = new ReusableFutureLatch();
        private final ReusableLatch waitingInflight = new ReusableLatch(true);
        private final AtomicBoolean needSuccessors = new AtomicBoolean();

        private State() {
        }

        private void waitForInflight() {
            Exceptions.handleInterrupted(() -> this.waitingInflight.await());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean isAlreadySealed() {
            Object object = this.lock;
            synchronized (object) {
                return this.connection == null && this.exception != null && this.exception instanceof SegmentSealedException;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private int getNumInflight() {
            Object object = this.lock;
            synchronized (object) {
                return this.inflight.size();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private long getLastSegmentLength() {
            Object object = this.lock;
            synchronized (object) {
                return this.segmentLength;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void noteSegmentLength(long newLength) {
            Object object = this.lock;
            synchronized (object) {
                this.segmentLength = Math.max(this.segmentLength, newLength);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void connectionSetupComplete(ClientConnection connection) {
            CompletableFuture<Void> toComplete;
            Object object = this.lock;
            synchronized (object) {
                toComplete = this.connectionSetupCompleted;
            }
            if (toComplete != null) {
                toComplete.complete(null);
                this.setupConnection.release((Object)connection);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private ClientConnection getConnection() {
            Object object = this.lock;
            synchronized (object) {
                return this.connection;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private CompletableFuture<Void> newConnection(ClientConnection newConnection) {
            CompletableFuture<Void> result = new CompletableFuture<Void>();
            Object object = this.lock;
            synchronized (object) {
                this.connectionSetupCompleted = result;
                this.connection = newConnection;
                this.exception = null;
            }
            return result;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void failConnection(Throwable throwable) {
            ClientConnection oldConnection = null;
            CompletableFuture<Void> oldConnectionSetupCompleted = null;
            boolean failSetupConnection = false;
            Object object = this.lock;
            synchronized (object) {
                if (this.connection != null) {
                    if (this.connectionSetupCompleted.isDone()) {
                        failSetupConnection = true;
                    } else {
                        oldConnectionSetupCompleted = this.connectionSetupCompleted;
                    }
                    oldConnection = this.connection;
                }
                log.info("Handling exception {} for connection {} on writer {}. SetupCompleted: {}, Closed: {}", new Object[]{throwable, this.connection, SegmentOutputStreamImpl.this.writerId, this.connectionSetupCompleted == null ? null : Boolean.valueOf(this.connectionSetupCompleted.isDone()), this.closed});
                if (this.exception == null || throwable instanceof RetriesExhaustedException) {
                    this.exception = throwable;
                }
                this.connection = null;
                this.connectionSetupCompleted = null;
                if (this.closed || throwable instanceof SegmentSealedException || throwable instanceof RetriesExhaustedException) {
                    this.waitingInflight.release();
                }
                if (!this.closed) {
                    String message = throwable.getMessage() == null ? throwable.getClass().toString() : throwable.getMessage();
                    log.warn("Connection for segment {} on writer {} failed due to: {}", new Object[]{SegmentOutputStreamImpl.this.segmentName, SegmentOutputStreamImpl.this.writerId, message});
                }
            }
            if (throwable instanceof SegmentSealedException || throwable instanceof NoSuchSegmentException || throwable instanceof InvalidTokenException || throwable instanceof RetriesExhaustedException) {
                this.setupConnection.releaseExceptionally(throwable);
            } else if (failSetupConnection) {
                this.setupConnection.releaseExceptionallyAndReset(throwable);
            }
            if (oldConnection != null) {
                oldConnection.close();
            }
            if (oldConnectionSetupCompleted != null) {
                oldConnectionSetupCompleted.completeExceptionally(throwable);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private long addToInflight(PendingEvent event) {
            Object object = this.lock;
            synchronized (object) {
                this.eventNumber += (long)event.getEventCount();
                log.trace("Adding event {} to inflight on writer {}", (Object)this.eventNumber, (Object)SegmentOutputStreamImpl.this.writerId);
                this.inflight.addLast(new AbstractMap.SimpleImmutableEntry<Long, PendingEvent>(this.eventNumber, event));
                if (!this.needSuccessors.get()) {
                    this.waitingInflight.reset();
                }
                return this.eventNumber;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<PendingEvent> removeInflightBelow(long ackLevel) {
            Object object = this.lock;
            synchronized (object) {
                ArrayList<PendingEvent> result = new ArrayList<PendingEvent>();
                Map.Entry<Long, PendingEvent> entry = this.inflight.peekFirst();
                while (entry != null && entry.getKey() <= ackLevel) {
                    this.inflight.pollFirst();
                    result.add(entry.getValue());
                    entry = this.inflight.peekFirst();
                }
                this.releaseIfEmptyInflight();
                return result;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Long getLowestInflight() {
            Object object = this.lock;
            synchronized (object) {
                Map.Entry<Long, PendingEvent> entry = this.inflight.peekFirst();
                return entry == null ? null : entry.getKey();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void releaseIfEmptyInflight() {
            Object object = this.lock;
            synchronized (object) {
                if (this.inflight.isEmpty()) {
                    log.trace("Inflight empty for writer {}", (Object)SegmentOutputStreamImpl.this.writerId);
                    this.waitingInflight.release();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<Map.Entry<Long, PendingEvent>> getAllInflight() {
            Object object = this.lock;
            synchronized (object) {
                return new ArrayList<Map.Entry<Long, PendingEvent>>(this.inflight);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<PendingEvent> getAllInflightEvents() {
            Object object = this.lock;
            synchronized (object) {
                return this.inflight.stream().map(entry -> (PendingEvent)entry.getValue()).collect(Collectors.toList());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<PendingEvent> getAllInflightEventsAndClear() {
            Object object = this.lock;
            synchronized (object) {
                List<PendingEvent> inflightEvents = this.getAllInflightEvents();
                this.inflight.clear();
                return inflightEvents;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean isClosed() {
            Object object = this.lock;
            synchronized (object) {
                return this.closed;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void setClosed(boolean closed) {
            Object object = this.lock;
            synchronized (object) {
                this.closed = closed;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Throwable getException() {
            Object object = this.lock;
            synchronized (object) {
                return this.exception;
            }
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public String toString() {
            return "SegmentOutputStreamImpl.State(closed=" + this.isClosed() + ", exception=" + this.getException() + ", eventNumber=" + this.eventNumber + ")";
        }
    }
}

