/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.session.Session;
import io.vertx.cassandra.CassandraClient;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.cassandra.impl.CassandraRowStreamImpl;
import io.vertx.cassandra.impl.ResultSetImpl;
import io.vertx.cassandra.impl.SessionHolder;
import io.vertx.cassandra.impl.tracing.QueryRequest;
import io.vertx.cassandra.impl.tracing.RequestTags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;

public class CassandraClientImpl
implements CassandraClient {
    public static final String HOLDERS_LOCAL_MAP_NAME = "__vertx.cassandraClient.sessionHolders";
    final VertxInternal vertx;
    private final VertxTracer tracer;
    private final String clientName;
    private final CassandraClientOptions options;
    private final Map<String, SessionHolder> holders;
    private final ContextInternal creatingContext;
    private boolean closed;

    public CassandraClientImpl(Vertx vertx, String clientName, CassandraClientOptions options) {
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(clientName, "clientName");
        Objects.requireNonNull(options, "options");
        this.vertx = (VertxInternal)vertx;
        this.tracer = ((VertxInternal)vertx).tracer();
        this.clientName = clientName;
        this.options = options;
        this.creatingContext = ((VertxInternal)vertx).getOrCreateContext();
        this.holders = vertx.sharedData().getLocalMap(HOLDERS_LOCAL_MAP_NAME);
        SessionHolder current = this.holders.compute(clientName, (k, h) -> h == null ? new SessionHolder() : h.increment());
        this.creatingContext.addCloseHook(new Closeable(){

            public void close(Promise<Void> completion) {
                CassandraClientImpl.this.close().onComplete(completion);
            }
        });
    }

    @Override
    public synchronized boolean isConnected() {
        if (this.closed) {
            return false;
        }
        CqlSession s = this.holders.get((Object)this.clientName).session;
        return s != null && !s.isClosed();
    }

    @Override
    public Future<List<Row>> executeWithFullFetch(String query) {
        return this.executeWithFullFetch((Statement)SimpleStatement.newInstance((String)query));
    }

    @Override
    public Future<List<Row>> executeWithFullFetch(Statement statement) {
        return this.execute(statement).flatMap(ResultSet::all);
    }

    @Override
    public Future<ResultSet> execute(String query) {
        return this.execute((Statement)SimpleStatement.newInstance((String)query));
    }

    @Override
    public <R> Future<R> execute(String query, Collector<Row, ?, R> collector) {
        return this.execute((Statement)SimpleStatement.newInstance((String)query), collector);
    }

    @Override
    public Future<ResultSet> execute(Statement statement) {
        return this.executeInternal(statement).map(rs -> new ResultSetImpl((AsyncResultSet)rs, (Vertx)this.vertx));
    }

    private Future<AsyncResultSet> executeInternal(Statement statement) {
        return this.getSession(this.vertx.getOrCreateContext()).flatMap(session -> {
            Object payload = this.tracer != null ? this.sendRequest((CqlSession)session, statement) : null;
            Future future = Future.fromCompletionStage((CompletionStage)session.executeAsync(statement), (Context)this.vertx.getContext());
            if (this.tracer != null) {
                future = future.onComplete(ar -> this.receiveResponse(payload, (AsyncResult<AsyncResultSet>)ar));
            }
            return future;
        });
    }

    private Object sendRequest(CqlSession session, Statement statement) {
        QueryRequest request = new QueryRequest(session, statement);
        return this.tracer.sendRequest((Context)this.vertx.getContext(), SpanKind.RPC, this.options.getTracingPolicy(), (Object)request, "Query", (k, v) -> {}, RequestTags.REQUEST_TAG_EXTRACTOR);
    }

    private void receiveResponse(Object payload, AsyncResult<AsyncResultSet> asyncResult) {
        this.tracer.receiveResponse((Context)this.vertx.getContext(), null, payload, asyncResult.cause(), TagExtractor.empty());
    }

    @Override
    public <R> Future<R> execute(Statement statement, Collector<Row, ?, R> collector) {
        return this.executeAndCollect(statement, collector);
    }

    private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
        Object container = collector.supplier().get();
        BiConsumer accumulator = collector.accumulator();
        Function finisher = collector.finisher();
        return this.queryStream(statement).flatMap(cassandraRowStream -> {
            Promise resultPromise = Promise.promise();
            cassandraRowStream.endHandler(end -> {
                Object result = finisher.apply(container);
                resultPromise.complete(result);
            });
            cassandraRowStream.handler(row -> accumulator.accept(container, (Row)row));
            cassandraRowStream.exceptionHandler(arg_0 -> ((Promise)resultPromise).fail(arg_0));
            return resultPromise.future();
        });
    }

    @Override
    public Future<PreparedStatement> prepare(String query) {
        return this.getSession(this.vertx.getOrCreateContext()).flatMap(session -> Future.fromCompletionStage((CompletionStage)session.prepareAsync(query), (Context)this.vertx.getContext()));
    }

    @Override
    public Future<PreparedStatement> prepare(SimpleStatement statement) {
        return this.getSession(this.vertx.getOrCreateContext()).flatMap(session -> Future.fromCompletionStage((CompletionStage)session.prepareAsync(statement), (Context)this.vertx.getContext()));
    }

    @Override
    public Future<CassandraRowStream> queryStream(String sql) {
        return this.queryStream((Statement)SimpleStatement.newInstance((String)sql));
    }

    @Override
    public Future<CassandraRowStream> queryStream(Statement statement) {
        return this.executeInternal(statement).map(rs -> {
            ResultSetImpl resultSet = new ResultSetImpl((AsyncResultSet)rs, (Vertx)this.vertx);
            CassandraRowStreamImpl stream = new CassandraRowStreamImpl((Context)this.vertx.getContext());
            stream.init(resultSet);
            return stream;
        });
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public Future<Void> close() {
        ContextInternal context = this.vertx.getOrCreateContext();
        if (!this.raiseCloseFlag()) return context.succeededFuture();
        while (true) {
            SessionHolder current = this.holders.get(this.clientName);
            SessionHolder next = current.decrement();
            if (next.refCount == 0) {
                if (!this.holders.remove(this.clientName, current)) continue;
                if (current.session == null) return context.succeededFuture();
                return Future.fromCompletionStage((CompletionStage)current.session.closeAsync(), (Context)context);
            }
            if (this.holders.replace(this.clientName, current, next)) return context.succeededFuture();
        }
    }

    @Override
    public Future<Metadata> metadata() {
        return this.getSession(this.vertx.getOrCreateContext()).map(Session::getMetadata);
    }

    private synchronized boolean raiseCloseFlag() {
        if (!this.closed) {
            this.closed = true;
            return true;
        }
        return false;
    }

    synchronized Future<CqlSession> getSession(ContextInternal context) {
        if (this.closed) {
            return context.failedFuture("Client is closed");
        }
        SessionHolder holder = this.holders.get(this.clientName);
        if (holder.session != null) {
            return context.succeededFuture((Object)holder.session);
        }
        return context.executeBlocking(this::connect);
    }

    private CqlSession connect() {
        SessionHolder current = this.holders.get(this.clientName);
        if (current == null) {
            throw new VertxException("Client closed while connecting", true);
        }
        if (current.session != null) {
            return current.session;
        }
        CqlSessionBuilder builder = this.options.dataStaxClusterBuilder();
        CqlSession session = (CqlSession)builder.build();
        current = this.holders.compute(this.clientName, (k, h) -> h == null ? null : h.connected(session));
        if (current != null) {
            return current.session;
        }
        try {
            session.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        throw new VertxException("Client closed while connecting", true);
    }
}

