/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.cradle.cassandra.dao.messages;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.mapper.MapperContext;
import com.datastax.oss.driver.api.mapper.entity.EntityHelper;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.ColumnRelationBuilder;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.term.Term;
import com.exactpro.cradle.Direction;
import com.exactpro.cradle.Order;
import com.exactpro.cradle.cassandra.CassandraCradleStorage;
import com.exactpro.cradle.cassandra.dao.messages.DetailedMessageBatchEntity;
import com.exactpro.cradle.cassandra.dao.messages.MessageBatchOperator;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageEntity;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageOperator;
import com.exactpro.cradle.cassandra.utils.CassandraMessageUtils;
import com.exactpro.cradle.cassandra.utils.FilterUtils;
import com.exactpro.cradle.filters.ComparisonOperation;
import com.exactpro.cradle.filters.FilterForEquals;
import com.exactpro.cradle.messages.StoredMessageFilter;
import com.exactpro.cradle.messages.StoredMessageId;
import com.exactpro.cradle.utils.CradleStorageException;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds, binds and executes the CQL query that selects {@link DetailedMessageBatchEntity} rows
 * matching a {@link StoredMessageFilter}.
 *
 * <p>The query text is produced by {@link #addFilter} and its bind markers are filled in by
 * {@link #bindFilterParameters}; the two methods must stay in sync: every restriction added to
 * the query needs exactly one value bound, and vice versa.
 */
public class MessageBatchQueryProvider {
    private static final Logger logger = LoggerFactory.getLogger(MessageBatchQueryProvider.class);
    /** Name of the bind marker holding the lower bound for {@code message_index}. */
    private static final String LEFT_MESSAGE_INDEX = "left_message_index";
    /** Name of the bind marker holding the upper bound for {@code message_index}. */
    private static final String RIGHT_MESSAGE_INDEX = "right_message_index";
    private final CqlSession session;
    private final EntityHelper<DetailedMessageBatchEntity> helper;
    /** Common query prefix: entity select restricted by {@code instance_id}, with ALLOW FILTERING. */
    private final Select selectStart;

    /**
     * @param context mapper context supplying the session used to prepare and execute queries
     * @param helper  entity helper used to start the select and to map result rows to entities
     */
    public MessageBatchQueryProvider(MapperContext context, EntityHelper<DetailedMessageBatchEntity> helper) {
        this.session = context.getSession();
        this.helper = helper;
        this.selectStart = helper.selectStart()
                .whereColumn("instance_id").isEqualTo(QueryBuilder.bindMarker())
                .allowFiltering();
    }

    /**
     * Selects message batches of the given instance that match {@code filter}.
     *
     * <p>Only the final query execution is asynchronous: preparing the statement and binding its
     * parameters happen on the calling thread, and binding may block while it looks up message
     * batches and time-to-index mappings.
     *
     * @param instanceId value bound to the {@code instance_id} restriction
     * @param filter     filter to apply; {@code null} selects by instance only, in direct order
     * @param mbOperator operator used to look up the batch referenced by an index filter
     * @param tmOperator operator used to translate timestamp filters into message indexes
     * @param attributes customizer applied to the statement builder (and passed on to the lookups)
     * @return future with the mapped result page; completed exceptionally with
     *         {@link CradleStorageException} if the filter cannot be bound
     */
    public CompletableFuture<MappedAsyncPagingIterable<DetailedMessageBatchEntity>> filterMessages(UUID instanceId, StoredMessageFilter filter, MessageBatchOperator mbOperator, TimeMessageOperator tmOperator, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
        Select select = this.selectStart;
        Order order = null;
        if (filter != null) {
            order = filter.getOrder();
            select = this.addFilter(select, filter);
        }
        select = this.orderBy(order, select);
        PreparedStatement ps = this.session.prepare(select.build());

        BoundStatement bs;
        try {
            bs = this.bindParameters(ps, instanceId, filter, mbOperator, tmOperator, attributes);
        }
        catch (CradleStorageException e) {
            // Report binding problems through the returned future rather than by throwing.
            CompletableFuture<MappedAsyncPagingIterable<DetailedMessageBatchEntity>> error = new CompletableFuture<>();
            error.completeExceptionally(e);
            return error;
        }
        return this.session.executeAsync((Statement)bs)
                .thenApply(rs -> rs.map(row -> this.helper.get(row)))
                .toCompletableFuture();
    }

    /**
     * Orders the result by {@code direction} and {@code message_index}, ascending for
     * {@link Order#DIRECT} (also used when {@code order} is {@code null}), descending otherwise.
     */
    private Select orderBy(Order order, Select select) {
        if (order == null) {
            order = Order.DIRECT;
        }
        ClusteringOrder clusteringOrder = order == Order.DIRECT ? ClusteringOrder.ASC : ClusteringOrder.DESC;
        return select.orderBy("direction", clusteringOrder).orderBy("message_index", clusteringOrder);
    }

    /** Adds a restriction on {@code message_index} using the given operation and bind marker name. */
    private static Select whereMessageIndex(Select select, ComparisonOperation operation, String bindMarkerName) {
        return FilterUtils.filterToWhere(operation, select.whereColumn("message_index"), bindMarkerName);
    }

    /**
     * Adds the WHERE restrictions implied by {@code filter} to {@code select}.
     *
     * <p>At most one lower-bound and one upper-bound restriction on {@code message_index} is added;
     * index and timestamp filters share the same two bind markers.
     */
    private Select addFilter(Select select, StoredMessageFilter filter) {
        FilterForEquals streamName = filter.getStreamName();
        if (streamName != null) {
            select = FilterUtils.filterToWhere(streamName.getOperation(), select.whereColumn("stream_name"), null);
        }
        FilterForEquals direction = filter.getDirection();
        if (direction != null) {
            select = FilterUtils.filterToWhere(direction.getOperation(), select.whereColumn("direction"), null);
        }

        boolean isLeftIndexSelected = false;
        boolean isRightIndexSelected = false;
        if (filter.getIndex() != null) {
            ComparisonOperation op = filter.getIndex().getOperation();
            if (op == ComparisonOperation.EQUALS) {
                // An exact index is expressed as a closed range; timestamps and limit are ignored.
                select = whereMessageIndex(select, ComparisonOperation.GREATER_OR_EQUALS, LEFT_MESSAGE_INDEX);
                select = whereMessageIndex(select, ComparisonOperation.LESS_OR_EQUALS, RIGHT_MESSAGE_INDEX);
                return select;
            }
            if (filter.getLimit() > 0 && (op == ComparisonOperation.LESS || op == ComparisonOperation.LESS_OR_EQUALS)) {
                select = whereMessageIndex(select, ComparisonOperation.GREATER_OR_EQUALS, LEFT_MESSAGE_INDEX);
                // Remember that the lower bound is already in the query: otherwise a "timestamp from"
                // filter would add the same restriction a second time, and Cassandra rejects more
                // than one start-bound restriction on a column.
                isLeftIndexSelected = true;
            }
            switch (op) {
                case GREATER:
                    // Strict comparisons are widened to inclusive ones: the value bound later is a
                    // batch boundary (see bindFilterParameters), not the requested index itself.
                    op = ComparisonOperation.GREATER_OR_EQUALS;
                    // fall through
                case GREATER_OR_EQUALS:
                    isLeftIndexSelected = true;
                    select = whereMessageIndex(select, op, LEFT_MESSAGE_INDEX);
                    break;
                case LESS:
                    op = ComparisonOperation.LESS_OR_EQUALS;
                    // fall through
                default:
                    isRightIndexSelected = true;
                    select = whereMessageIndex(select, op, RIGHT_MESSAGE_INDEX);
                    break;
            }
        }
        if (!isLeftIndexSelected && filter.getTimestampFrom() != null) {
            select = whereMessageIndex(select, ComparisonOperation.GREATER_OR_EQUALS, LEFT_MESSAGE_INDEX);
        }
        if (!isRightIndexSelected && filter.getTimestampTo() != null) {
            select = whereMessageIndex(select, ComparisonOperation.LESS_OR_EQUALS, RIGHT_MESSAGE_INDEX);
        }
        if (filter.getLimit() > 0) {
            // NOTE(review): the result of limit() is discarded, so no LIMIT clause reaches the query
            // (query builder objects are immutable). Left as is on purpose: assigning it would cap the
            // number of batch rows rather than messages, and the first selected batch may contain no
            // matching message - confirm the intended semantics before enabling it.
            select.limit(filter.getLimit());
        }
        return select;
    }

    /**
     * Creates the bound statement: binds {@code instance_id}, applies {@code attributes} and, when a
     * filter is given, binds the filter-related parameters.
     *
     * @throws CradleStorageException if the filter is incomplete or a required lookup fails
     */
    private BoundStatement bindParameters(PreparedStatement ps, UUID instanceId, StoredMessageFilter filter, MessageBatchOperator mbOperator, TimeMessageOperator tmOperator, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) throws CradleStorageException {
        BoundStatementBuilder builder = ps.boundStatementBuilder().setUuid("instance_id", instanceId);
        builder = attributes.apply(builder);
        if (filter != null) {
            builder = this.bindFilterParameters(builder, instanceId, filter, mbOperator, tmOperator, attributes);
        }
        return builder.build();
    }

    /** Restores the interrupt status of the current thread if {@code e} signals an interruption. */
    private static void restoreInterruptStatus(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the message batch for the message ID built from the filter's stream name, direction and
     * index values. Blocks until the lookup completes.
     *
     * @return result of the lookup; callers treat {@code null} as "no batch available"
     * @throws CradleStorageException if stream name or direction is missing, or the lookup fails
     *                                or is interrupted
     */
    private DetailedMessageBatchEntity getMessageBatch(UUID instanceId, StoredMessageFilter filter, MessageBatchOperator operator, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) throws CradleStorageException {
        if (filter.getStreamName() == null || filter.getDirection() == null) {
            throw new CradleStorageException("Both streamName and direction are required when filtering by message index");
        }
        StoredMessageId id = new StoredMessageId((String)((Object)filter.getStreamName().getValue()), (Direction)filter.getDirection().getValue(), ((Long)filter.getIndex().getValue()).longValue());
        try {
            return CassandraMessageUtils.getMessageBatch(id, operator, instanceId, attributes).get();
        }
        catch (InterruptedException | ExecutionException e) {
            restoreInterruptStatus(e);
            throw new CradleStorageException("Error while getting message batch for ID " + id, (Throwable)e);
        }
    }

    /**
     * Binds values for the restrictions added by {@link #addFilter}: stream name, direction and the
     * lower/upper {@code message_index} bounds derived from the index and timestamp filters.
     *
     * <p>When both an index filter and a timestamp filter contribute to the same bound, the tighter
     * value wins (the larger one for the lower bound, the smaller one for the upper bound).
     *
     * @throws CradleStorageException if a mandatory filter field is missing or a lookup fails
     */
    private BoundStatementBuilder bindFilterParameters(BoundStatementBuilder builder, UUID instanceId, StoredMessageFilter filter, MessageBatchOperator operator, TimeMessageOperator tmOperator, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) throws CradleStorageException {
        if (filter.getStreamName() == null) {
            throw new CradleStorageException("Stream name is a mandatory filter field and can't be empty");
        }
        builder = builder.setString("stream_name", (String)((Object)filter.getStreamName().getValue()));

        FilterForEquals directionFilter = filter.getDirection();
        if ((filter.getTimestampFrom() != null || filter.getTimestampTo() != null) && directionFilter == null) {
            throw new CradleStorageException("Direction is a mandatory filter field for filtering by timestamp or index");
        }
        if (directionFilter != null) {
            builder = builder.setString("direction", ((Direction)directionFilter.getValue()).getLabel());
        }

        // Bounds contributed by the index filter; the defaults mean "unbounded".
        long leftIndex = 0L;
        long rightIndex = Long.MAX_VALUE;
        if (filter.getIndex() != null) {
            ComparisonOperation op = filter.getIndex().getOperation();
            DetailedMessageBatchEntity batch = this.getMessageBatch(instanceId, filter, operator, attributes);
            // Use the boundaries of the batch holding the requested message, or the requested index
            // itself when no batch is available.
            long requestedIndex = ((Long)filter.getIndex().getValue()).longValue();
            long leftBatchIndex = batch != null ? batch.getMessageIndex() : requestedIndex;
            long rightBatchIndex = batch != null ? batch.getLastMessageIndex() : requestedIndex;

            if (filter.getLimit() > 0 && (op == ComparisonOperation.LESS || op == ComparisonOperation.LESS_OR_EQUALS)) {
                try {
                    leftIndex = CassandraMessageUtils.findLeftMessageIndex(batch, filter, instanceId, operator, attributes);
                }
                catch (IOException e) {
                    // Best effort: fall back to the start of the known batch instead of failing.
                    if (batch != null) {
                        logger.warn("Error while finding left batch index for stream '" + batch.getStreamName() + "', direction '" + batch.getDirection() + "' and index " + batch.getMessageIndex(), (Throwable)e);
                    } else {
                        logger.warn("Error while finding left batch index for index " + requestedIndex + " (no message batch available)", (Throwable)e);
                    }
                    leftIndex = leftBatchIndex;
                }
                builder = builder.setLong(LEFT_MESSAGE_INDEX, leftIndex);
            }

            switch (op) {
                case GREATER:
                case GREATER_OR_EQUALS: {
                    leftIndex = leftBatchIndex;
                    builder = builder.setLong(LEFT_MESSAGE_INDEX, leftIndex);
                    break;
                }
                case LESS:
                case LESS_OR_EQUALS: {
                    rightIndex = rightBatchIndex;
                    builder = builder.setLong(RIGHT_MESSAGE_INDEX, rightIndex);
                    break;
                }
                case EQUALS: {
                    // Mirrors the early return in addFilter: only the closed range is bound.
                    builder = builder.setLong(LEFT_MESSAGE_INDEX, leftBatchIndex);
                    builder = builder.setLong(RIGHT_MESSAGE_INDEX, rightBatchIndex);
                    return builder;
                }
            }
        }

        if (filter.getTimestampFrom() != null) {
            Instant from = (Instant)filter.getTimestampFrom().getValue();
            try {
                long fromIndex = this.getNearestMessageIndexBefore(tmOperator, instanceId, (String)((Object)filter.getStreamName().getValue()), (Direction)directionFilter.getValue(), from, attributes);
                // Override the lower bound only if the timestamp is at least as restrictive.
                if (fromIndex >= leftIndex) {
                    builder = builder.setLong(LEFT_MESSAGE_INDEX, fromIndex);
                }
            }
            catch (InterruptedException | ExecutionException e) {
                restoreInterruptStatus(e);
                throw new CradleStorageException("Error getting message batch index for timestamp 'From=" + from + '\'', (Throwable)e);
            }
        }
        if (filter.getTimestampTo() != null) {
            Instant to = (Instant)filter.getTimestampTo().getValue();
            try {
                long toIndex = this.getNearestMessageIndexAfter(tmOperator, instanceId, (String)((Object)filter.getStreamName().getValue()), (Direction)directionFilter.getValue(), to, attributes);
                // Override the upper bound only if the timestamp is at least as restrictive.
                if (toIndex <= rightIndex) {
                    builder = builder.setLong(RIGHT_MESSAGE_INDEX, toIndex);
                }
            }
            catch (InterruptedException | ExecutionException e) {
                restoreInterruptStatus(e);
                throw new CradleStorageException("Error getting message batch index for timestamp 'To=" + to + '\'', (Throwable)e);
            }
        }
        return builder;
    }

    /**
     * Returns the message index of the entry reported by
     * {@code TimeMessageOperator.getNearestMessageBefore} for the given instant, or {@code 0} when
     * there is none. Blocks until the lookup completes.
     */
    private long getNearestMessageIndexBefore(TimeMessageOperator tmOperator, UUID instanceId, String streamName, Direction direction, Instant instant, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) throws ExecutionException, InterruptedException {
        LocalDateTime ldt = LocalDateTime.ofInstant(instant, CassandraCradleStorage.TIMEZONE_OFFSET);
        TimeMessageEntity entity = tmOperator.getNearestMessageBefore(instanceId, streamName, ldt.toLocalDate(), direction.getLabel(), ldt.toLocalTime(), attributes).get();
        return entity == null ? 0L : entity.getMessageIndex();
    }

    /**
     * Returns the message index of the entry reported by
     * {@code TimeMessageOperator.getNearestMessageAfter} for the given instant, or
     * {@link Long#MAX_VALUE} when there is none. Blocks until the lookup completes.
     */
    private long getNearestMessageIndexAfter(TimeMessageOperator tmOperator, UUID instanceId, String streamName, Direction direction, Instant instant, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) throws ExecutionException, InterruptedException {
        LocalDateTime ldt = LocalDateTime.ofInstant(instant, CassandraCradleStorage.TIMEZONE_OFFSET);
        TimeMessageEntity entity = tmOperator.getNearestMessageAfter(instanceId, streamName, ldt.toLocalDate(), direction.getLabel(), ldt.toLocalTime(), attributes).get();
        return entity == null ? Long.MAX_VALUE : entity.getMessageIndex();
    }
}

