/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.client.impl;

import io.deephaven.client.impl.ApplicationService;
import io.deephaven.client.impl.Authentication;
import io.deephaven.client.impl.BearerHandler;
import io.deephaven.client.impl.ClientData;
import io.deephaven.client.impl.ConsoleSession;
import io.deephaven.client.impl.CustomObject;
import io.deephaven.client.impl.ExportId;
import io.deephaven.client.impl.ExportService;
import io.deephaven.client.impl.ExportStates;
import io.deephaven.client.impl.ExportTicketCreator;
import io.deephaven.client.impl.FieldChanges;
import io.deephaven.client.impl.HasTicketId;
import io.deephaven.client.impl.HasTypedTicket;
import io.deephaven.client.impl.ObjectService;
import io.deephaven.client.impl.ServerData;
import io.deephaven.client.impl.ServerObject;
import io.deephaven.client.impl.SessionBase;
import io.deephaven.client.impl.SessionImplConfig;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandleManager;
import io.deephaven.client.impl.TableHandleManagerBatch;
import io.deephaven.client.impl.TableHandleManagerDelegate;
import io.deephaven.client.impl.TableHandleManagerSerial;
import io.deephaven.client.impl.TableService;
import io.deephaven.client.impl.TableServiceAsyncImpl;
import io.deephaven.client.impl.TypedTicket;
import io.deephaven.client.impl.UnaryGrpcFuture;
import io.deephaven.client.impl.script.Changes;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.proto.backplane.grpc.AddTableRequest;
import io.deephaven.proto.backplane.grpc.AuthenticationConstantsRequest;
import io.deephaven.proto.backplane.grpc.AuthenticationConstantsResponse;
import io.deephaven.proto.backplane.grpc.ConfigServiceGrpc;
import io.deephaven.proto.backplane.grpc.ConfigValue;
import io.deephaven.proto.backplane.grpc.ConfigurationConstantsRequest;
import io.deephaven.proto.backplane.grpc.ConfigurationConstantsResponse;
import io.deephaven.proto.backplane.grpc.ConnectRequest;
import io.deephaven.proto.backplane.grpc.DeleteTableRequest;
import io.deephaven.proto.backplane.grpc.ExportRequest;
import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate;
import io.deephaven.proto.backplane.grpc.HandshakeRequest;
import io.deephaven.proto.backplane.grpc.InputTableServiceGrpc;
import io.deephaven.proto.backplane.grpc.ListFieldsRequest;
import io.deephaven.proto.backplane.grpc.PublishRequest;
import io.deephaven.proto.backplane.grpc.ReleaseRequest;
import io.deephaven.proto.backplane.grpc.SessionServiceGrpc;
import io.deephaven.proto.backplane.grpc.StreamRequest;
import io.deephaven.proto.backplane.grpc.StreamResponse;
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.backplane.script.grpc.BindTableToVariableRequest;
import io.deephaven.proto.backplane.script.grpc.ConsoleServiceGrpc;
import io.deephaven.proto.backplane.script.grpc.ExecuteCommandRequest;
import io.deephaven.proto.backplane.script.grpc.StartConsoleRequest;
import io.deephaven.qst.table.TableSpec;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.lang.model.SourceVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SessionImpl
extends SessionBase {
    private static final Logger log = LoggerFactory.getLogger(SessionImpl.class);
    private final SessionImplConfig config;
    private final DeephavenChannel bearerChannel;
    private final BearerHandler bearerHandler;
    private final ExportTicketCreator exportTicketCreator;
    private final ScheduledFuture<?> pingJob;
    private CompletableFuture<Void> closeFuture;

    public static SessionImpl create(SessionImplConfig config) throws InterruptedException {
        Authentication authentication = Authentication.authenticate(config.channel(), config.authenticationTypeAndValue());
        authentication.awaitOrCancel();
        return SessionImpl.create(config, authentication);
    }

    public static SessionImpl create(SessionImplConfig config, Authentication authentication) {
        authentication.throwOnError();
        DeephavenChannel bearerChannel = authentication.bearerChannel().orElseThrow(IllegalStateException::new);
        ConfigurationConstantsResponse response = authentication.configurationConstants().orElseThrow(IllegalStateException::new);
        Optional<Duration> httpSessionDuration = SessionImpl.parseHttpSessionDuration(response);
        if (!httpSessionDuration.isPresent()) {
            log.warn("Server did not return an 'http.session.durationMs', defaulting to pinging the server every minute.");
        }
        Duration pingFrequency = httpSessionDuration.map(d -> d.dividedBy(3L)).orElse(Duration.ofMinutes(1L));
        return new SessionImpl(config, bearerChannel, pingFrequency, authentication.bearerHandler());
    }

    private static Optional<Duration> parseHttpSessionDuration(ConfigurationConstantsResponse response) {
        return SessionImpl.getHttpSessionDurationMs(response).map(SessionImpl::stringValue).flatMap(SessionImpl::parseMillis);
    }

    private static String stringValue(ConfigValue value) {
        if (!value.hasStringValue()) {
            throw new IllegalArgumentException("Expected string value");
        }
        return value.getStringValue();
    }

    private static Optional<ConfigValue> getHttpSessionDurationMs(ConfigurationConstantsResponse response) {
        return Optional.ofNullable((ConfigValue)response.getConfigValuesMap().get("http.session.durationMs"));
    }

    private static Optional<Duration> parseMillis(String x) {
        try {
            return Optional.of(Duration.ofMillis(Long.parseLong(x)));
        }
        catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    private SessionImpl(SessionImplConfig config, DeephavenChannel bearerChannel, Duration pingFrequency, BearerHandler bearerHandler) {
        this.config = Objects.requireNonNull(config);
        this.bearerChannel = Objects.requireNonNull(bearerChannel);
        this.bearerHandler = Objects.requireNonNull(bearerHandler);
        this.exportTicketCreator = new ExportTicketCreator();
        this.pingJob = config.executor().scheduleAtFixedRate(() -> bearerChannel.config().getConfigurationConstants(ConfigurationConstantsRequest.getDefaultInstance(), (StreamObserver)PingObserverNoOp.INSTANCE), pingFrequency.toNanos(), pingFrequency.toNanos(), TimeUnit.NANOSECONDS);
    }

    public BearerHandler _hackBearerHandler() {
        return this.bearerHandler;
    }

    private ExportStates newExportStates() {
        return new ExportStates(this, this.bearerChannel.session(), this.bearerChannel.table(), this.exportTicketCreator);
    }

    @Override
    public TableService newStatefulTableService() {
        return new TableServiceImpl(this.newExportStates());
    }

    @Override
    public TableHandleManager batch() {
        return this.batch(this.config.mixinStacktrace());
    }

    @Override
    public TableHandleManager batch(boolean mixinStacktraces) {
        return new TableHandleManagerBatch(mixinStacktraces){

            @Override
            protected ExportService exportService() {
                return SessionImpl.this.newExportStates();
            }
        };
    }

    @Override
    public TableHandleManager serial() {
        return new TableHandleManagerSerial(){

            @Override
            protected ExportService exportService() {
                return SessionImpl.this.newExportStates();
            }

            @Override
            protected TableHandle handle(TableSpec table) {
                return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(this.exportService(), table, null);
            }
        };
    }

    @Override
    public CompletableFuture<? extends ConsoleSession> console(String type) {
        ExportId consoleId = new ExportId("Console", this.exportTicketCreator.createExportId());
        StartConsoleRequest request = StartConsoleRequest.newBuilder().setSessionType(type).setResultId(consoleId.ticketId().proto()).build();
        return UnaryGrpcFuture.of(request, (arg_0, arg_1) -> ((ConsoleServiceGrpc.ConsoleServiceStub)this.channel().console()).startConsole(arg_0, arg_1), response -> new ConsoleSessionImpl(request));
    }

    @Override
    public CompletableFuture<Void> publish(String name, HasTicketId ticketId) {
        if (!SourceVersion.isName(name)) {
            throw new IllegalArgumentException("Invalid name");
        }
        BindTableToVariableRequest request = BindTableToVariableRequest.newBuilder().setVariableName(name).setTableId(ticketId.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((ConsoleServiceGrpc.ConsoleServiceStub)this.channel().console()).bindTableToVariable(arg_0, arg_1));
    }

    @Override
    public CompletableFuture<Void> publish(HasTicketId resultId, HasTicketId sourceId) {
        PublishRequest request = PublishRequest.newBuilder().setSourceId(sourceId.ticketId().proto()).setResultId(resultId.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((SessionServiceGrpc.SessionServiceStub)this.channel().session()).publishFromTicket(arg_0, arg_1));
    }

    @Override
    public CompletableFuture<ServerData> fetch(HasTypedTicket typedTicket) {
        TypedTicket tt = typedTicket.typedTicket();
        if (!tt.type().isPresent()) {
            throw new IllegalArgumentException("Type must be present to fetch an object");
        }
        StreamRequest connectRequest = StreamRequest.newBuilder().setConnect(ConnectRequest.newBuilder().setSourceId(tt.proto())).build();
        return UnaryGrpcFuture.of(connectRequest, this::messageStreamConnectOnly, this::toDataAndExports);
    }

    private void messageStreamConnectOnly(StreamRequest request, StreamObserver<StreamResponse> responseObserver) {
        StreamObserver observer = this.channel().object().messageStream(responseObserver);
        observer.onNext((Object)request);
        observer.onCompleted();
    }

    private ServerData toDataAndExports(StreamResponse value) {
        switch (value.getMessageCase()) {
            case DATA: {
                return ServerData.of(this, value.getData());
            }
        }
        throw new IllegalStateException(String.format("Unexpected stream response message type, %s", value.getMessageCase()));
    }

    @Override
    public ObjectService.MessageStream<ClientData> connect(HasTypedTicket typedTicket, ObjectService.MessageStream<ServerData> receiveStream) {
        TypedTicket tt = typedTicket.typedTicket();
        if (!tt.type().isPresent()) {
            throw new IllegalArgumentException("Type must be present to open messageStream with an object");
        }
        StreamRequest connectRequest = StreamRequest.newBuilder().setConnect(ConnectRequest.newBuilder().setSourceId(tt.proto()).build()).build();
        StreamObserver serverObserver = this.channel().object().messageStream((StreamObserver)new MessageStreamObserver(receiveStream));
        serverObserver.onNext((Object)connectRequest);
        return new MessageStreamImpl((StreamObserver<StreamRequest>)serverObserver);
    }

    @Override
    public CompletableFuture<? extends ServerObject> export(HasTypedTicket typedTicket) {
        return this.export(typedTicket, this::toServerObject);
    }

    @Override
    public CompletableFuture<? extends ObjectService.Fetchable> fetchable(HasTypedTicket typedTicket) {
        return this.export(typedTicket, this::toFetchableObject);
    }

    @Override
    public CompletableFuture<? extends ObjectService.Bidirectional> bidirectional(HasTypedTicket typedTicket) {
        return this.export(typedTicket, this::toBidirectionalObject);
    }

    private <T extends ServerObject> CompletableFuture<T> export(HasTypedTicket typedTicket, Function<ExportId, T> f) {
        TypedTicket tt = typedTicket.typedTicket();
        ExportId exportId = this.newExportId(tt.type().orElse(null));
        ServerObject serverObject = (ServerObject)f.apply(exportId);
        ExportRequest exportRequest = ExportRequest.newBuilder().setSourceId(tt.ticketId().proto()).setResultId(exportId.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(exportRequest, (arg_0, arg_1) -> ((SessionServiceGrpc.SessionServiceStub)this.channel().session()).exportFromTicket(arg_0, arg_1)).thenApply(x -> serverObject);
    }

    private ServerObject toServerObject(ExportId exportId) {
        return exportId.toServerObject(this);
    }

    private ObjectService.Fetchable toFetchableObject(ExportId exportId) {
        return new CustomObject(this, exportId);
    }

    private ObjectService.Bidirectional toBidirectionalObject(ExportId exportId) {
        return new CustomObject(this, exportId);
    }

    @Override
    public void close() {
        try {
            this.closeFuture().get(this.config.closeTimeout().toNanos(), TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted waiting for session close");
        }
        catch (TimeoutException e) {
            log.warn("Timed out waiting for session close");
        }
        catch (ExecutionException e) {
            log.error("Exception waiting for session close", (Throwable)e);
        }
        catch (CancellationException e) {
            log.warn("Close cancelled", (Throwable)e);
        }
    }

    @Override
    public synchronized CompletableFuture<Void> closeFuture() {
        if (this.closeFuture == null) {
            this.pingJob.cancel(false);
            HandshakeRequest handshakeRequest = HandshakeRequest.getDefaultInstance();
            this.closeFuture = UnaryGrpcFuture.ignoreResponse(handshakeRequest, (arg_0, arg_1) -> ((SessionServiceGrpc.SessionServiceStub)this.channel().session()).closeSession(arg_0, arg_1));
        }
        return this.closeFuture;
    }

    @Override
    protected TableService delegate() {
        return this.newStatefulTableService();
    }

    @Override
    public ExportId newExportId() {
        return this.newExportId("Table");
    }

    private ExportId newExportId(String type) {
        return new ExportId(type, this.exportTicketCreator.createExportId());
    }

    @Override
    public CompletableFuture<Void> release(ExportId exportId) {
        ReleaseRequest request = ReleaseRequest.newBuilder().setId(exportId.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((SessionServiceGrpc.SessionServiceStub)this.channel().session()).release(arg_0, arg_1));
    }

    @Override
    public DeephavenChannel channel() {
        return this.bearerChannel;
    }

    @Override
    public CompletableFuture<Void> addToInputTable(HasTicketId destination, HasTicketId source) {
        AddTableRequest request = AddTableRequest.newBuilder().setInputTable(destination.ticketId().proto()).setTableToAdd(source.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((InputTableServiceGrpc.InputTableServiceStub)this.channel().inputTable()).addTableToInputTable(arg_0, arg_1));
    }

    @Override
    public CompletableFuture<Void> deleteFromInputTable(HasTicketId destination, HasTicketId source) {
        DeleteTableRequest request = DeleteTableRequest.newBuilder().setInputTable(destination.ticketId().proto()).setTableToRemove(source.ticketId().proto()).build();
        return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((InputTableServiceGrpc.InputTableServiceStub)this.channel().inputTable()).deleteTableFromInputTable(arg_0, arg_1));
    }

    @Override
    public ApplicationService.Cancel subscribeToFields(ApplicationService.Listener listener) {
        ListFieldsRequest request = ListFieldsRequest.newBuilder().build();
        ListFieldsObserver observer = new ListFieldsObserver(listener);
        this.bearerChannel.application().listFields(request, (StreamObserver)observer);
        return observer;
    }

    ScheduledExecutorService executor() {
        return this.config.executor();
    }

    @Override
    public CompletableFuture<Map<String, ConfigValue>> getAuthenticationConstants() {
        return UnaryGrpcFuture.of(AuthenticationConstantsRequest.getDefaultInstance(), (arg_0, arg_1) -> ((ConfigServiceGrpc.ConfigServiceStub)this.channel().config()).getAuthenticationConstants(arg_0, arg_1), AuthenticationConstantsResponse::getConfigValuesMap);
    }

    @Override
    public CompletableFuture<Map<String, ConfigValue>> getConfigurationConstants() {
        return UnaryGrpcFuture.of(ConfigurationConstantsRequest.getDefaultInstance(), (arg_0, arg_1) -> ((ConfigServiceGrpc.ConfigServiceStub)this.channel().config()).getConfigurationConstants(arg_0, arg_1), ConfigurationConstantsResponse::getConfigValuesMap);
    }

    private class TableServiceImpl
    extends TableHandleManagerDelegate
    implements TableService {
        private final ExportStates exportStates;

        TableServiceImpl(ExportStates exportStates) {
            this.exportStates = Objects.requireNonNull(exportStates);
        }

        @Override
        public TableService.TableHandleFuture executeAsync(TableSpec table) {
            return TableServiceAsyncImpl.executeAsync((ExportService)this.exportStates, table);
        }

        @Override
        public List<? extends TableService.TableHandleFuture> executeAsync(Iterable<? extends TableSpec> tables) {
            return TableServiceAsyncImpl.executeAsync((ExportService)this.exportStates, tables);
        }

        @Override
        protected TableHandleManager delegate() {
            return SessionImpl.this.config.delegateToBatch() ? this.batch() : this.serial();
        }

        @Override
        public TableHandleManager batch() {
            return this.batch(SessionImpl.this.config.mixinStacktrace());
        }

        @Override
        public TableHandleManager batch(boolean mixinStacktrace) {
            return new TableHandleManagerBatch(mixinStacktrace){

                @Override
                protected ExportService exportService() {
                    return TableServiceImpl.this.exportStates;
                }
            };
        }

        @Override
        public TableHandleManager serial() {
            return new TableHandleManagerSerial(){

                @Override
                protected ExportService exportService() {
                    return TableServiceImpl.this.exportStates;
                }

                @Override
                protected TableHandle handle(TableSpec table) {
                    return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(this.exportService(), table, null);
                }
            };
        }
    }

    private static class MessageStreamImpl
    implements ObjectService.MessageStream<ClientData> {
        private final StreamObserver<StreamRequest> serverObserver;

        public MessageStreamImpl(StreamObserver<StreamRequest> serverObserver) {
            this.serverObserver = Objects.requireNonNull(serverObserver);
        }

        @Override
        public void onData(ClientData message) {
            this.serverObserver.onNext((Object)StreamRequest.newBuilder().setData(message.proto()).build());
        }

        @Override
        public void onClose() {
            this.serverObserver.onCompleted();
        }
    }

    private class MessageStreamObserver
    implements StreamObserver<StreamResponse> {
        private final ObjectService.MessageStream<ServerData> clientStream;

        public MessageStreamObserver(ObjectService.MessageStream<ServerData> clientStream) {
            this.clientStream = Objects.requireNonNull(clientStream);
        }

        public void onNext(StreamResponse value) {
            this.clientStream.onData(SessionImpl.this.toDataAndExports(value));
        }

        public void onError(Throwable t) {
            this.clientStream.onClose();
        }

        public void onCompleted() {
            this.clientStream.onClose();
        }
    }

    private static enum PingObserverNoOp implements StreamObserver<ConfigurationConstantsResponse>
    {
        INSTANCE;


        public void onNext(ConfigurationConstantsResponse value) {
        }

        public void onError(Throwable t) {
        }

        public void onCompleted() {
        }
    }

    private static class ListFieldsObserver
    implements ApplicationService.Cancel,
    ClientResponseObserver<ListFieldsRequest, FieldsChangeUpdate> {
        private final ApplicationService.Listener listener;
        private ClientCallStreamObserver<?> stream;

        public ListFieldsObserver(ApplicationService.Listener listener) {
            this.listener = Objects.requireNonNull(listener);
        }

        @Override
        public void cancel() {
            this.stream.cancel("User cancelled", null);
        }

        public void beforeStart(ClientCallStreamObserver<ListFieldsRequest> requestStream) {
            this.stream = requestStream;
        }

        public void onNext(FieldsChangeUpdate value) {
            this.listener.onNext(new FieldChanges(value));
        }

        public void onError(Throwable t) {
            this.listener.onError(t);
        }

        public void onCompleted() {
            this.listener.onCompleted();
        }
    }

    private class ConsoleSessionImpl
    implements ConsoleSession {
        private final StartConsoleRequest request;

        public ConsoleSessionImpl(StartConsoleRequest request) {
            this.request = Objects.requireNonNull(request);
        }

        @Override
        public String type() {
            return this.request.getSessionType();
        }

        @Override
        public Ticket ticket() {
            return this.request.getResultId();
        }

        @Override
        public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
            return this.executeCodeFuture(code).get(SessionImpl.this.config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
        }

        @Override
        public Changes executeScript(Path path) throws IOException, InterruptedException, ExecutionException, TimeoutException {
            return this.executeScriptFuture(path).get(SessionImpl.this.config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
        }

        @Override
        public CompletableFuture<Changes> executeCodeFuture(String code) {
            ExecuteCommandRequest request = ExecuteCommandRequest.newBuilder().setConsoleId(this.ticket()).setCode(code).build();
            return UnaryGrpcFuture.of(request, (arg_0, arg_1) -> ((ConsoleServiceGrpc.ConsoleServiceStub)SessionImpl.this.channel().console()).executeCommand(arg_0, arg_1), response -> {
                Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges()));
                if (!response.getErrorMessage().isEmpty()) {
                    builder.errorMessage(response.getErrorMessage());
                }
                return builder.build();
            });
        }

        @Override
        public CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
            String code = String.join((CharSequence)System.lineSeparator(), Files.readAllLines(path, StandardCharsets.UTF_8));
            return this.executeCodeFuture(code);
        }

        @Override
        public CompletableFuture<Void> closeFuture() {
            ReleaseRequest request = ReleaseRequest.newBuilder().setId(this.request.getResultId()).build();
            return UnaryGrpcFuture.ignoreResponse(request, (arg_0, arg_1) -> ((SessionServiceGrpc.SessionServiceStub)SessionImpl.this.channel().session()).release(arg_0, arg_1));
        }

        @Override
        public void close() {
            try {
                this.closeFuture().get(SessionImpl.this.config.closeTimeout().toNanos(), TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted waiting for console close");
            }
            catch (TimeoutException e) {
                log.warn("Timed out waiting for console close");
            }
            catch (ExecutionException e) {
                log.error("Exception waiting for console close", (Throwable)e);
            }
        }
    }
}

