/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.testserver.websocket;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.BooleanResult;
import io.fluxcapacitor.common.api.ClientEvent;
import io.fluxcapacitor.common.api.Command;
import io.fluxcapacitor.common.api.ConnectEvent;
import io.fluxcapacitor.common.api.DisconnectEvent;
import io.fluxcapacitor.common.api.JsonType;
import io.fluxcapacitor.common.api.QueryResult;
import io.fluxcapacitor.common.api.Request;
import io.fluxcapacitor.common.api.RequestBatch;
import io.fluxcapacitor.common.api.ResultBatch;
import io.fluxcapacitor.common.api.StringResult;
import io.fluxcapacitor.common.api.VoidResult;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerInspector;
import io.fluxcapacitor.common.handling.ParameterResolver;
import io.fluxcapacitor.common.serialization.NullCollectionsAsEmptyModule;
import io.fluxcapacitor.common.serialization.compression.CompressionAlgorithm;
import io.fluxcapacitor.common.serialization.compression.CompressionUtils;
import io.fluxcapacitor.testserver.metrics.MetricsLog;
import io.fluxcapacitor.testserver.metrics.NoOpMetricsLog;
import io.fluxcapacitor.testserver.websocket.Handle;
import io.undertow.util.SameThreadExecutor;
import jakarta.annotation.Nullable;
import jakarta.websocket.CloseReason;
import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class WebsocketEndpoint
extends Endpoint {
    /** Logger shared by all endpoint instances. */
    private static final Logger log = LoggerFactory.getLogger(WebsocketEndpoint.class);

    /**
     * JSON mapper used for every endpoint instance. It auto-registers discoverable Jackson modules,
     * writes dates in non-timestamp form, tolerates unknown properties and empty beans, registers
     * {@link NullCollectionsAsEmptyModule}, and accepts a single value where an array is expected.
     */
    private static final ObjectMapper defaultObjectMapper = JsonMapper.builder()
            .findAndAddModules()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
            .addModule(new NullCollectionsAsEmptyModule())
            .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
            .build();

    /** Sink for connect/disconnect events; starts out as a no-op implementation. */
    MetricsLog metricsLog = new NoOpMetricsLog();

    /** Mapper used to (de)serialize websocket payloads. */
    private final ObjectMapper objectMapper;

    /** Executor on which incoming messages and batch items are processed. */
    private final Executor requestExecutor;

    /** Result backlogs of the currently registered sessions, keyed by session id. */
    private final Map<String, SessionBacklog> sessionBacklogs = new ConcurrentHashMap<>();

    /** Flipped to {@code true} as soon as shutdown starts; from then on new sessions are refused. */
    protected final AtomicBoolean shuttingDown = new AtomicBoolean();

    /** Becomes {@code true} once the shutdown grace period is over and sessions get closed. */
    protected volatile boolean shutDown;
    /**
     * Dispatcher that routes a {@link ClientMessage} to the matching {@link Handle}-annotated method
     * of this endpoint. Two parameter resolvers are registered:
     * <ol>
     *   <li>the first declared parameter of a handler method receives the message payload; this
     *       resolver also reports that it determines specificity;</li>
     *   <li>any parameter of type {@link Session} receives the websocket session.</li>
     * </ol>
     * A resolver returns {@code null} for parameters it does not supply.
     */
    private final Handler<ClientMessage> handler = HandlerInspector.createHandler((Object)this, Handle.class, Arrays.asList(new ParameterResolver<ClientMessage>() {
        // NOTE: an anonymous class that implements an interface cannot take constructor arguments;
        // the enclosing instance is captured implicitly, so nothing is passed here.

        @Override
        public Function<ClientMessage, Object> resolve(Parameter p, Annotation a) {
            // Only the first parameter of the declaring method is bound to the payload.
            if (Objects.equals(p.getDeclaringExecutable().getParameters()[0], p)) {
                return ClientMessage::getPayload;
            }
            return null;
        }

        @Override
        public boolean determinesSpecificity() {
            return true;
        }
    }, (p, a) -> {
        // Inject the websocket session wherever a handler method asks for one.
        if (p.getType().equals(Session.class)) {
            return ClientMessage::getSession;
        }
        return null;
    }));

    protected WebsocketEndpoint() {
        this.objectMapper = defaultObjectMapper;
        this.requestExecutor = Executors.newFixedThreadPool(64, ObjectUtils.newThreadFactory(this.getClass().getSimpleName()));
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown, ObjectUtils.newThreadName(this.getClass().getSimpleName() + "-shutdown")));
    }

    protected WebsocketEndpoint(@Nullable Executor requestExecutor) {
        this.objectMapper = defaultObjectMapper;
        this.requestExecutor = Optional.ofNullable(requestExecutor).orElse(SameThreadExecutor.INSTANCE);
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown, ObjectUtils.newThreadName(this.getClass().getSimpleName() + "-shutdown")));
    }

    /**
     * Registers a newly opened session: creates its result backlog, installs a binary message
     * handler that hands each incoming message to the request executor, and records a
     * {@link ConnectEvent}.
     *
     * @param session the session that was opened
     * @param config  the endpoint configuration (not used here)
     * @throws IllegalStateException if the endpoint has started shutting down
     */
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        if (shuttingDown.get()) {
            throw new IllegalStateException("Cannot accept client. Endpoint is shutting down");
        }
        String sessionId = session.getId();
        Backlog<QueryResult> backlog =
                Backlog.forConsumer(results -> sendResultBatch(session, (List<QueryResult>) results));
        sessionBacklogs.put(sessionId, new SessionBacklog(backlog, session));
        session.addMessageHandler(byte[].class,
                                  bytes -> requestExecutor.execute(() -> processIncoming(session, bytes)));
        registerMetrics(new ConnectEvent(getClientName(session), getClientId(session), session.getId(), toString()));
    }

    /**
     * Deserializes and handles one raw message. Any failure, including the rejection raised once
     * shutdown has completed, is logged and not propagated. During the shutdown grace period the
     * request is dropped with an info log.
     */
    private void processIncoming(Session session, byte[] bytes) {
        try {
            JsonType request = deserializeRequest(session, bytes);
            if (shutDown) {
                throw new IllegalStateException(String.format("Rejecting request %s from client %s with id %s because the service is shutting down", request, getClientName(session), getClientId(session)));
            }
            if (shuttingDown.get()) {
                log.info("Silently ignoring request {} from client {} with id {} because the service is shutting down", request, getClientName(session), getClientId(session));
                return;
            }
            handleMessage(session, request);
        } catch (Throwable e) {
            log.error("Failed to handle request", e);
        }
    }

    /**
     * Decompresses the raw message using the session's compression algorithm and parses it into a
     * {@link JsonType}.
     *
     * @param session the session the message arrived on; determines the compression algorithm
     * @param bytes   the raw (possibly compressed) message bytes
     * @return the deserialized request
     * @throws UncheckedIOException if the bytes cannot be read or parsed; the original
     *                              {@link IOException} is kept as the cause
     */
    protected JsonType deserializeRequest(Session session, byte[] bytes) {
        try {
            return objectMapper.readValue(
                    CompressionUtils.decompress(bytes, getCompressionAlgorithm(session)), JsonType.class);
        } catch (IOException e) {
            // readValue declares a checked IOException; surface it unchecked so the signature stays
            // free of a throws clause for existing callers and overriders.
            throw new UncheckedIOException("Failed to deserialize websocket request", e);
        }
    }

    /**
     * Handles one deserialized message. A {@link RequestBatch} is split into per-request tasks that
     * are submitted to the request executor; any other message is dispatched to the matching
     * handler method and its result, if any, is sent back. Failures of a single message are logged
     * and not propagated.
     *
     * @param session the session the message arrived on
     * @param message the message to handle
     */
    protected void handleMessage(Session session, JsonType message) {
        if (message instanceof RequestBatch) {
            createTasks((RequestBatch<?>) message, session).forEach(requestExecutor::execute);
            return;
        }
        try {
            ClientMessage clientMessage = new ClientMessage(message, session);
            Object outcome = handler.getInvoker(clientMessage).orElseThrow().invoke();
            trySendResult(session, message, outcome);
        } catch (Throwable e) {
            log.error("Could not handle request {}", message, e);
        }
    }

    /**
     * Sends the outcome of a handled message back to the client where applicable.
     * <p>
     * Nothing is sent unless the message is a {@link Request}; for a {@link Command} the guarantee
     * must additionally be at least {@link Guarantee#STORED}. Supported outcomes are a
     * {@link QueryResult} (sent as is), {@code null} (a {@link VoidResult}, commands only),
     * {@link Boolean}, {@link String}, and a {@link CompletableFuture} whose eventual value is run
     * through this method again. Other outcome types are logged and dropped.
     */
    private void trySendResult(Session session, JsonType message, Object result) {
        if (!(message instanceof Request)) {
            return;
        }
        Request request = (Request) message;
        boolean isCommand = request instanceof Command;
        if (isCommand && ((Command) request).getGuarantee().compareTo(Guarantee.STORED) < 0) {
            // The client did not ask to be told about the outcome of this command.
            return;
        }
        if (result == null) {
            if (isCommand) {
                doSendResult(session, new VoidResult(request.getRequestId()));
            }
            return;
        }
        if (result instanceof QueryResult) {
            doSendResult(session, (QueryResult) result);
            return;
        }
        if (result instanceof Boolean) {
            doSendResult(session, new BooleanResult(request.getRequestId(), (Boolean) result));
            return;
        }
        if (result instanceof String) {
            doSendResult(session, new StringResult(request.getRequestId(), (String) result));
            return;
        }
        if (result instanceof CompletableFuture) {
            ((CompletableFuture<?>) result).whenComplete((value, error) -> {
                if (error == null) {
                    trySendResult(session, message, value);
                } else {
                    log.error("Request {} failed. Not sending back result to client.", message, error);
                }
            });
            return;
        }
        log.warn("Not able to send back result of type {} to client. Contents: {}. Request: {}", result.getClass(), result, request);
    }

    /**
     * Queues a result on the backlog of the given session, or, when that session has no backlog,
     * on the backlog of another session of the same client. If neither exists the result is
     * dropped with an info log.
     *
     * @param session the session the result is intended for
     * @param result  the result to send
     */
    protected void doSendResult(Session session, QueryResult result) {
        Optional<SessionBacklog> target = Optional.ofNullable(sessionBacklogs.get(session.getId()));
        if (!target.isPresent()) {
            target = findAlternativeBacklog(session);
        }
        if (target.isPresent()) {
            target.get().add(result);
        } else {
            log.info("Not sending result {}. Could not find any suitable sessions for client {}.", result, getClientId(session));
        }
    }

    /**
     * Turns every request of a batch into a task that handles that request for the given session.
     *
     * @param batch   the batch to split
     * @param session the session the batch arrived on
     * @return a stream with one task per request, in batch order
     */
    protected Stream<Runnable> createTasks(RequestBatch<?> batch, Session session) {
        return batch.getRequests().stream().map(request -> {
            Runnable task = () -> handleMessage(session, (JsonType) request);
            return task;
        });
    }

    /**
     * Writes a batch of results to the session. A single result is sent on its own; otherwise the
     * results are wrapped in a {@link ResultBatch}. The payload is serialized to JSON and
     * compressed with the session's compression algorithm.
     * <p>
     * If the session is no longer open, the results are handed to the backlog of another session
     * of the same client, or dropped with an info log when there is none. Failures while writing
     * are logged and swallowed; any other failure is logged and rethrown.
     *
     * @param session the session to write to
     * @param results the results to send
     */
    protected void sendResultBatch(Session session, List<QueryResult> results) {
        try {
            JsonType payload = results.size() == 1 ? (JsonType) results.get(0) : new ResultBatch(results);
            if (!session.isOpen()) {
                findAlternativeBacklog(session).ifPresentOrElse(
                        alternative -> alternative.add(results),
                        () -> log.info("Not sending batch of {}. Could not find any suitable sessions for client {}.", results.size(), getClientId(session)));
                return;
            }
            try (OutputStream out = session.getBasicRemote().getSendStream()) {
                byte[] json = objectMapper.writeValueAsBytes(payload);
                out.write(CompressionUtils.compress(json, getCompressionAlgorithm(session)));
            } catch (Exception e) {
                log.error("Failed to send websocket result to client {}, id {}", getClientName(session), getClientId(session), e);
            }
        } catch (Throwable e) {
            log.error("Failed to send websocket result to client {}, id {}", getClientName(session), getClientId(session), e);
            throw e;
        }
    }

    /**
     * Looks for the backlog of a different session that belongs to the same client as the given
     * session.
     *
     * @param closedSession the session that cannot be used
     * @return the first registered backlog whose session has the same client id but another
     *         session id, or empty if there is none
     */
    protected Optional<SessionBacklog> findAlternativeBacklog(Session closedSession) {
        String clientId = getClientId(closedSession);
        for (SessionBacklog candidate : sessionBacklogs.values()) {
            if (!clientId.equals(getClientId(candidate.getSession()))) {
                continue;
            }
            if (closedSession.getId().equals(candidate.getSession().getId())) {
                continue;
            }
            return Optional.of(candidate);
        }
        return Optional.empty();
    }

    /**
     * Removes the backlog of a closed session. Unless the endpoint is shutting down, it also logs
     * a warning when the close code is not {@code UNEXPECTED_CONDITION} and is numerically above
     * {@code NO_STATUS_CODE}, and records a {@link DisconnectEvent}.
     *
     * @param session     the session that was closed
     * @param closeReason why the session was closed
     */
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        sessionBacklogs.remove(session.getId());
        if (shuttingDown.get()) {
            return;
        }
        CloseReason.CloseCode closeCode = closeReason.getCloseCode();
        boolean abnormal = closeCode != CloseReason.CloseCodes.UNEXPECTED_CONDITION
                           && closeCode.getCode() > CloseReason.CloseCodes.NO_STATUS_CODE.getCode();
        if (abnormal) {
            log.warn("Websocket session to endpoint {} for client {} with id {} closed abnormally: {}", getClass().getSimpleName(), getClientName(session), getClientId(session), closeReason);
        }
        registerMetrics(new DisconnectEvent(getClientName(session), getClientId(session), session.getId(),
                                            toString(), closeCode.getCode(), closeReason.getReasonPhrase()));
    }

    /**
     * Logs a session error and then tries to close the session with close code
     * {@code UNEXPECTED_CONDITION}.
     *
     * @param session the session in which the error occurred
     * @param e       the error
     */
    @Override
    public void onError(Session session, Throwable e) {
        log.error("Error in session for client {} with id {}", getClientName(session), getClientId(session), e);
        CloseReason reason = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION,
                                             "The websocket closed because of an error");
        try {
            session.close(reason);
        } catch (IOException ignored) {
            // Closing is best effort; a failure to close is deliberately not reported.
        }
    }

    /**
     * Shuts the endpoint down once; later invocations are no-ops. After marking the endpoint as
     * shutting down it waits 500 ms (or until interrupted, in which case the interrupt flag is
     * restored), then marks it as shut down and closes every registered session that is still
     * open. Failures while closing an individual session are ignored.
     */
    protected void shutDown() {
        if (!shuttingDown.compareAndSet(false, true)) {
            return;
        }
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutDown = true;
            for (SessionBacklog backlog : sessionBacklogs.values()) {
                Session openSession = backlog.getSession();
                if (!openSession.isOpen()) {
                    continue;
                }
                try {
                    openSession.close();
                } catch (Exception ignored) {
                    // Best effort: keep closing the remaining sessions.
                }
            }
        }
    }

    /**
     * Reads the compression algorithm from the session's {@code compression} request parameter.
     *
     * @param session the session to inspect
     * @return the algorithm named by the first parameter value, or {@code null} when the parameter
     *         is absent
     */
    protected CompressionAlgorithm getCompressionAlgorithm(Session session) {
        List<String> values = session.getRequestParameterMap().get("compression");
        return values != null ? CompressionAlgorithm.valueOf(values.get(0)) : null;
    }

    /**
     * Reads the project id from the session's {@code projectId} request parameter.
     *
     * @param session the session to inspect
     * @return the first parameter value, or {@code "public"} when the parameter is absent or its
     *         first value is {@code null}
     */
    protected String getProjectId(Session session) {
        List<String> values = session.getRequestParameterMap().get("projectId");
        if (values == null) {
            return "public";
        }
        String projectId = values.get(0);
        return projectId != null ? projectId : "public";
    }

    /**
     * Returns the first value of the session's {@code clientId} request parameter. The parameter
     * is expected to be present; there is no fallback.
     *
     * @param session the session to inspect
     * @return the client id
     */
    protected String getClientId(Session session) {
        List<String> values = session.getRequestParameterMap().get("clientId");
        return values.get(0);
    }

    /**
     * Returns the first value of the session's {@code clientName} request parameter. The parameter
     * is expected to be present; there is no fallback.
     *
     * @param session the session to inspect
     * @return the client name
     */
    protected String getClientName(Session session) {
        List<String> values = session.getRequestParameterMap().get("clientName");
        return values.get(0);
    }

    /**
     * Forwards a client event to the configured metrics log.
     *
     * @param event the event to record
     */
    protected void registerMetrics(ClientEvent event) {
        metricsLog.registerMetrics(event);
    }

    /**
     * Replaces the metrics log used by this endpoint.
     *
     * @param metricsLog the metrics log to use from now on
     * @return this endpoint, to allow call chaining
     */
    public WebsocketEndpoint metricsLog(MetricsLog metricsLog) {
        WebsocketEndpoint self = this;
        self.metricsLog = metricsLog;
        return self;
    }

    /**
     * @return the mapper this endpoint uses to serialize and deserialize websocket payloads
     */
    protected ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    /**
     * Pairs a result {@link Backlog} with the {@link Session} it belongs to. The backlog
     * operations exposed here simply delegate to the wrapped backlog.
     */
    protected static final class SessionBacklog {
        private final Backlog<QueryResult> delegate;
        private final Session session;

        /**
         * @param delegate the backlog that buffers results
         * @param session  the session the backlog belongs to
         */
        @ConstructorProperties(value={"delegate", "session"})
        public SessionBacklog(Backlog<QueryResult> delegate, Session session) {
            this.delegate = delegate;
            this.session = session;
        }

        /** @return the wrapped backlog */
        public Backlog<QueryResult> getDelegate() {
            return delegate;
        }

        /** @return the session this backlog belongs to */
        public Session getSession() {
            return session;
        }

        /** Two instances are equal when both their backlogs and their sessions are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SessionBacklog)) {
                return false;
            }
            SessionBacklog that = (SessionBacklog) o;
            return Objects.equals(delegate, that.delegate) && Objects.equals(session, that.session);
        }

        /** Hash over backlog and session; a {@code null} component contributes 43. */
        @Override
        public int hashCode() {
            int hash = 59 + (delegate == null ? 43 : delegate.hashCode());
            return hash * 59 + (session == null ? 43 : session.hashCode());
        }

        @Override
        public String toString() {
            return "WebsocketEndpoint.SessionBacklog(delegate=" + delegate + ", session=" + session + ")";
        }

        /** Adds the given results to the wrapped backlog. */
        public CompletableFuture<Void> add(QueryResult ... values2) {
            return delegate.add(values2);
        }

        /** Adds the given results to the wrapped backlog. */
        public CompletableFuture<Void> add(Collection<? extends QueryResult> values2) {
            return delegate.add(values2);
        }

        /** Registers a monitor on the wrapped backlog. */
        public Registration registerMonitor(Consumer<List<QueryResult>> monitor) {
            return delegate.registerMonitor(monitor);
        }

        /** Shuts down the wrapped backlog. */
        public void shutDown() {
            delegate.shutDown();
        }
    }

    /**
     * Immutable pairing of a deserialized message with the {@link Session} it arrived on; this is
     * the message type handed to the endpoint's handler.
     */
    protected static final class ClientMessage {
        private final JsonType payload;
        private final Session session;

        /**
         * @param payload the deserialized message
         * @param session the session the message arrived on
         */
        @ConstructorProperties(value={"payload", "session"})
        public ClientMessage(JsonType payload, Session session) {
            this.payload = payload;
            this.session = session;
        }

        /** @return the deserialized message */
        public JsonType getPayload() {
            return payload;
        }

        /** @return the session the message arrived on */
        public Session getSession() {
            return session;
        }

        /** Two instances are equal when both their payloads and their sessions are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ClientMessage)) {
                return false;
            }
            ClientMessage that = (ClientMessage) o;
            return Objects.equals(payload, that.payload) && Objects.equals(session, that.session);
        }

        /** Hash over payload and session; a {@code null} component contributes 43. */
        @Override
        public int hashCode() {
            int hash = 59 + (payload == null ? 43 : payload.hashCode());
            return hash * 59 + (session == null ? 43 : session.hashCode());
        }

        @Override
        public String toString() {
            return "WebsocketEndpoint.ClientMessage(payload=" + payload + ", session=" + session + ")";
        }
    }
}

