/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.CleanableResponseHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

public abstract class DisruptableMockTransport
extends MockTransport {
    private static final Logger logger = LogManager.getLogger(DisruptableMockTransport.class);

    /** The node this transport sends from; exposed through {@link #getLocalNode()}. */
    private final DiscoveryNode localNode;
    /** Queue onto which {@link #deliverBlackholedRequests()} schedules the deferred work. */
    private final DeterministicTaskQueue deterministicTaskQueue;
    /** Deferred handling for dropped messages, held until {@link #deliverBlackholedRequests()} runs. */
    private final List<Runnable> blackholedRequests = new ArrayList<>();
    /** Action names for which connections opened here answer with an error instead of delivering. */
    private final Set<String> blockedActions = new HashSet<>();

    /**
     * Creates a mock transport that sends on behalf of {@code localNode}.
     *
     * @param localNode              the node returned by {@link #getLocalNode()}
     * @param deterministicTaskQueue the queue used by {@link #deliverBlackholedRequests()} to schedule
     *                               the handling of previously dropped messages
     */
    public DisruptableMockTransport(DiscoveryNode localNode, DeterministicTaskQueue deterministicTaskQueue) {
        this.deterministicTaskQueue = deterministicTaskQueue;
        this.localNode = localNode;
    }

    /**
     * Reports the simulated state of the link between this transport and the given node.
     * <p>
     * Within this class it is consulted with the destination's local node before a connection is opened
     * and before a request is delivered, and it is consulted on the destination transport (with the
     * sender's local node) before a response is delivered.
     *
     * @param var1 the node at the other end of the link
     * @return the current status of that link
     */
    protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode var1);

    /**
     * Looks up the transport registered for the given address.
     * <p>
     * {@code openConnection} fails with a {@code ConnectTransportException} ("does not exist") when the
     * returned optional is empty.
     *
     * @param var1 the address of the node being connected to
     * @return the matching transport, or an empty optional if there is none
     */
    protected abstract Optional<DisruptableMockTransport> getDisruptableMockTransport(TransportAddress var1);

    /**
     * Runs (or schedules) a task on behalf of this transport's node.
     * <p>
     * Every task handed to this method from within this class is a {@link RebootSensitiveRunnable}.
     * NOTE(review): presumably implementations call {@code ifRebooted()} instead of {@code run()} when the
     * node restarted before the task got to execute - confirm against the subclasses.
     *
     * @param var1 the task to run
     */
    protected abstract void execute(Runnable var1);

    /**
     * @return the node this transport was constructed for
     */
    public DiscoveryNode getLocalNode() {
        return localNode;
    }

    /**
     * Builds a {@link TransportService} that uses this mock as its transport, a fresh {@link TaskManager}
     * created from the given settings, thread pool and task headers, and the no-op tracer.
     */
    @Override
    public TransportService createTransportService(
        Settings settings,
        ThreadPool threadPool,
        TransportInterceptor interceptor,
        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
        @Nullable ClusterSettings clusterSettings,
        Set<String> taskHeaders
    ) {
        final Transport transport = this;
        final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders);
        return new TransportService(
            settings,
            transport,
            threadPool,
            interceptor,
            localNodeFactory,
            clusterSettings,
            taskManager,
            Tracer.NOOP
        );
    }

    /**
     * Opens a simulated connection to {@code node}.
     * <p>
     * The listener fails with a {@link ConnectTransportException} when no transport is registered for the
     * node's address, or when the link to that transport's local node is anything other than
     * {@link ConnectionStatus#CONNECTED}. Otherwise it receives a connection whose {@code sendRequest}
     * either answers with a "blocked" error (for actions added through {@link #addActionBlock(String)})
     * or forwards the request to {@link #onSendRequest}.
     *
     * @param node     the node to connect to
     * @param profile  not used by this implementation
     * @param listener notified with the new connection or with the connection failure
     */
    @Override
    public void openConnection(final DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
        final Optional<DisruptableMockTransport> optionalMatchingTransport = this.getDisruptableMockTransport(node.getAddress());
        if (!optionalMatchingTransport.isPresent()) {
            listener.onFailure(new ConnectTransportException(node, "node " + node + " does not exist"));
            return;
        }

        final DisruptableMockTransport matchingTransport = optionalMatchingTransport.get();
        final ConnectionStatus connectionStatus = this.getConnectionStatus(matchingTransport.getLocalNode());
        if (connectionStatus != ConnectionStatus.CONNECTED) {
            listener.onFailure(
                new ConnectTransportException(node, "node [" + node + "] is [" + connectionStatus + "] not [CONNECTED]")
            );
            return;
        }

        listener.onResponse(new CloseableConnection() {

            public DiscoveryNode getNode() {
                return node;
            }

            public TransportVersion getTransportVersion() {
                return TransportVersion.current();
            }

            public void sendRequest(final long requestId, final String action, TransportRequest request, TransportRequestOptions options)
                throws TransportException {
                if (!DisruptableMockTransport.this.blockedActions.contains(action)) {
                    DisruptableMockTransport.this.onSendRequest(requestId, action, request, options, matchingTransport);
                    return;
                }

                // The action is blocked: never reach the destination, just report a remote failure back.
                DisruptableMockTransport.this.execute(new RebootSensitiveRunnable() {

                    @Override
                    public void ifRebooted() {
                        DisruptableMockTransport.this.cleanupResponseHandler(requestId);
                    }

                    @Override
                    public void run() {
                        DisruptableMockTransport.this.handleError(
                            requestId,
                            new RemoteTransportException(
                                node.getName(),
                                node.getAddress(),
                                action,
                                new ElasticsearchException("action [" + action + "] is blocked")
                            )
                        );
                    }

                    @Override
                    public String toString() {
                        return "error response delivery for blocked action [" + action + "] on node [" + node + "]";
                    }
                });
            }
        });
    }

    /**
     * Hands a request over to the destination transport's executor.
     * <p>
     * When the task runs, the link status to the destination decides what happens: the request is
     * dropped into the blackhole list, answered with a disconnect error, or delivered to the
     * destination's request handler. If the destination rebooted before the task ran, the sender is
     * instead answered with a {@link NodeNotConnectedException}.
     * <p>
     * The request is ref-counted up front and released exactly once, on whichever of the two paths
     * ({@code run} or {@code ifRebooted}) ends up being taken.
     *
     * @param requestId            id used to find the sender-side response handler
     * @param action               name of the transport action
     * @param request              the request to deliver
     * @param options              not used by this implementation
     * @param destinationTransport the transport of the receiving node; must not be this node's own
     */
    protected void onSendRequest(
        final long requestId,
        final String action,
        final TransportRequest request,
        TransportRequestOptions options,
        final DisruptableMockTransport destinationTransport
    ) {
        assert !destinationTransport.getLocalNode().equals(getLocalNode())
            : "non-local message from " + getLocalNode() + " to itself";

        request.mustIncRef();

        destinationTransport.execute(new RebootSensitiveRunnable() {

            @Override
            public void run() {
                try {
                    final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode());
                    // Switch on the constants themselves so that reordering the enum cannot change behaviour.
                    switch (connectionStatus) {
                        case BLACK_HOLE:
                        case BLACK_HOLE_REQUESTS_ONLY:
                            onBlackholedDuringSend(requestId, action, destinationTransport);
                            break;
                        case DISCONNECTED:
                            onDisconnectedDuringSend(requestId, action, destinationTransport);
                            break;
                        case CONNECTED:
                            onConnectedDuringSend(requestId, action, request, destinationTransport);
                            break;
                        default:
                            throw new AssertionError("unexpected status: " + connectionStatus);
                    }
                } finally {
                    request.decRef();
                }
            }

            @Override
            public void ifRebooted() {
                request.decRef();
                // Reply on the sender's side; that reply is itself sensitive to a reboot of the sender.
                execute(new RebootSensitiveRunnable() {

                    @Override
                    public void ifRebooted() {
                        cleanupResponseHandler(requestId);
                    }

                    @Override
                    public void run() {
                        handleRemoteError(
                            requestId,
                            new NodeNotConnectedException(destinationTransport.getLocalNode(), "node rebooted")
                        );
                    }

                    @Override
                    public String toString() {
                        // Unqualified on purpose: the helper lives on the enclosing runnable, not on this one.
                        return "error response (reboot) to " + internalToString();
                    }
                });
            }

            @Override
            public String toString() {
                return internalToString();
            }

            private String internalToString() {
                return getRequestDescription(requestId, action, destinationTransport.getLocalNode());
            }
        });
    }

    /**
     * Builds the task that tells the sender its request hit a disconnected link.
     * <p>
     * Running the task fails the request with a "disconnected" {@link ConnectTransportException}; if the
     * sender rebooted in the meantime only the response handler is cleaned up.
     *
     * @param requestId   id of the request being answered
     * @param action      name of the transport action, used for the description only
     * @param destination the node the request was addressed to
     * @return the task, to be passed to {@link #execute(Runnable)}
     */
    private Runnable getDisconnectException(final long requestId, final String action, final DiscoveryNode destination) {
        final RebootSensitiveRunnable disconnectResponse = new RebootSensitiveRunnable() {

            @Override
            public void run() {
                final TransportException disconnected = new ConnectTransportException(destination, "disconnected");
                handleError(requestId, disconnected);
            }

            @Override
            public void ifRebooted() {
                cleanupResponseHandler(requestId);
            }

            @Override
            public String toString() {
                return "disconnection response to " + getRequestDescription(requestId, action, destination);
            }
        };
        return disconnectResponse;
    }

    /**
     * Formats a request as {@code [requestId][action] from <local node> to <destination>} for use in log
     * messages and task descriptions.
     */
    private String getRequestDescription(long requestId, String action, DiscoveryNode destination) {
        final DiscoveryNode source = getLocalNode();
        return Strings.format("[%s][%s] from %s to %s", requestId, action, source, destination);
    }

    /**
     * Records a dropped message: nothing is delivered now, but a task is remembered that will report a
     * disconnect to the sender once {@link #deliverBlackholedRequests()} is called.
     *
     * @param requestId            id of the affected request
     * @param action               name of the transport action
     * @param destinationTransport the transport the message was travelling to or from
     */
    private void onBlackholedDuringSend(final long requestId, final String action, final DisruptableMockTransport destinationTransport) {
        final String dropped = getRequestDescription(requestId, action, destinationTransport.getLocalNode());
        logger.trace("dropping {}", dropped);

        final Runnable deferredDisconnect = new Runnable() {

            @Override
            public void run() {
                onDisconnectedDuringSend(requestId, action, destinationTransport);
            }

            @Override
            public String toString() {
                return "deferred handling of dropped " + getRequestDescription(requestId, action, destinationTransport.getLocalNode());
            }
        };
        blackholedRequests.add(deferredDisconnect);
    }

    /**
     * Schedules, on this transport's executor, the "disconnected" failure for the given request.
     */
    private void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
        final DiscoveryNode destination = destinationTransport.getLocalNode();
        final Runnable disconnectResponse = getDisconnectException(requestId, action, destination);
        execute(disconnectResponse);
    }

    /**
     * Delivers a request to the destination's registered handler for {@code action}.
     * <p>
     * The request is round-tripped through serialization before the handler sees it, and the handler is
     * given a channel whose responses travel back through this (the sending) transport's executor. When a
     * response is about to be delivered the return link is checked: with {@code CONNECTED} or
     * {@code BLACK_HOLE_REQUESTS_ONLY} the response (or remote error) reaches the sender's handler, with
     * {@code BLACK_HOLE} or {@code DISCONNECTED} it is parked with the blackholed messages instead.
     *
     * @param requestId            id used to find the sender-side response handler
     * @param action               name of the transport action
     * @param request              the original request; a serialized copy is what gets processed
     * @param destinationTransport the transport of the receiving node
     */
    private void onConnectedDuringSend(
        final long requestId,
        final String action,
        TransportRequest request,
        final DisruptableMockTransport destinationTransport
    ) {
        final RequestHandlerRegistry<TransportRequest> requestHandler = destinationTransport.getRequestHandlers().getHandler(action);
        final DiscoveryNode destination = destinationTransport.getLocalNode();
        final String requestDescription = this.getRequestDescription(requestId, action, destination);

        final TransportChannel transportChannel = new TransportChannel() {

            public String getProfileName() {
                return "default";
            }

            public void sendResponse(final TransportResponse response) {
                // Keep the response alive until the delivery task has either run or been discarded.
                response.mustIncRef();
                final Releasable releasable = Releasables.assertOnce(response::decRef);
                DisruptableMockTransport.this.execute(new RebootSensitiveRunnable() {

                    @Override
                    public void ifRebooted() {
                        try (Releasable ignored = releasable) {
                            DisruptableMockTransport.this.cleanupResponseHandler(requestId);
                        }
                    }

                    @Override
                    public void run() {
                        try (Releasable ignored = releasable) {
                            final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(
                                DisruptableMockTransport.this.getLocalNode()
                            );
                            // Switch on the constants themselves so that reordering the enum cannot change behaviour.
                            switch (connectionStatus) {
                                case CONNECTED:
                                case BLACK_HOLE_REQUESTS_ONLY:
                                    DisruptableMockTransport.this.handleResponse(requestId, response);
                                    break;
                                case BLACK_HOLE:
                                case DISCONNECTED:
                                    logger.trace("delaying response to {}: channel is {}", requestDescription, connectionStatus);
                                    DisruptableMockTransport.this.onBlackholedDuringSend(requestId, action, destinationTransport);
                                    break;
                                default:
                                    throw new AssertionError("unexpected status: " + connectionStatus);
                            }
                        }
                    }

                    @Override
                    public String toString() {
                        return "response to " + requestDescription;
                    }
                });
            }

            public void sendResponse(final Exception exception) {
                DisruptableMockTransport.this.execute(new RebootSensitiveRunnable() {

                    @Override
                    public void ifRebooted() {
                        DisruptableMockTransport.this.cleanupResponseHandler(requestId);
                    }

                    @Override
                    public void run() {
                        final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(
                            DisruptableMockTransport.this.getLocalNode()
                        );
                        switch (connectionStatus) {
                            case CONNECTED:
                            case BLACK_HOLE_REQUESTS_ONLY:
                                DisruptableMockTransport.this.handleRemoteError(requestId, exception);
                                break;
                            case BLACK_HOLE:
                            case DISCONNECTED:
                                logger.trace("delaying exception response to {}: channel is {}", requestDescription, connectionStatus);
                                DisruptableMockTransport.this.onBlackholedDuringSend(requestId, action, destinationTransport);
                                break;
                            default:
                                throw new AssertionError("unexpected status: " + connectionStatus);
                        }
                    }

                    @Override
                    public String toString() {
                        return "error response to " + requestDescription;
                    }
                });
            }
        };

        final TransportRequest copiedRequest;
        try {
            copiedRequest = ESTestCase.copyWriteable(request, this.writeableRegistry(), requestHandler::newRequest);
        } catch (IOException e) {
            throw new AssertionError("exception de/serializing request", e);
        }

        try {
            requestHandler.processMessageReceived(copiedRequest, transportChannel);
        } catch (Exception e) {
            try {
                transportChannel.sendResponse(e);
            } catch (Exception ee) {
                // Keep the secondary failure visible instead of dropping it; self-suppression is not allowed.
                if (ee != e) {
                    e.addSuppressed(ee);
                }
                logger.warn("failed to send failure", e);
            }
        } finally {
            copiedRequest.decRef();
        }
    }

    /**
     * Runs the cleanup of the response handler registered for {@code requestId}, if it has one.
     * <p>
     * Any {@code ContextRestoreResponseHandler} wrappers are peeled off first; cleanup only happens when
     * the innermost handler is a {@link CleanableResponseHandler}.
     */
    private void cleanupResponseHandler(long requestId) {
        TransportResponseHandler current = getTransportResponseHandler(requestId);
        while (current instanceof TransportService.ContextRestoreResponseHandler) {
            current = ((TransportService.ContextRestoreResponseHandler) current).unwrap();
        }
        if (!(current instanceof CleanableResponseHandler)) {
            return;
        }
        ((CleanableResponseHandler) current).runCleanup();
    }

    /**
     * Schedules every parked (blackholed) task on the deterministic task queue and forgets them.
     *
     * @return {@code true} if at least one task was scheduled, {@code false} if nothing was parked
     */
    public boolean deliverBlackholedRequests() {
        final boolean anyPending = !blackholedRequests.isEmpty();
        if (anyPending) {
            for (final Runnable pending : blackholedRequests) {
                deterministicTaskQueue.scheduleNow(pending);
            }
            blackholedRequests.clear();
        }
        return anyPending;
    }

    /**
     * Blocks an action: requests for it sent over connections opened by this transport are answered
     * with an error instead of being delivered.
     *
     * @param action name of the transport action to block
     */
    public void addActionBlock(String action) {
        blockedActions.add(action);
    }

    /**
     * Removes every block added through {@link #addActionBlock(String)}.
     */
    public void clearActionBlocks() {
        blockedActions.clear();
    }

    /**
     * Simulated state of the link between two nodes, as acted upon by the enclosing transport.
     * The declaration order of the constants is kept as-is.
     */
    public enum ConnectionStatus {
        /** Connections can be opened; requests and responses are delivered. */
        CONNECTED,
        /** Requests are answered with a "disconnected" failure; responses are parked as blackholed. */
        DISCONNECTED,
        /** Requests and responses are both parked as blackholed. */
        BLACK_HOLE,
        /** Requests are parked as blackholed, but responses are still delivered. */
        BLACK_HOLE_REQUESTS_ONLY
    }

    /**
     * A task with an alternative action for the case that the node it belongs to has rebooted.
     * <p>
     * Implementations in the enclosing class use {@link #ifRebooted()} to release resources or clean up
     * response handlers that {@link #run()} would otherwise have dealt with.
     * NOTE(review): presumably exactly one of the two methods is invoked per task - confirm against the
     * executor implementations.
     */
    public interface RebootSensitiveRunnable extends Runnable {
        /** Invoked in place of the normal work when the owning node has rebooted. */
        void ifRebooted();
    }
}

