/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.connection.impl.ClientConnection;
import io.pravega.client.connection.impl.ConnectionFactory;
import io.pravega.client.connection.impl.Flow;
import io.pravega.client.connection.impl.FlowClientConnection;
import io.pravega.common.Exceptions;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.FailingReplyProcessor;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowHandler
extends FailingReplyProcessor
implements AutoCloseable {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlowHandler.class);
    private static final int FLOW_DISABLED = 0;
    private final PravegaNodeUri location;
    private ClientConnection channel;
    private final MetricNotifier metricNotifier;
    @VisibleForTesting
    private final KeepAliveTask keepAliveTask = new KeepAliveTask();
    private ScheduledFuture<?> keepAliveFuture;
    private final AtomicBoolean recentMessage = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    @VisibleForTesting
    private final ConcurrentHashMap<Integer, ReplyProcessor> flowIdReplyProcessorMap = new ConcurrentHashMap();
    private final AtomicBoolean disableFlow = new AtomicBoolean(false);

    private FlowHandler(PravegaNodeUri location, MetricNotifier updateMetric) {
        this.location = location;
        this.metricNotifier = updateMetric;
    }

    static CompletableFuture<FlowHandler> openConnection(PravegaNodeUri location, MetricNotifier updateMetric, ConnectionFactory connectionFactory) {
        FlowHandler flowHandler = new FlowHandler(location, updateMetric);
        return connectionFactory.establishConnection(location, flowHandler).thenApply(connection -> {
            flowHandler.channel = connection;
            flowHandler.keepAliveFuture = connectionFactory.getInternalExecutor().scheduleAtFixedRate(flowHandler.keepAliveTask, 20L, 10L, TimeUnit.SECONDS);
            try {
                connection.send(new WireCommands.Hello(11, 5));
            }
            catch (ConnectionFailedException e) {
                throw Exceptions.sneakyThrow(e);
            }
            return flowHandler;
        });
    }

    public ClientConnection createFlow(Flow flow, ReplyProcessor rp) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.get(), "Ensure flows are enabled.");
        int flowID = flow.getFlowId();
        log.info("Creating Flow {} for endpoint {}. ", (Object)flow.getFlowId(), (Object)this.location);
        if (this.flowIdReplyProcessorMap.put(flowID, rp) != null) {
            throw new IllegalArgumentException("Multiple flows cannot be created with the same Flow id " + flowID);
        }
        return new FlowClientConnection(this.location.toString(), this.channel, flowID, this);
    }

    public ClientConnection createConnectionWithFlowDisabled(ReplyProcessor rp) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.getAndSet(true), "Flows are disabled, incorrect usage pattern.");
        log.info("Creating a new connection with flow disabled for endpoint {}.", (Object)this.location);
        this.flowIdReplyProcessorMap.put(0, rp);
        return new FlowClientConnection(this.location.toString(), this.channel, 0, this);
    }

    void closeFlow(FlowClientConnection clientConnection) {
        int flow = clientConnection.getFlowId();
        log.info("Closing Flow {} for endpoint {}", (Object)flow, (Object)clientConnection.getConnectionName());
        this.flowIdReplyProcessorMap.remove(flow);
        if (flow == 0) {
            this.close();
        }
    }

    public int getOpenFlowCount() {
        return this.flowIdReplyProcessorMap.size();
    }

    void setRecentMessage() {
        this.recentMessage.set(true);
    }

    @Override
    public void process(Reply cmd) {
        if (log.isDebugEnabled()) {
            log.debug("{} processing reply {} with flow {}", new Object[]{this.location, cmd, Flow.from(cmd.getRequestId())});
        }
        this.setRecentMessage();
        if (cmd instanceof WireCommands.Hello) {
            this.flowIdReplyProcessorMap.forEach((flowId, rp) -> {
                try {
                    rp.hello((WireCommands.Hello)cmd);
                }
                catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor.hello for flow id {}", flowId, (Object)e);
                }
            });
            return;
        }
        if (cmd instanceof WireCommands.KeepAlive) {
            return;
        }
        ReplyProcessor processor = this.getReplyProcessor(cmd);
        if (processor != null) {
            try {
                processor.process(cmd);
            }
            catch (Exception e) {
                log.warn("ReplyProcessor.process failed for reply {} due to {}", (Object)cmd, (Object)e.getMessage());
                processor.processingFailure(e);
            }
        } else if (cmd instanceof WireCommands.ReleasableCommand) {
            ((WireCommands.ReleasableCommand)((Object)cmd)).release();
        }
    }

    @Override
    public void errorMessage(WireCommands.ErrorMessage errorMessage) {
        log.info("Received an errorMessage containing an unhandled {} on segment {}", (Object)errorMessage.getErrorCode().getExceptionType().getSimpleName(), (Object)errorMessage.getSegment());
        this.processingFailure(errorMessage.getThrowableException());
    }

    @Override
    public void processingFailure(Exception error) {
        this.invokeProcessingFailureForAllFlows(error);
    }

    private void invokeProcessingFailureForAllFlows(Throwable cause) {
        this.flowIdReplyProcessorMap.forEach((flowId, rp) -> {
            try {
                log.debug("Exception observed for flow id {} due to {}", flowId, (Object)cause.getMessage());
                rp.processingFailure(new ConnectionFailedException(cause));
            }
            catch (Exception e) {
                log.warn("Encountered exception invoking ReplyProcessor.processingFailure for flow id {}", flowId, (Object)e);
            }
        });
    }

    @Override
    public void connectionDropped() {
        this.close();
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.keepAliveFuture != null) {
                this.keepAliveFuture.cancel(false);
            }
            log.info("Connection closed observed with endpoint {}", (Object)this.location);
            this.flowIdReplyProcessorMap.forEach((flowId, rp) -> {
                try {
                    log.debug("Connection dropped for flow id {}", flowId);
                    rp.connectionDropped();
                }
                catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor for flow id {}", flowId, (Object)e);
                }
            });
            this.channel.close();
        }
    }

    public final boolean isClosed() {
        return this.closed.get();
    }

    private ReplyProcessor getReplyProcessor(Reply cmd) {
        int flowId = this.disableFlow.get() ? 0 : Flow.toFlowID(cmd.getRequestId());
        ReplyProcessor processor = this.flowIdReplyProcessorMap.get(flowId);
        if (processor == null) {
            log.warn("No ReplyProcessor found for the provided flowId {}. Ignoring response", (Object)flowId);
            if (cmd instanceof WireCommands.ReleasableCommand) {
                ((WireCommands.ReleasableCommand)((Object)cmd)).release();
            }
        }
        return processor;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public MetricNotifier getMetricNotifier() {
        return this.metricNotifier;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    KeepAliveTask getKeepAliveTask() {
        return this.keepAliveTask;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    ConcurrentHashMap<Integer, ReplyProcessor> getFlowIdReplyProcessorMap() {
        return this.flowIdReplyProcessorMap;
    }

    @VisibleForTesting
    final class KeepAliveTask
    implements Runnable {
        private final AtomicInteger concurrentlyRunning = new AtomicInteger(0);

        KeepAliveTask() {
        }

        @Override
        public void run() {
            try {
                if (!FlowHandler.this.recentMessage.getAndSet(false) && !FlowHandler.this.closed.get()) {
                    int running = this.concurrentlyRunning.getAndIncrement();
                    if (running > 0) {
                        this.handleError(new TimeoutException("KeepAliveTask: Connection write was blocked for too long."));
                    }
                    FlowHandler.this.channel.send(new WireCommands.KeepAlive());
                    this.concurrentlyRunning.decrementAndGet();
                }
            }
            catch (Exception e) {
                this.handleError(e);
            }
        }

        @VisibleForTesting
        void handleError(Exception e) {
            log.warn("Failed to send KeepAlive to {}. Closing this connection.", (Object)FlowHandler.this.location, (Object)e);
            FlowHandler.this.close();
        }
    }
}

