/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.host.handler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.auth.AuthHandler;
import io.pravega.auth.TokenException;
import io.pravega.auth.TokenExpiredException;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.common.util.BufferView;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadAttributeUpdateException;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.contracts.ContainerNotFoundException;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.contracts.StreamSegmentStore;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.host.delegationtoken.DelegationTokenVerifier;
import io.pravega.segmentstore.server.host.handler.ServerConnection;
import io.pravega.segmentstore.server.host.stat.SegmentStatsRecorder;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.ByteBufWrapper;
import io.pravega.shared.protocol.netty.DelegatingRequestProcessor;
import io.pravega.shared.protocol.netty.RequestProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request processor that handles the append path of a single {@link ServerConnection}.
 *
 * <p>Incoming {@link Append}s are queued per writer, written to the {@link StreamSegmentStore} one at a time
 * (at most one append is outstanding at any moment), and acknowledged back on the connection. The number of
 * queued bytes is tracked so that reading from the connection can be paused and resumed around the
 * {@link #HIGH_WATER_MARK} / {@link #LOW_WATER_MARK} thresholds. Every request type not handled here is
 * delegated to the processor returned by {@link #getNextRequestProcessor()}.
 */
public class AppendProcessor
extends DelegatingRequestProcessor {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(AppendProcessor.class);
    /** Timeout passed to every {@link StreamSegmentStore} call made by this class. */
    static final Duration TIMEOUT = Duration.ofMinutes(1L);
    /** Reading from the connection is paused once more than this many bytes are queued. */
    private static final int HIGH_WATER_MARK = 1024 * 1024;
    /** Reading from the connection is resumed once fewer than this many bytes are queued. */
    private static final int LOW_WATER_MARK = 640 * 1024;
    private static final String EMPTY_STACK_TRACE = "";
    private final StreamSegmentStore store;
    private final ServerConnection connection;
    private final AtomicLong pendingBytes = new AtomicLong(0L);
    private final RequestProcessor nextRequestProcessor;
    private final Object lock = new Object();
    private final SegmentStatsRecorder statsRecorder;
    private final DelegationTokenVerifier tokenVerifier;
    private final boolean replyWithStackTraceOnError;
    /** Appends received but not yet handed to the store, keyed by writer id, in arrival order. */
    @GuardedBy(value="lock")
    private final LinkedListMultimap<UUID, Append> waitingAppends = LinkedListMultimap.create(2);
    /** Last known event number for each (segment, writer) pair that has been set up on this connection. */
    @GuardedBy(value="lock")
    private final HashMap<Pair<String, UUID>, Long> latestEventNumbers = new HashMap<>();
    /** The single append currently being written to the store, or null when none is in flight. */
    @GuardedBy(value="lock")
    private Append outstandingAppend = null;

    /**
     * Creates an instance that records no statistics and never includes stack traces in error replies.
     *
     * @param store      the store appends are written to; must not be null
     * @param connection the connection replies are sent on; must not be null
     * @param next       the processor that handles every other request type; must not be null
     * @param verifier   delegation token verifier; when null, token verification is skipped
     */
    @VisibleForTesting
    public AppendProcessor(StreamSegmentStore store, ServerConnection connection, RequestProcessor next, DelegationTokenVerifier verifier) {
        this(store, connection, next, SegmentStatsRecorder.noOp(), verifier, false);
    }

    /**
     * Creates a fully configured instance.
     *
     * @param store                      the store appends are written to; must not be null
     * @param connection                 the connection replies are sent on; must not be null
     * @param next                       the processor that handles every other request type; must not be null
     * @param statsRecorder              receives a record for every successful append; must not be null
     * @param tokenVerifier              delegation token verifier; when null, token verification is skipped
     * @param replyWithStackTraceOnError whether error replies carry the server-side stack trace
     */
    AppendProcessor(StreamSegmentStore store, ServerConnection connection, RequestProcessor next, SegmentStatsRecorder statsRecorder, DelegationTokenVerifier tokenVerifier, boolean replyWithStackTraceOnError) {
        this.store = Preconditions.checkNotNull(store, "store");
        this.connection = Preconditions.checkNotNull(connection, "connection");
        this.nextRequestProcessor = Preconditions.checkNotNull(next, "next");
        // The message used to be the (null) object itself, which produced an NPE without a useful message.
        this.statsRecorder = Preconditions.checkNotNull(statsRecorder, "statsRecorder");
        this.tokenVerifier = tokenVerifier;
        this.replyWithStackTraceOnError = replyWithStackTraceOnError;
    }

    /**
     * Answers a Hello handshake and closes the connection if the peer's version range is incompatible.
     * The reply is sent before the compatibility check, so an incompatible peer still receives it.
     *
     * @param hello the handshake received from the peer
     */
    public void hello(WireCommands.Hello hello) {
        log.info("Received hello from connection: {}", this.connection);
        // NOTE(review): 9 and 5 look like inlined protocol-version constants (current / oldest compatible) - confirm.
        this.connection.send(new WireCommands.Hello(9, 5));
        if (hello.getLowVersion() > 9 || hello.getHighVersion() < 5) {
            log.warn("Incompatible wire protocol versions {} from connection {}", hello, this.connection);
            this.connection.close();
        }
    }

    /**
     * Prepares this connection to accept appends from a writer on a segment.
     *
     * <p>If a token verifier is configured, the delegation token is checked first; a failure is reported to
     * the client and processing stops. Otherwise the writer's attribute is read from the store, remembered
     * as the writer's latest event number (unless one is already known) and returned in an AppendSetup reply.
     *
     * @param setupAppend the setup request
     */
    public void setupAppend(WireCommands.SetupAppend setupAppend) {
        String newSegment = setupAppend.getSegment();
        UUID writer = setupAppend.getWriterId();
        long requestId = setupAppend.getRequestId();
        log.info("Setting up appends for writer: {} on segment: {}", writer, newSegment);
        if (this.tokenVerifier != null) {
            try {
                this.tokenVerifier.verifyToken(newSegment, setupAppend.getDelegationToken(), AuthHandler.Permissions.READ_UPDATE);
            }
            catch (TokenException e) {
                // NOTE(review): the operation label below does not describe a setup-append; kept as-is because it
                // is emitted at runtime (logs / OperationUnsupported reply).
                this.handleException(writer, requestId, newSegment, "Update Segment Attribute", e);
                return;
            }
        }
        this.store.getAttributes(newSegment, Collections.singleton(writer), true, TIMEOUT).whenComplete((attributes, u) -> {
            try {
                if (u != null) {
                    this.handleException(writer, requestId, newSegment, "setting up append", u);
                    return;
                }
                // A writer with no stored attribute starts from Long.MIN_VALUE.
                long eventNumber = attributes.getOrDefault(writer, Long.MIN_VALUE);
                synchronized (this.lock) {
                    this.latestEventNumbers.putIfAbsent(Pair.of(newSegment, writer), eventNumber);
                }
                this.connection.send(new WireCommands.AppendSetup(requestId, newSegment, writer, eventNumber));
            }
            catch (Throwable e) {
                this.handleException(writer, requestId, newSegment, "handling setupAppend result", e);
            }
        });
    }

    /**
     * Starts writing the next queued append, if there is one and nothing is currently outstanding.
     * The append's data buffer is released once the store call and result handling have completed.
     */
    private void performNextWrite() {
        Append append = this.getNextAppend();
        if (append == null) {
            return;
        }
        long traceId = LoggerHelpers.traceEnter(log, "storeAppend", append);
        Timer timer = new Timer();
        this.storeAppend(append)
                .whenComplete((length, e) -> {
                    this.handleAppendResult(append, length, e, timer);
                    LoggerHelpers.traceLeave(log, "storeAppend", traceId, append, e);
                })
                .whenComplete((v, e) -> append.getData().release());
    }

    /**
     * Picks the next append to write and marks it as outstanding.
     *
     * <p>Takes the first writer with queued appends. If that writer's first append is conditional it is taken
     * alone; otherwise all leading non-conditional appends of that writer are merged into a single append
     * that carries the last merged event number and request id and the summed event count. The pending byte
     * counter is reduced by the size of the chosen append.
     *
     * @return the append to write, or null if one is already outstanding or nothing is queued
     */
    private Append getNextAppend() {
        synchronized (this.lock) {
            if (this.outstandingAppend != null || this.waitingAppends.isEmpty()) {
                return null;
            }
            UUID writer = this.waitingAppends.keys().iterator().next();
            List<Append> appends = this.waitingAppends.get(writer);
            if (appends.get(0).isConditional()) {
                this.outstandingAppend = appends.remove(0);
            } else {
                ByteBuf[] toAppend = new ByteBuf[appends.size()];
                Append last = appends.get(0);
                int eventCount = 0;
                int batched = 0;
                Iterator<Append> iterator = appends.iterator();
                while (iterator.hasNext()) {
                    Append a = iterator.next();
                    if (a.isConditional()) {
                        // Conditional appends are never merged; stop the batch in front of the first one.
                        break;
                    }
                    toAppend[batched++] = a.getData();
                    last = a;
                    eventCount += a.getEventCount();
                    iterator.remove();
                }
                if (batched < toAppend.length) {
                    // The batch stopped early: drop the unused tail so no null slots are handed to wrappedBuffer.
                    toAppend = Arrays.copyOf(toAppend, batched);
                }
                ByteBuf data = Unpooled.wrappedBuffer(toAppend);
                this.outstandingAppend = new Append(last.getSegment(), writer, last.getEventNumber(), eventCount, data, null, last.getRequestId());
            }
            this.setPendingBytes(this.getPendingBytes() - (long) this.outstandingAppend.getData().readableBytes());
            return this.outstandingAppend;
        }
    }

    /**
     * Hands an append to the store together with its attribute updates: the writer's attribute is replaced
     * with the append's event number only if it still equals the last known event number, and the event
     * count attribute is accumulated.
     *
     * @param append the append to write
     * @return the future returned by the store's append call
     */
    private CompletableFuture<Long> storeAppend(Append append) {
        long lastEventNumber;
        synchronized (this.lock) {
            lastEventNumber = this.latestEventNumbers.get(Pair.of(append.getSegment(), append.getWriterId()));
        }
        List<AttributeUpdate> attributes = Arrays.asList(
                new AttributeUpdate(append.getWriterId(), AttributeUpdateType.ReplaceIfEquals, append.getEventNumber(), lastEventNumber),
                new AttributeUpdate(Attributes.EVENT_COUNT, AttributeUpdateType.Accumulate, (long) append.getEventCount()));
        BufferView buf = new ByteBufWrapper(append.getData());
        if (append.isConditional()) {
            return this.store.append(append.getSegment(), append.getExpectedLength().longValue(), buf, attributes, TIMEOUT);
        }
        return this.store.append(append.getSegment(), buf, attributes, TIMEOUT);
    }

    /**
     * Processes the outcome of a store append: replies to the client, updates bookkeeping, adjusts flow
     * control and starts the next write.
     *
     * <p>On success a DataAppended ack is sent and the writer's latest event number is advanced. A
     * {@link BadOffsetException} yields a ConditionalCheckFailed reply. Any other failure is routed through
     * {@code handleException}, after which all queued appends of that writer are dropped and the writer's
     * event number entry for the segment is forgotten.
     *
     * @param append         the append that completed; must be the current outstanding append
     * @param newWriteOffset the value the store future completed with (null on failure)
     * @param exception      the failure, or null on success
     * @param elapsedTimer   timer started when the write was issued
     */
    private void handleAppendResult(Append append, Long newWriteOffset, Throwable exception, Timer elapsedTimer) {
        boolean success = exception == null;
        try {
            boolean conditionalFailed = !success && Exceptions.unwrap(exception) instanceof BadOffsetException;
            Pair<String, UUID> key = Pair.of(append.getSegment(), append.getWriterId());
            long previousEventNumber;
            synchronized (this.lock) {
                previousEventNumber = this.latestEventNumbers.get(key);
                Preconditions.checkState(this.outstandingAppend == append, "Synchronization error in: %s while processing append: %s.", this.getClass().getName(), append);
            }
            if (success) {
                WireCommands.DataAppended dataAppendedAck = new WireCommands.DataAppended(append.getRequestId(), append.getWriterId(), append.getEventNumber(), previousEventNumber, newWriteOffset.longValue());
                log.trace("Sending DataAppended : {}", dataAppendedAck);
                this.connection.send(dataAppendedAck);
            } else if (conditionalFailed) {
                log.debug("Conditional append failed due to incorrect offset: {}, {}", append, exception.getMessage());
                this.connection.send(new WireCommands.ConditionalCheckFailed(append.getWriterId(), append.getEventNumber(), append.getRequestId()));
            } else {
                this.handleException(append.getWriterId(), append.getRequestId(), append.getSegment(), append.getEventNumber(), "appending data", exception);
            }
            synchronized (this.lock) {
                Preconditions.checkState(this.outstandingAppend == append, "Synchronization error in: %s while processing append: %s.", this.getClass().getName(), append);
                this.outstandingAppend = null;
                if (success) {
                    this.latestEventNumbers.put(key, append.getEventNumber());
                } else if (!conditionalFailed) {
                    // NOTE(review): the data buffers of the dropped appends are not released here - confirm
                    // whether ownership lies with this class (performNextWrite releases written buffers).
                    long bytes = this.waitingAppends.removeAll(append.getWriterId()).stream().mapToInt(a -> a.getData().readableBytes()).sum();
                    this.setPendingBytes(this.getPendingBytes() - bytes);
                    this.latestEventNumbers.remove(key);
                }
            }
            this.pauseOrResumeReading();
            this.performNextWrite();
        }
        catch (Throwable e) {
            success = false;
            // Report against the append's request id (the event number used to be passed in its place).
            this.handleException(append.getWriterId(), append.getRequestId(), append.getSegment(), append.getEventNumber(), "handling append result", e);
        }
        if (success) {
            this.statsRecorder.recordAppend(append.getSegment(), append.getDataLength(), append.getEventCount(), elapsedTimer.getElapsed());
        }
    }

    /**
     * Handles a failure that is not tied to a specific event; equivalent to the six-argument overload with
     * an event number of -1.
     */
    private void handleException(UUID writerId, long requestId, String segment, String doingWhat, Throwable u) {
        this.handleException(writerId, requestId, segment, -1L, doingWhat, u);
    }

    /**
     * Translates a failure into the matching wire reply and/or closes the connection.
     *
     * <p>The exception is unwrapped first. Known segment, container, attribute, token and
     * unsupported-operation failures are answered with a dedicated reply (a bad attribute update additionally
     * closes the connection); a {@link CancellationException} and every unrecognised failure close the
     * connection without a reply.
     *
     * @param writerId    the writer the failed operation belongs to
     * @param requestId   the request id echoed in the reply
     * @param segment     the segment the operation targeted
     * @param eventNumber the event number reported in a SegmentIsSealed reply
     * @param doingWhat   short description of the failed operation, used in logs and replies
     * @param u           the failure; must not be null
     * @throws IllegalStateException if {@code u} is null
     */
    private void handleException(UUID writerId, long requestId, String segment, long eventNumber, String doingWhat, Throwable u) {
        if (u == null) {
            IllegalStateException exception = new IllegalStateException("No exception to handle.");
            log.error("Append processor: Error {} on segment = '{}'", doingWhat, segment, exception);
            throw exception;
        }
        u = Exceptions.unwrap(u);
        String clientReplyStackTrace = this.replyWithStackTraceOnError ? Throwables.getStackTraceAsString(u) : EMPTY_STACK_TRACE;
        if (u instanceof StreamSegmentExistsException) {
            log.warn("Segment '{}' already exists and {} cannot perform operation '{}'.", segment, writerId, doingWhat);
            this.connection.send(new WireCommands.SegmentAlreadyExists(requestId, segment, clientReplyStackTrace));
        } else if (u instanceof StreamSegmentNotExistsException) {
            log.warn("Segment '{}' does not exist and {} cannot perform operation '{}'.", segment, writerId, doingWhat);
            this.connection.send(new WireCommands.NoSuchSegment(requestId, segment, clientReplyStackTrace, -1L));
        } else if (u instanceof StreamSegmentSealedException) {
            log.info("Segment '{}' is sealed and {} cannot perform operation '{}'.", segment, writerId, doingWhat);
            this.connection.send(new WireCommands.SegmentIsSealed(requestId, segment, clientReplyStackTrace, eventNumber));
        } else if (u instanceof ContainerNotFoundException) {
            int containerId = ((ContainerNotFoundException) u).getContainerId();
            log.warn("Wrong host. Segment '{}' (Container {}) is not owned and {} cannot perform operation '{}'.", segment, containerId, writerId, doingWhat);
            // The third argument is an empty string. NOTE(review): presumably the "correct host" field - confirm.
            this.connection.send(new WireCommands.WrongHost(requestId, segment, EMPTY_STACK_TRACE, clientReplyStackTrace));
        } else if (u instanceof BadAttributeUpdateException) {
            log.warn("Bad attribute update by {} on segment {}.", writerId, segment, u);
            this.connection.send(new WireCommands.InvalidEventNumber(writerId, requestId, clientReplyStackTrace));
            this.connection.close();
        } else if (u instanceof TokenExpiredException) {
            log.warn("Token expired for writer {} on segment {}.", writerId, segment, u);
            this.connection.send(new WireCommands.AuthTokenCheckFailed(requestId, clientReplyStackTrace, WireCommands.AuthTokenCheckFailed.ErrorCode.TOKEN_EXPIRED));
        } else if (u instanceof TokenException) {
            log.warn("Token check failed for writer {} on segment {}.", writerId, segment, u);
            this.connection.send(new WireCommands.AuthTokenCheckFailed(requestId, clientReplyStackTrace, WireCommands.AuthTokenCheckFailed.ErrorCode.TOKEN_CHECK_FAILED));
        } else if (u instanceof UnsupportedOperationException) {
            log.warn("Unsupported Operation '{}'.", doingWhat, u);
            this.connection.send(new WireCommands.OperationUnsupported(requestId, doingWhat, clientReplyStackTrace));
        } else if (u instanceof CancellationException) {
            log.info("Closing connection '{}' while performing append on Segment '{}' due to {}.", this.connection, segment, u.toString());
            this.connection.close();
        } else {
            this.logError(segment, u);
            this.connection.close();
        }
    }

    /**
     * Logs an unclassified append failure: a warning without stack trace for
     * {@link IllegalContainerStateException}, an error with stack trace for everything else.
     */
    private void logError(String segment, Throwable u) {
        if (u instanceof IllegalContainerStateException) {
            log.warn("Error (Segment = '{}', Operation = 'append'): {}.", segment, u.toString());
        } else {
            log.error("Error (Segment = '{}', Operation = 'append')", segment, u);
        }
    }

    /**
     * Applies flow control based on the current number of pending bytes: pauses reading above the high
     * water mark and resumes it below the low water mark. Between the two marks nothing changes.
     */
    private void pauseOrResumeReading() {
        long bytesWaiting = this.getPendingBytes();
        if (bytesWaiting > HIGH_WATER_MARK) {
            log.debug("Pausing writing from connection {}", this.connection);
            this.connection.pauseReading();
        }
        if (bytesWaiting < LOW_WATER_MARK) {
            log.trace("Resuming writing from connection {}", this.connection);
            this.connection.resumeReading();
        }
    }

    /**
     * @return the number of bytes currently queued and not yet handed to the store
     */
    protected long getPendingBytes() {
        return this.pendingBytes.get();
    }

    /**
     * Sets the pending byte counter; negative values are clamped to zero.
     *
     * @param val2 the new value
     */
    protected void setPendingBytes(long val2) {
        this.pendingBytes.set(Math.max(val2, 0L));
    }

    /**
     * Queues an append for writing, applies flow control and triggers the next write.
     *
     * @param append the append received from the client
     * @throws IllegalStateException if the writer was not set up for the segment on this connection, or if
     *                               the append's event number is lower than the writer's latest event number
     */
    public void append(Append append) {
        log.trace("Processing append received from client {}", append);
        UUID id = append.getWriterId();
        synchronized (this.lock) {
            Long lastEventNumber = this.latestEventNumbers.get(Pair.of(append.getSegment(), id));
            Preconditions.checkState(lastEventNumber != null, "Data from unexpected connection: %s.", id);
            Preconditions.checkState(append.getEventNumber() >= lastEventNumber, "Event was already appended.");
            this.waitingAppends.put(id, append);
            // Updated under the lock, like every other read-modify-write of the counter, so that a concurrent
            // decrement in getNextAppend/handleAppendResult cannot be lost.
            this.setPendingBytes(this.getPendingBytes() + (long) append.getData().readableBytes());
        }
        this.pauseOrResumeReading();
        this.performNextWrite();
    }

    /**
     * @return the processor to which all requests not handled by this class are delegated
     */
    @SuppressFBWarnings(justification="generated code")
    public RequestProcessor getNextRequestProcessor() {
        return this.nextRequestProcessor;
    }
}

