/*
 * Decompiled with CFR 0.152.
 */
package io.stargate.db.dse.impl;

import com.datastax.bdp.db.util.ProductType;
import com.datastax.bdp.db.util.ProductVersion;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.stargate.auth.AuthorizationService;
import io.stargate.db.AuthenticatedUser;
import io.stargate.db.Authenticator;
import io.stargate.db.Batch;
import io.stargate.db.BoundStatement;
import io.stargate.db.ClientInfo;
import io.stargate.db.EventListener;
import io.stargate.db.PagingPosition;
import io.stargate.db.Parameters;
import io.stargate.db.Persistence;
import io.stargate.db.Result;
import io.stargate.db.SimpleStatement;
import io.stargate.db.Statement;
import io.stargate.db.datastore.common.AbstractCassandraPersistence;
import io.stargate.db.dse.impl.AuthenticatorWrapper;
import io.stargate.db.dse.impl.ClientStateWithPublicAddress;
import io.stargate.db.dse.impl.Conversion;
import io.stargate.db.dse.impl.EventListenerWrapper;
import io.stargate.db.dse.impl.FakeConnection;
import io.stargate.db.dse.impl.SchemaConverter;
import io.stargate.db.dse.impl.SimpleCallbackSchemaChangeListener;
import io.stargate.db.dse.impl.StargateQueryHandler;
import io.stargate.db.dse.impl.interceptors.DefaultQueryInterceptor;
import io.stargate.db.dse.impl.interceptors.ProxyProtocolQueryInterceptor;
import io.stargate.db.dse.impl.interceptors.QueryInterceptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.cassandra.auth.IAuthContext;
import org.apache.cassandra.auth.RoleResource;
import org.apache.cassandra.auth.user.UserRolesAndPermissions;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.TPCTaskType;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.MigrationManager;
import org.apache.cassandra.schema.SchemaChangeListener;
import org.apache.cassandra.schema.SchemaManager;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.ViewTableMetadata;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.stargate.exceptions.PersistenceException;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.BatchMessage;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.ExecuteMessage;
import org.apache.cassandra.transport.messages.PrepareMessage;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.flow.RxThreads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DsePersistence
extends AbstractCassandraPersistence<Config, KeyspaceMetadata, TableMetadata, ColumnMetadata, UserType, IndexMetadata, ViewTableMetadata> {
    private static final Logger logger = LoggerFactory.getLogger(DsePersistence.class);
    public static final Boolean USE_PROXY_PROTOCOL = Boolean.parseBoolean(System.getProperty("stargate.use_proxy_protocol", "false"));
    private static final boolean USE_TRANSITIONAL_AUTH = Boolean.getBoolean("stargate.cql_use_transitional_auth");
    private static final String EXTERNAL_USER_ROLE_NAME = System.getProperty("stargate.auth.proxy.external.users.as", "cassandra");
    private static final org.apache.cassandra.auth.AuthenticatedUser EXTERNAL_AUTH_USER = new ExternalAuthenticatedUser(EXTERNAL_USER_ROLE_NAME);
    private static final int STARTUP_DELAY_MS = Integer.getInteger("stargate.startup_delay_ms", 3 * MigrationManager.MIGRATION_DELAY_IN_MS);
    private CassandraDaemon cassandraDaemon;
    private Authenticator authenticator;
    private QueryInterceptor interceptor;
    private SchemaChangeListener schemaChangeListener;
    private AtomicReference<AuthorizationService> authorizationService;

    public DsePersistence() {
        super("DataStax Enterprise");
    }

    private StargateQueryHandler stargateHandler() {
        return (StargateQueryHandler)ClientState.getCQLQueryHandler();
    }

    protected SchemaConverter newSchemaConverter() {
        return new SchemaConverter();
    }

    protected Iterable<KeyspaceMetadata> currentInternalSchema() {
        return Iterables.transform((Iterable)Keyspace.all(), Keyspace::getMetadata);
    }

    protected void registerInternalSchemaListener(final Runnable runOnSchemaChange) {
        this.schemaChangeListener = new SimpleCallbackSchemaChangeListener(){

            @Override
            void onSchemaChange() {
                runOnSchemaChange.run();
            }
        };
        SchemaManager.instance.registerListener(this.schemaChangeListener);
    }

    protected void unregisterInternalSchemaListener() {
        if (this.schemaChangeListener != null) {
            SchemaManager.instance.unregisterListener(this.schemaChangeListener);
        }
    }

    protected void initializePersistence(Config config) {
        System.setProperty("cassandra.custom_query_handler_class", StargateQueryHandler.class.getName());
        System.setProperty("cassandra-foreground", "true");
        System.setProperty("cassandra.consistent.rangemovement", "false");
        if (Boolean.parseBoolean(System.getProperty("stargate.bind_to_listen_address"))) {
            System.setProperty("com.sun.management.jmxremote.host", System.getProperty("stargate.listen_address"));
        }
        DatabaseDescriptor.daemonInitialization((boolean)true, (Config)config);
        this.cassandraDaemon = new CassandraDaemon(true);
        AtomicReference throwableFromMainThread = new AtomicReference();
        Thread.setDefaultUncaughtExceptionHandler((thread, t) -> {
            if (thread.getName().equals("DSE main thread")) {
                throwableFromMainThread.set(t);
            }
        });
        this.cassandraDaemon.activate(false);
        Throwable t2 = (Throwable)throwableFromMainThread.get();
        if (t2 != null) {
            throw new RuntimeException("Unable to start DSE persistence layer", t2);
        }
        Gossiper.instance.addLocalApplicationState(ApplicationState.X10, StorageService.instance.valueFactory.dsefsState("stargate"));
        this.waitForSchema(STARTUP_DELAY_MS);
        this.interceptor = new DefaultQueryInterceptor();
        if (USE_PROXY_PROTOCOL.booleanValue()) {
            this.interceptor = new ProxyProtocolQueryInterceptor(this.interceptor);
        }
        this.interceptor.initialize();
        this.stargateHandler().register(this.interceptor);
        this.stargateHandler().setAuthorizationService(this.authorizationService);
        this.authenticator = new AuthenticatorWrapper(DatabaseDescriptor.getAuthenticator());
    }

    protected void destroyPersistence() {
        if (this.cassandraDaemon != null) {
            this.cassandraDaemon.deactivate();
            this.cassandraDaemon = null;
        }
    }

    public void registerEventListener(EventListener listener) {
        SchemaManager.instance.registerListener((SchemaChangeListener)new EventListenerWrapper(listener));
        this.interceptor.register(listener);
    }

    public Authenticator getAuthenticator() {
        return this.authenticator;
    }

    public void setRpcReady(boolean status) {
        StorageService.instance.setNativeTransportReady(status);
    }

    public Persistence.Connection newConnection(ClientInfo clientInfo) {
        return new DseConnection(clientInfo);
    }

    public Persistence.Connection newConnection() {
        return new DseConnection();
    }

    public ByteBuffer unsetValue() {
        return ByteBufferUtil.UNSET_BYTE_BUFFER;
    }

    private static boolean shouldCheckSchema(InetAddress ep) {
        EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
        return epState != null && !Gossiper.instance.isDeadState(epState);
    }

    private static boolean isStorageNode(InetAddress ep) {
        return !Gossiper.instance.isGossipOnlyMember(ep);
    }

    public boolean isInSchemaAgreement() {
        return Gossiper.instance.getLiveMembers().stream().filter(DsePersistence::shouldCheckSchema).map(arg_0 -> ((Gossiper)Gossiper.instance).getSchemaVersion(arg_0)).distinct().count() <= 1L;
    }

    public boolean isInSchemaAgreementWithStorage() {
        InetAddress localAddress = FBUtilities.getBroadcastAddress();
        return Gossiper.instance.getLiveMembers().stream().filter(DsePersistence::shouldCheckSchema).filter(ep -> DsePersistence.isStorageNode(ep) || localAddress.equals(ep)).map(arg_0 -> ((Gossiper)Gossiper.instance).getSchemaVersion(arg_0)).distinct().count() <= 1L;
    }

    @VisibleForTesting
    boolean isStorageInSchemaAgreement() {
        return Gossiper.instance.getLiveMembers().stream().filter(DsePersistence::shouldCheckSchema).filter(DsePersistence::isStorageNode).map(arg_0 -> ((Gossiper)Gossiper.instance).getSchemaVersion(arg_0)).distinct().count() <= 1L;
    }

    public boolean isSchemaAgreementAchievable() {
        try {
            if (this.isInSchemaAgreement()) {
                return true;
            }
            if (!this.isStorageInSchemaAgreement()) {
                logger.debug("Storage nodes are not in schema agreement");
                return true;
            }
            Set outstandingRequests = PullRequestGetter.nonCompletedPullRequests();
            if (!outstandingRequests.isEmpty()) {
                return true;
            }
            logger.error("Schema agreement is not achievable");
            return false;
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public boolean supportsSAI() {
        return true;
    }

    public Map<String, List<String>> cqlSupportedOptions() {
        List pageUnits = Arrays.stream(PageSize.PageUnit.values()).map(Enum::toString).collect(Collectors.toList());
        return ImmutableMap.builder().put((Object)"PAGE_UNIT", pageUnits).put((Object)"SERVER_VERSION", Collections.singletonList(ProductVersion.getDSEVersionString())).put((Object)"PRODUCT_TYPE", Collections.singletonList(ProductType.product.name())).put((Object)"EMULATE_DBAAS_DEFAULTS", Collections.singletonList(String.valueOf(DatabaseDescriptor.isEmulateDbaasDefaults()))).put((Object)"CQL_VERSION", (Object)ImmutableList.of((Object)QueryProcessor.CQL_VERSION.toString())).build();
    }

    public void executeAuthResponse(Runnable handler) {
        RxThreads.subscribeOnIo((Completable)Completable.fromRunnable((Runnable)handler), (TPCTaskType)TPCTaskType.AUTHENTICATION).subscribe();
    }

    private void waitForSchema(int delayMillis) {
        boolean isConnectedAndInAgreement = false;
        for (int i = 0; i < delayMillis; i += 1000) {
            if (Gossiper.instance.getLiveTokenOwners().size() > 0 && this.isInSchemaAgreement()) {
                logger.debug("current schema version: {}", (Object)SchemaManager.instance.getVersion());
                isConnectedAndInAgreement = true;
                break;
            }
            Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
        }
        if (!isConnectedAndInAgreement) {
            logger.warn("Unable to connect to live token owner and/or reach schema agreement after {} milliseconds", (Object)delayMillis);
        }
    }

    private static ClientState clientStateForExternalCalls(@Nonnull ClientInfo clientInfo) {
        if (clientInfo.publicAddress().isPresent()) {
            return new ClientStateWithPublicAddress(clientInfo.remoteAddress(), (InetSocketAddress)clientInfo.publicAddress().get());
        }
        return ClientState.forExternalCalls((SocketAddress)clientInfo.remoteAddress(), null);
    }

    public void setAuthorizationService(AtomicReference<AuthorizationService> authorizationService) {
        this.authorizationService = authorizationService;
    }

    private static class ExternalAuthenticatedUser
    extends org.apache.cassandra.auth.AuthenticatedUser {
        public ExternalAuthenticatedUser(String roleName) {
            super(RoleResource.role((String)roleName), true, true, null);
        }
    }

    private static class PullRequestGetter {
        private static final Method nonCompletedPullRequestsMethod;
        private static final Object scheduler;

        private PullRequestGetter() {
        }

        private static Set<?> nonCompletedPullRequests() {
            try {
                return (Set)nonCompletedPullRequestsMethod.invoke(scheduler, new Object[0]);
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        static {
            try {
                Field schedulerField = MigrationManager.class.getDeclaredField("scheduler");
                schedulerField.setAccessible(true);
                scheduler = schedulerField.get(MigrationManager.instance);
                nonCompletedPullRequestsMethod = scheduler.getClass().getDeclaredMethod("nonCompletedPullRequest", new Class[0]);
                nonCompletedPullRequestsMethod.setAccessible(true);
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    private class DseConnection
    extends AbstractCassandraPersistence.AbstractConnection {
        private final ClientState clientState;
        private final ServerConnection fakeServerConnection;

        private DseConnection(ClientInfo clientInfo) {
            this(clientInfo, DsePersistence.clientStateForExternalCalls(clientInfo));
        }

        private DseConnection() {
            this(null, ClientState.forInternalCalls());
        }

        private DseConnection(ClientInfo clientInfo, ClientState clientState) {
            super(clientInfo);
            this.clientState = clientState;
            this.fakeServerConnection = new FakeConnection(clientInfo == null ? null : clientInfo.remoteAddress(), ProtocolVersion.CURRENT);
            if (!DsePersistence.this.authenticator.requireAuthentication()) {
                clientState.login(org.apache.cassandra.auth.AuthenticatedUser.ANONYMOUS_USER);
            }
        }

        public Persistence persistence() {
            return DsePersistence.this;
        }

        protected void loginInternally(AuthenticatedUser user) {
            try {
                Single loginSingle = user.isFromExternalAuth() && USE_TRANSITIONAL_AUTH ? this.clientState.login(EXTERNAL_AUTH_USER) : this.clientState.login(new org.apache.cassandra.auth.AuthenticatedUser(user.name()));
                ClientState clientState = (ClientState)loginSingle.blockingGet();
            }
            catch (AuthenticationException e) {
                throw new org.apache.cassandra.stargate.exceptions.AuthenticationException((Throwable)e);
            }
        }

        public Optional<String> usedKeyspace() {
            return Optional.ofNullable(this.clientState.getRawKeyspace());
        }

        private Single<QueryState> newQueryState() {
            if (this.clientState.getUser() == null) {
                return Single.just((Object)new QueryState(ClientState.forExternalCalls((org.apache.cassandra.auth.AuthenticatedUser)org.apache.cassandra.auth.AuthenticatedUser.ANONYMOUS_USER), UserRolesAndPermissions.SYSTEM));
            }
            if (this.clientState.getUser().isAnonymous()) {
                return Single.just((Object)new QueryState(this.clientState, UserRolesAndPermissions.SYSTEM));
            }
            return DatabaseDescriptor.getAuthManager().getUserRolesAndPermissions(this.clientState.getUser().getName(), this.clientState.getUser().getName(), IAuthContext.ANY).map(u -> new QueryState(this.clientState, u));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private <T extends Result> CompletableFuture<T> executeRequest(Parameters parameters, long queryStartNanoTime, Supplier<Message.Request> requestSupplier) {
            try {
                ExecutorLocals.set(null);
                if (parameters.protocolVersion().isGreaterOrEqualTo(org.apache.cassandra.stargate.transport.ProtocolVersion.V4)) {
                    ClientWarn.instance.captureWarnings();
                }
                Single<QueryState> queryState = this.newQueryState();
                Message.Request request = requestSupplier.get();
                if (parameters.tracingRequested()) {
                    request.setTracingRequested();
                }
                request.setCustomPayload((Map)parameters.customPayload().orElse(null));
                request.attach((Connection)this.fakeServerConnection);
                CompletableFuture future = new CompletableFuture();
                Disposable unused = request.execute(queryState, queryStartNanoTime).map(response -> {
                    try {
                        Result result;
                        if (response instanceof ErrorMessage) {
                            throw this.convertExceptionWithWarnings((Throwable)((ErrorMessage)response).error);
                        }
                        Result result2 = result = Conversion.toResult((ResultMessage)response, Conversion.toInternal(parameters.protocolVersion()), ClientWarn.instance.getAndClearWarnings());
                        return result2;
                    }
                    finally {
                        ClientWarn.instance.resetWarnings();
                    }
                }).subscribe(future::complete, ex -> {
                    if (!(ex instanceof PersistenceException)) {
                        ex = this.convertExceptionWithWarnings((Throwable)ex);
                    }
                    future.completeExceptionally((Throwable)ex);
                });
                CompletableFuture completableFuture = future;
                return completableFuture;
            }
            catch (Exception e) {
                CompletableFuture exceptionalFuture = new CompletableFuture();
                exceptionalFuture.completeExceptionally((Throwable)this.convertExceptionWithWarnings(e));
                CompletableFuture completableFuture = exceptionalFuture;
                return completableFuture;
            }
            finally {
                ClientWarn.instance.resetWarnings();
            }
        }

        private PersistenceException convertExceptionWithWarnings(Throwable t) {
            PersistenceException pe = Conversion.convertInternalException(t);
            pe.setWarnings(ClientWarn.instance.getAndClearWarnings());
            return pe;
        }

        public CompletableFuture<Result> execute(Statement statement, Parameters parameters, long queryStartNanoTime) {
            return this.executeRequest(parameters, queryStartNanoTime, () -> {
                QueryOptions options = Conversion.toInternal(statement.values(), statement.boundNames().orElse(null), parameters);
                if (statement instanceof SimpleStatement) {
                    String queryString = ((SimpleStatement)statement).queryString();
                    return new QueryMessage(queryString, options);
                }
                MD5Digest id = Conversion.toInternal(((BoundStatement)statement).preparedId());
                return new ExecuteMessage(id, null, options);
            });
        }

        public CompletableFuture<Result.Prepared> prepare(String query, Parameters parameters) {
            return this.executeRequest(parameters, System.nanoTime(), () -> new PrepareMessage(query, (String)parameters.defaultKeyspace().orElse(null)));
        }

        public CompletableFuture<Result> batch(Batch batch, Parameters parameters, long queryStartNanoTime) {
            return this.executeRequest(parameters, queryStartNanoTime, () -> {
                QueryOptions options = Conversion.toInternal(Collections.emptyList(), null, parameters);
                BatchStatement.Type internalBatchType = Conversion.toInternal(batch.type());
                ArrayList<Object> queryOrIdList = new ArrayList<Object>(batch.size());
                ArrayList<List> allValues = new ArrayList<List>(batch.size());
                for (Statement statement : batch.statements()) {
                    queryOrIdList.add(this.queryOrId(statement));
                    allValues.add(statement.values());
                }
                return new BatchMessage(internalBatchType, queryOrIdList, allValues, options);
            });
        }

        public ByteBuffer makePagingState(PagingPosition pos, Parameters parameters) {
            TableMetadata table = SchemaManager.instance.validateTable(pos.tableName().keyspace(), pos.tableName().name());
            return Conversion.toPagingState(table, pos, parameters);
        }

        private Object queryOrId(Statement statement) {
            if (statement instanceof SimpleStatement) {
                return ((SimpleStatement)statement).queryString();
            }
            return Conversion.toInternal(((BoundStatement)statement).preparedId());
        }
    }
}

