/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.common.websocket;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.RetryConfiguration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.Command;
import io.fluxcapacitor.common.api.JsonType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.QueryResult;
import io.fluxcapacitor.common.api.Request;
import io.fluxcapacitor.common.api.RequestBatch;
import io.fluxcapacitor.common.api.ResultBatch;
import io.fluxcapacitor.common.serialization.compression.CompressionUtils;
import io.fluxcapacitor.common.tracking.InMemoryTaskScheduler;
import io.fluxcapacitor.common.tracking.TaskScheduler;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.common.serialization.jackson.JacksonSerializer;
import io.fluxcapacitor.javaclient.common.websocket.SessionPool;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import io.fluxcapacitor.javaclient.publishing.AdhocDispatchInterceptor;
import io.fluxcapacitor.javaclient.publishing.DispatchInterceptor;
import jakarta.websocket.CloseReason;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.PongMessage;
import jakarta.websocket.Session;
import jakarta.websocket.WebSocketContainer;
import java.beans.ConstructorProperties;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWebsocketClient
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(AbstractWebsocketClient.class);
    public static WebSocketContainer defaultWebSocketContainer = ContainerProvider.getWebSocketContainer();
    public static ObjectMapper defaultObjectMapper = ((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)).findAndAddModules()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)).build();
    private final SessionPool sessionPool;
    private final WebSocketClient client;
    private final WebSocketClient.ClientConfig clientConfig;
    private final ObjectMapper objectMapper;
    private final Map<Long, WebSocketRequest> requests = new ConcurrentHashMap<Long, WebSocketRequest>();
    private final Map<String, Backlog<Request>> sessionBacklogs = new ConcurrentHashMap<String, Backlog<Request>>();
    private final TaskScheduler pingScheduler;
    private final Map<String, PingRegistration> pingDeadlines = new ConcurrentHashMap<String, PingRegistration>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final ExecutorService resultExecutor;
    private final boolean allowMetrics;
    private final AtomicReference<Object> fallbackSerializer = new AtomicReference();

    public AbstractWebsocketClient(URI endpointUri, WebSocketClient client, boolean allowMetrics) {
        this(endpointUri, client, allowMetrics, 1);
    }

    public AbstractWebsocketClient(URI endpointUri, WebSocketClient client, boolean allowMetrics, int numberOfSessions) {
        this(defaultWebSocketContainer, endpointUri, client, allowMetrics, Duration.ofSeconds(1L), defaultObjectMapper, numberOfSessions);
    }

    public AbstractWebsocketClient(WebSocketContainer container, URI endpointUri, WebSocketClient client, boolean allowMetrics, Duration reconnectDelay, ObjectMapper objectMapper, int numberOfSessions) {
        this.client = client;
        this.clientConfig = client.getClientConfig();
        this.objectMapper = objectMapper;
        this.allowMetrics = allowMetrics;
        this.pingScheduler = new InMemoryTaskScheduler(String.valueOf(this) + "-pingScheduler");
        this.resultExecutor = Executors.newFixedThreadPool(8, ObjectUtils.newThreadFactory(String.valueOf(this) + "-onMessage"));
        this.sessionPool = new SessionPool(numberOfSessions, () -> TimingUtils.retryOnFailure(() -> container.connectToServer(this, endpointUri), RetryConfiguration.builder().delay(reconnectDelay).errorTest(e -> !this.closed.get()).successLogger(s -> log.info("Successfully reconnected to endpoint {}", (Object)endpointUri)).exceptionLogger(status -> {
            if (status.getNumberOfTimesRetried() == 0) {
                log.warn("Failed to connect to endpoint {}; reason: {}. Retrying every {} ms...", endpointUri, status.getException().getMessage(), status.getRetryConfiguration().getDelay().toMillis());
            } else if (status.getNumberOfTimesRetried() % 100 == 0) {
                log.warn("Still trying to connect to endpoint {}. Last error: {}.", (Object)endpointUri, (Object)status.getException().getMessage());
            }
        }).build()));
    }

    protected <R extends QueryResult> CompletableFuture<R> send(Request request) {
        return new WebSocketRequest(request, FluxCapacitor.currentCorrelationData(), AdhocDispatchInterceptor.getAdhocInterceptor(MessageType.METRICS).orElse(null), FluxCapacitor.getOptionally().orElse(null)).send();
    }

    protected <R extends QueryResult> R sendAndWait(Request request) {
        return (R)((QueryResult)this.send(request).get());
    }

    protected CompletableFuture<Void> sendCommand(Command command) {
        return switch (command.getGuarantee()) {
            case Guarantee.NONE -> {
                this.sendAndForget(command);
                yield CompletableFuture.completedFuture(null);
            }
            case Guarantee.SENT -> this.sendAndForget(command);
            default -> this.send(command).thenApply(r -> null);
        };
    }

    private CompletableFuture<Void> sendAndForget(Command object) {
        return this.send(object, FluxCapacitor.currentCorrelationData(), this.sessionPool.get(object.routingKey()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> send(Request request, Map<String, String> correlationData, Session session) {
        try {
            CompletableFuture<Void> completableFuture = this.sessionBacklogs.computeIfAbsent(session.getId(), id -> Backlog.forConsumer(batch -> this.sendBatch((List<Request>)batch, session))).add(request);
            this.tryPublishMetrics(request, this.metricsMetadata().with(correlationData).with((Object)"sessionId", (Object)session.getId()).with((Object)"requestId", (Object)request.getRequestId()));
            return completableFuture;
        }
        catch (Throwable throwable) {
            this.tryPublishMetrics(request, this.metricsMetadata().with(correlationData).with((Object)"sessionId", (Object)session.getId()).with((Object)"requestId", (Object)request.getRequestId()));
            throw throwable;
        }
    }

    private void sendBatch(List<Request> requests, Session session) {
        block12: {
            JsonType object = requests.size() == 1 ? (JsonType)requests.getFirst() : new RequestBatch<Request>(requests);
            try (OutputStream outputStream = session.getBasicRemote().getSendStream();){
                byte[] bytes = this.objectMapper.writeValueAsBytes(object);
                if (session.isOpen()) {
                    outputStream.write(CompressionUtils.compress(bytes, this.clientConfig.getCompression()));
                }
            }
            catch (Exception e) {
                log.error("Failed to send request {}", (Object)object, (Object)e);
                if (Optional.ofNullable(e.getMessage()).map(m -> m.contains("Channel is closed")).orElse(false).booleanValue()) {
                    this.abort(session);
                    break block12;
                }
                throw e;
            }
        }
    }

    @OnMessage
    public void onMessage(byte[] bytes, Session session) {
        this.resultExecutor.execute(() -> {
            JsonType value;
            try {
                value = this.objectMapper.readValue(CompressionUtils.decompress(bytes, this.clientConfig.getCompression()), JsonType.class);
            }
            catch (Exception e) {
                log.error("Could not parse input. Expected a Json message.", e);
                return;
            }
            if (value instanceof ResultBatch) {
                String batchId = FluxCapacitor.generateId();
                ((ResultBatch)value).getResults().forEach(r -> this.resultExecutor.execute(() -> this.handleResult((QueryResult)r, batchId)));
            } else {
                WebSocketRequest webSocketRequest = this.requests.get(((QueryResult)value).getRequestId());
                if (webSocketRequest == null) {
                    log.warn("Could not find outstanding read request for id {} (session {})", (Object)((QueryResult)value).getRequestId(), (Object)session.getId());
                }
                this.handleResult((QueryResult)value, null);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleResult(QueryResult result, String batchId) {
        block6: {
            try {
                WebSocketRequest webSocketRequest = this.requests.remove(result.getRequestId());
                if (webSocketRequest == null) {
                    log.warn("Could not find outstanding read request for id {}", (Object)result.getRequestId());
                    break block6;
                }
                try {
                    Metadata metadata2 = this.metricsMetadata().with("requestId", webSocketRequest.request.getRequestId(), "msDuration", System.currentTimeMillis() - webSocketRequest.sendTimestamp).with(webSocketRequest.correlationData).with((Object)"batchId", (Object)batchId);
                    FluxCapacitor.getOptionally().or(() -> Optional.ofNullable(webSocketRequest.fluxCapacitor)).ifPresent(fc -> fc.execute(f -> Optional.ofNullable(webSocketRequest.adhocMetricsInterceptor).ifPresentOrElse(i -> AdhocDispatchInterceptor.runWithAdhocInterceptor(() -> this.tryPublishMetrics(result, metadata2), i, MessageType.METRICS), () -> this.tryPublishMetrics(result, metadata2))));
                }
                finally {
                    webSocketRequest.result.complete(result);
                }
            }
            catch (Throwable e) {
                log.error("Failed to handle result {}", (Object)result, (Object)e);
            }
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        this.schedulePing(session);
    }

    protected PingRegistration schedulePing(Session session) {
        return this.pingDeadlines.compute(session.getId(), (k, v) -> {
            if (v != null) {
                v.cancel();
            }
            return !this.closed.get() ? new PingRegistration(this.pingScheduler.schedule(this.clientConfig.getPingDelay(), () -> this.sendPing(session))) : null;
        });
    }

    protected void sendPing(Session session) {
        if (!this.closed.get() && session.isOpen()) {
            PingRegistration registration = this.pingDeadlines.compute(session.getId(), (k, v) -> {
                if (v != null) {
                    v.cancel();
                }
                return new PingRegistration(this.pingScheduler.schedule(this.clientConfig.getPingTimeout(), () -> {
                    log.warn("Failed to get a ping response in time for session {}. Resetting connection", (Object)session.getId());
                    this.abort(session);
                }));
            });
            try {
                session.getBasicRemote().sendPing(ByteBuffer.wrap(registration.getId().getBytes()));
            }
            catch (Exception e) {
                log.warn("Failed to send ping message", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void abort(Session session) {
        CloseReason reason = new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, null);
        try {
            this.onClose(session, reason);
        }
        finally {
            try {
                session.close(reason);
            }
            catch (Throwable throwable) {}
        }
    }

    @OnMessage
    public void onPong(PongMessage message, Session session) {
        this.pingDeadlines.compute(session.getId(), (k, v) -> {
            if (v == null) {
                return v;
            }
            v.cancel();
            return this.schedulePing(session);
        });
    }

    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        this.sessionBacklogs.remove(session.getId());
        Optional.ofNullable(this.pingDeadlines.remove(session.getId())).ifPresent(PingRegistration::cancel);
        if (closeReason.getCloseCode().getCode() > CloseReason.CloseCodes.NO_STATUS_CODE.getCode()) {
            log.warn("Connection to endpoint {} closed with reason {}", (Object)session.getRequestURI(), (Object)closeReason);
        }
        this.retryOutstandingRequests(session.getId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void retryOutstandingRequests(String sessionId) {
        if (!this.closed.get() && !this.requests.isEmpty()) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Thread interrupted while trying to retry outstanding requests", e);
            }
            String string = sessionId.intern();
            synchronized (string) {
                this.requests.values().stream().filter(r -> sessionId.equals(r.sessionId)).forEach(r -> {
                    log.info("Retrying request {} using a new session (old session {})", (Object)r.request.getRequestId(), (Object)sessionId);
                    r.send();
                });
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable e) {
        log.error("Client side error for web socket connected to endpoint {}", (Object)session.getRequestURI(), (Object)e);
    }

    @Override
    public void close() {
        this.close(false);
    }

    public String toString() {
        return this.getClass().getSimpleName();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void close(boolean clearOutstandingRequests) {
        if (this.closed.compareAndSet(false, true)) {
            AtomicBoolean atomicBoolean = this.closed;
            synchronized (atomicBoolean) {
                if (clearOutstandingRequests) {
                    this.requests.clear();
                }
                this.pingScheduler.shutdown();
                this.sessionPool.close();
                this.pingDeadlines.clear();
                if (!this.requests.isEmpty()) {
                    log.warn("{}: Closed websocket session to endpoint with {} outstanding requests", (Object)this.getClass().getSimpleName(), (Object)this.requests.size());
                }
            }
        }
    }

    protected void tryPublishMetrics(JsonType message, Metadata metadata2) {
        Object metric = message.toMetric();
        if (this.allowMetrics && !this.clientConfig.isDisableMetrics() && metric != null) {
            FluxCapacitor.getOptionally().ifPresentOrElse(f -> FluxCapacitor.publishMetrics(metric, metadata2), () -> this.client.getGatewayClient(MessageType.METRICS).append(Guarantee.STORED, Message.asMessage(message).addMetadata(metadata2).serialize(this.getFallbackSerializer())));
        }
    }

    protected Metadata metricsMetadata() {
        return Metadata.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Serializer getFallbackSerializer() {
        Object value = this.fallbackSerializer.get();
        if (value == null) {
            AtomicReference<Object> atomicReference = this.fallbackSerializer;
            synchronized (atomicReference) {
                value = this.fallbackSerializer.get();
                if (value == null) {
                    JacksonSerializer actualValue = new JacksonSerializer();
                    value = actualValue == null ? this.fallbackSerializer : actualValue;
                    this.fallbackSerializer.set(value);
                }
            }
        }
        return (Serializer)(value == this.fallbackSerializer ? null : value);
    }

    protected class WebSocketRequest {
        private final Request request;
        private final CompletableFuture<QueryResult> result = new CompletableFuture();
        private final Map<String, String> correlationData;
        private final DispatchInterceptor adhocMetricsInterceptor;
        private final FluxCapacitor fluxCapacitor;
        private volatile String sessionId;
        private volatile long sendTimestamp;

        protected <T extends QueryResult> CompletableFuture<T> send() {
            Session session;
            try {
                Session session2;
                Request request = this.request;
                if (request instanceof Command) {
                    Command c = (Command)request;
                    session2 = AbstractWebsocketClient.this.sessionPool.get(c.routingKey());
                } else {
                    session2 = AbstractWebsocketClient.this.sessionPool.get();
                }
                session = session2;
            }
            catch (Exception e) {
                log.error("Failed to get websocket session to send request {}", (Object)this.request, (Object)e);
                this.result.completeExceptionally(e);
                return this.result;
            }
            this.sessionId = session.getId();
            AbstractWebsocketClient.this.requests.put(this.request.getRequestId(), this);
            try {
                this.sendTimestamp = System.currentTimeMillis();
                AbstractWebsocketClient.this.send(this.request, this.correlationData, session);
            }
            catch (Exception e) {
                AbstractWebsocketClient.this.requests.remove(this.request.getRequestId());
                this.result.completeExceptionally(e);
            }
            return this.result;
        }

        @ConstructorProperties(value={"request", "correlationData", "adhocMetricsInterceptor", "fluxCapacitor"})
        public WebSocketRequest(Request request, Map<String, String> correlationData, DispatchInterceptor adhocMetricsInterceptor, FluxCapacitor fluxCapacitor) {
            this.request = request;
            this.correlationData = correlationData;
            this.adhocMetricsInterceptor = adhocMetricsInterceptor;
            this.fluxCapacitor = fluxCapacitor;
        }
    }

    protected static final class PingRegistration
    implements Registration {
        private final String id = FluxCapacitor.generateId();
        private final Registration delegate;

        @ConstructorProperties(value={"delegate"})
        public PingRegistration(Registration delegate) {
            this.delegate = delegate;
        }

        public String getId() {
            return this.id;
        }

        public Registration getDelegate() {
            return this.delegate;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PingRegistration)) {
                return false;
            }
            PingRegistration other = (PingRegistration)o;
            String this$id = this.getId();
            String other$id = other.getId();
            if (this$id == null ? other$id != null : !this$id.equals(other$id)) {
                return false;
            }
            Registration this$delegate = this.getDelegate();
            Registration other$delegate = other.getDelegate();
            return !(this$delegate == null ? other$delegate != null : !this$delegate.equals(other$delegate));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $id = this.getId();
            result = result * 59 + ($id == null ? 43 : $id.hashCode());
            Registration $delegate = this.getDelegate();
            result = result * 59 + ($delegate == null ? 43 : $delegate.hashCode());
            return result;
        }

        public String toString() {
            return "AbstractWebsocketClient.PingRegistration(id=" + this.getId() + ", delegate=" + String.valueOf(this.getDelegate()) + ")";
        }

        @Override
        public void cancel() {
            this.getDelegate().cancel();
        }

        @Override
        public Registration merge(Registration otherRegistration) {
            return this.getDelegate().merge(otherRegistration);
        }
    }
}

