/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.cradle.cassandra;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.term.Term;
import com.exactpro.cradle.CradleObjectsFactory;
import com.exactpro.cradle.CradleStorage;
import com.exactpro.cradle.Direction;
import com.exactpro.cradle.Order;
import com.exactpro.cradle.TimeRelation;
import com.exactpro.cradle.cassandra.CassandraSemaphore;
import com.exactpro.cradle.cassandra.CassandraStorageSettings;
import com.exactpro.cradle.cassandra.TablesCreator;
import com.exactpro.cradle.cassandra.connection.CassandraConnection;
import com.exactpro.cradle.cassandra.connection.CassandraConnectionSettings;
import com.exactpro.cradle.cassandra.dao.AsyncOperator;
import com.exactpro.cradle.cassandra.dao.CassandraDataMapper;
import com.exactpro.cradle.cassandra.dao.CassandraDataMapperBuilder;
import com.exactpro.cradle.cassandra.dao.CassandraOperators;
import com.exactpro.cradle.cassandra.dao.intervals.CassandraIntervalsWorker;
import com.exactpro.cradle.cassandra.dao.intervals.IntervalSupplies;
import com.exactpro.cradle.cassandra.dao.messages.DetailedMessageBatchEntity;
import com.exactpro.cradle.cassandra.dao.messages.MessageBatchOperator;
import com.exactpro.cradle.cassandra.dao.messages.MessageTestEventEntity;
import com.exactpro.cradle.cassandra.dao.messages.MessageTestEventOperator;
import com.exactpro.cradle.cassandra.dao.messages.StreamEntity;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageEntity;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageOperator;
import com.exactpro.cradle.cassandra.dao.testevents.DetailedTestEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.RootTestEventDateEntity;
import com.exactpro.cradle.cassandra.dao.testevents.RootTestEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.RootTestEventOperator;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventChildDateEntity;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventChildEntity;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventMessagesEntity;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventMessagesOperator;
import com.exactpro.cradle.cassandra.dao.testevents.TimeTestEventEntity;
import com.exactpro.cradle.cassandra.iterators.MessagesIteratorAdapter;
import com.exactpro.cradle.cassandra.iterators.RootTestEventsMetadataIteratorAdapter;
import com.exactpro.cradle.cassandra.iterators.StoredMessageBatchAdapter;
import com.exactpro.cradle.cassandra.iterators.TestEventChildrenMetadataIteratorAdapter;
import com.exactpro.cradle.cassandra.iterators.TimeTestEventsMetadataIteratorAdapter;
import com.exactpro.cradle.cassandra.linkers.CassandraTestEventsMessagesLinker;
import com.exactpro.cradle.cassandra.linkers.LinkerSupplies;
import com.exactpro.cradle.cassandra.retries.CompleteEventsGetter;
import com.exactpro.cradle.cassandra.retries.FixedNumberRetryPolicy;
import com.exactpro.cradle.cassandra.retries.PageSizeAdjustingPolicy;
import com.exactpro.cradle.cassandra.retries.PagingSupplies;
import com.exactpro.cradle.cassandra.retries.SelectExecutionPolicy;
import com.exactpro.cradle.cassandra.retries.SelectQueryExecutor;
import com.exactpro.cradle.cassandra.utils.CassandraMessageUtils;
import com.exactpro.cradle.cassandra.utils.QueryExecutor;
import com.exactpro.cradle.intervals.IntervalsWorker;
import com.exactpro.cradle.messages.StoredMessage;
import com.exactpro.cradle.messages.StoredMessageBatch;
import com.exactpro.cradle.messages.StoredMessageFilter;
import com.exactpro.cradle.messages.StoredMessageId;
import com.exactpro.cradle.testevents.StoredTestEvent;
import com.exactpro.cradle.testevents.StoredTestEventId;
import com.exactpro.cradle.testevents.StoredTestEventMetadata;
import com.exactpro.cradle.testevents.StoredTestEventWrapper;
import com.exactpro.cradle.testevents.TestEventsMessagesLinker;
import com.exactpro.cradle.utils.CradleStorageException;
import com.exactpro.cradle.utils.MessageUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraCradleStorage
extends CradleStorage {
    public static final long EMPTY_MESSAGE_INDEX = -1L;
    private Logger logger = LoggerFactory.getLogger(CassandraCradleStorage.class);
    public static final ZoneOffset TIMEZONE_OFFSET = ZoneOffset.UTC;
    private final CassandraConnection connection;
    private final CassandraStorageSettings settings;
    private final CassandraSemaphore semaphore;
    private final CradleObjectsFactory objectsFactory;
    private CassandraOperators ops;
    private UUID instanceUuid;
    private Function<BoundStatementBuilder, BoundStatementBuilder> writeAttrs;
    private Function<BoundStatementBuilder, BoundStatementBuilder> readAttrs;
    private Function<BoundStatementBuilder, BoundStatementBuilder> strictReadAttrs;
    private int resultPageSize;
    private SelectExecutionPolicy multiRowResultExecPolicy;
    private SelectExecutionPolicy singleRowResultExecPolicy;
    private QueryExecutor exec;
    private SelectQueryExecutor selectExecutor;
    private CompleteEventsGetter completeEventsGetter;
    private PagingSupplies pagingSupplies;
    private TestEventsMessagesLinker testEventsMessagesLinker;
    private IntervalsWorker intervalsWorker;

    public CassandraCradleStorage(CassandraConnection connection, CassandraStorageSettings settings) {
        CassandraConnectionSettings conSettings = connection.getSettings();
        this.connection = connection;
        this.settings = settings;
        this.semaphore = new CassandraSemaphore(conSettings.getMaxParallelQueries());
        this.objectsFactory = new CradleObjectsFactory(settings.getMaxMessageBatchSize(), settings.getMaxTestEventBatchSize());
        this.resultPageSize = conSettings.getResultPageSize();
        this.multiRowResultExecPolicy = conSettings.getSelectExecutionPolicy();
        if (this.multiRowResultExecPolicy == null) {
            this.multiRowResultExecPolicy = new PageSizeAdjustingPolicy(this.resultPageSize == 0 ? 5000 : this.resultPageSize, 2);
        }
        this.singleRowResultExecPolicy = conSettings.getSingleRowResultExecutionPolicy();
        if (this.singleRowResultExecPolicy == null) {
            this.singleRowResultExecPolicy = new FixedNumberRetryPolicy(5);
        }
    }

    public UUID getInstanceUuid() {
        return this.instanceUuid;
    }

    protected String doInit(String instanceName, boolean prepareStorage) throws CradleStorageException {
        this.logger.info("Connecting to Cassandra...");
        try {
            this.connection.start();
        }
        catch (Exception e) {
            throw new CradleStorageException("Could not open Cassandra connection", (Throwable)e);
        }
        try {
            CqlSession session = this.connection.getSession();
            this.exec = new QueryExecutor(session, this.settings.getTimeout(), this.settings.getWriteConsistencyLevel(), this.settings.getReadConsistencyLevel());
            this.selectExecutor = new SelectQueryExecutor(session, this.multiRowResultExecPolicy, this.singleRowResultExecPolicy);
            this.pagingSupplies = new PagingSupplies(session, this.multiRowResultExecPolicy);
            if (prepareStorage) {
                this.logger.info("Creating/updating schema...");
                this.createTables();
                this.logger.info("All needed tables created");
            } else {
                this.logger.info("Schema creation/update skipped");
            }
            this.instanceUuid = this.getInstanceId(instanceName);
            CassandraDataMapper dataMapper = new CassandraDataMapperBuilder(session).build();
            this.ops = this.createOperators(dataMapper, this.settings);
            Duration timeout = Duration.ofMillis(this.settings.getTimeout());
            this.writeAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(this.settings.getWriteConsistencyLevel())).setTimeout(timeout);
            this.readAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(this.settings.getReadConsistencyLevel())).setTimeout(timeout)).setPageSize(this.resultPageSize);
            this.strictReadAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(ConsistencyLevel.ALL)).setTimeout(timeout)).setPageSize(this.resultPageSize);
            LinkerSupplies supplies = new LinkerSupplies(this.ops.getTestEventMessagesOperator(), this.ops.getMessageTestEventOperator(), this.ops.getTestEventMessagesConverter(), this.ops.getMessageTestEventConverter());
            this.testEventsMessagesLinker = new CassandraTestEventsMessagesLinker(supplies, this.instanceUuid, this.readAttrs, this.semaphore, this.selectExecutor, this.pagingSupplies);
            this.completeEventsGetter = new CompleteEventsGetter(this.instanceUuid, this.readAttrs, this.multiRowResultExecPolicy, this.ops.getTestEventOperator(), this.ops.getTestEventConverter(), this.pagingSupplies);
            IntervalSupplies intervalSupplies = new IntervalSupplies(this.ops.getIntervalOperator(), this.ops.getIntervalConverter(), this.pagingSupplies);
            this.intervalsWorker = new CassandraIntervalsWorker(this.semaphore, this.instanceUuid, this.writeAttrs, this.readAttrs, intervalSupplies);
            return this.instanceUuid.toString();
        }
        catch (IOException e) {
            throw new CradleStorageException("Could not initialize storage", (Throwable)e);
        }
    }

    protected void doDispose() throws CradleStorageException {
        this.logger.info("Disconnecting from Cassandra...");
        try {
            this.connection.stop();
        }
        catch (Exception e) {
            this.logger.error("Error while closing Cassandra connection", (Throwable)e);
        }
    }

    protected void doStoreMessageBatch(StoredMessageBatch batch) throws IOException {
        try {
            this.doStoreMessageBatchAsync(batch).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing message batch " + batch.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreMessageBatchAsync(StoredMessageBatch batch) {
        return this.writeMessage(batch, true);
    }

    protected void doStoreTimeMessage(StoredMessage message) throws IOException {
        try {
            this.doStoreTimeMessageAsync(message).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing time/message data for message " + message.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreTimeMessageAsync(StoredMessage message) {
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> {
            TimeMessageEntity timeEntity = new TimeMessageEntity(message, this.instanceUuid);
            this.logger.trace("Executing time/message storing query for message {}", (Object)message.getId());
            return this.ops.getTimeMessageOperator().writeMessage(timeEntity, this.writeAttrs);
        });
        return future.thenAccept(e -> {});
    }

    protected void doStoreProcessedMessageBatch(StoredMessageBatch batch) throws IOException {
        try {
            this.doStoreProcessedMessageBatchAsync(batch).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing processed message batch " + batch.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreProcessedMessageBatchAsync(StoredMessageBatch batch) {
        return this.writeMessage(batch, false);
    }

    protected void doStoreTestEvent(StoredTestEvent event) throws IOException {
        try {
            this.doStoreTestEventAsync(event).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing test event " + event.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreTestEventAsync(StoredTestEvent event) {
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>();
        futures.add(this.storeEvent(event).thenAccept(r -> {}));
        futures.add(this.storeTimeEvent(event).thenAccept(r -> {}));
        if (event.getParentId() != null) {
            futures.add(this.storeEventInParent(event).thenAccept(r -> {}));
            futures.add(this.storeEventDateInParent(event).thenAccept(r -> {}));
        } else {
            futures.add(this.storeRootEvent(event).thenAccept(r -> {}));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    protected void doUpdateParentTestEvents(StoredTestEvent event) throws IOException {
        if (event.isSuccess()) {
            return;
        }
        try {
            this.doUpdateParentTestEventsAsync(event).get();
        }
        catch (Exception e) {
            throw new IOException("Error while updating parents of " + event.getId() + " test event", e);
        }
    }

    protected CompletableFuture<Void> doUpdateParentTestEventsAsync(StoredTestEvent event) {
        if (event.isSuccess()) {
            return CompletableFuture.completedFuture(null);
        }
        return this.failEventAndParents(event.getParentId());
    }

    protected void doStoreTestEventMessagesLink(StoredTestEventId eventId, StoredTestEventId batchId, Collection<StoredMessageId> messageIds) throws IOException {
        try {
            this.doStoreTestEventMessagesLinkAsync(eventId, batchId, messageIds).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing link between test event " + eventId + " and " + messageIds.size() + " message(s)", e);
        }
    }

    protected CompletableFuture<Void> doStoreTestEventMessagesLinkAsync(StoredTestEventId eventId, StoredTestEventId batchId, Collection<StoredMessageId> messageIds) {
        List<String> messageIdsStrings = messageIds.stream().map(StoredMessageId::toString).collect(Collectors.toList());
        String eventIdString = eventId.toString();
        return CompletableFuture.allOf(this.storeMessagesOfTestEvent(eventIdString, messageIdsStrings));
    }

    protected StoredMessage doGetMessage(StoredMessageId id) throws IOException {
        try {
            return this.doGetMessageAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting message " + id, e);
        }
    }

    protected CompletableFuture<StoredMessage> doGetMessageAsync(StoredMessageId id) {
        return this.readMessage(id, true);
    }

    protected Collection<StoredMessage> doGetMessageBatch(StoredMessageId id) throws IOException {
        try {
            return this.doGetMessageBatchAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting message batch " + id, e);
        }
    }

    protected CompletableFuture<Collection<StoredMessage>> doGetMessageBatchAsync(StoredMessageId id) {
        CompletableFuture<DetailedMessageBatchEntity> entityFuture = this.readMessageBatchEntity(id, true);
        return entityFuture.thenApply(entity -> {
            if (entity == null) {
                return null;
            }
            try {
                return MessageUtils.bytesToMessages((ByteBuffer)entity.getContent(), (boolean)entity.isCompressed());
            }
            catch (IOException e) {
                throw new CompletionException("Error while reading message batch", e);
            }
        });
    }

    protected StoredMessage doGetProcessedMessage(StoredMessageId id) throws IOException {
        try {
            return this.doGetProcessedMessageAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting processed message " + id, e);
        }
    }

    protected CompletableFuture<StoredMessage> doGetProcessedMessageAsync(StoredMessageId id) {
        return this.readMessage(id, false);
    }

    protected long doGetFirstMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getFirstIndex(this.ops.getMessageBatchOperator(), streamName, direction);
    }

    protected long doGetLastMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getLastIndex(this.ops.getMessageBatchOperator(), streamName, direction);
    }

    protected long doGetFirstProcessedMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getFirstIndex(this.ops.getProcessedMessageBatchOperator(), streamName, direction);
    }

    protected long doGetLastProcessedMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getLastIndex(this.ops.getProcessedMessageBatchOperator(), streamName, direction);
    }

    protected StoredMessageId doGetNearestMessageId(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) throws IOException {
        try {
            return this.doGetNearestMessageIdAsync(streamName, direction, timestamp, timeRelation).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting nearest message ID", e);
        }
    }

    protected CompletableFuture<StoredMessageId> doGetNearestMessageIdAsync(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) {
        CompletableFuture<TimeMessageEntity> timeMessageEntityFuture = this.readTimeMessageEntity(streamName, direction, timestamp, timeRelation);
        return timeMessageEntityFuture.thenCompose(entity -> {
            if (entity == null) {
                return CompletableFuture.completedFuture(null);
            }
            return CompletableFuture.completedFuture(entity.createMessageId());
        });
    }

    private CompletableFuture<TimeMessageEntity> readTimeMessageEntity(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) {
        LocalDateTime messageDateTime = LocalDateTime.ofInstant(timestamp, TIMEZONE_OFFSET);
        CompletableFuture<TimeMessageEntity> result = timeRelation == TimeRelation.BEFORE ? this.ops.getTimeMessageOperator().getNearestMessageBefore(this.instanceUuid, streamName, messageDateTime.toLocalDate(), direction.getLabel(), messageDateTime.toLocalTime(), this.readAttrs) : this.ops.getTimeMessageOperator().getNearestMessageAfter(this.instanceUuid, streamName, messageDateTime.toLocalDate(), direction.getLabel(), messageDateTime.toLocalTime(), this.readAttrs);
        return result;
    }

    protected StoredTestEventWrapper doGetTestEvent(StoredTestEventId id) throws IOException {
        try {
            return this.doGetTestEventAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Could not get test event", e);
        }
    }

    protected CompletableFuture<StoredTestEventWrapper> doGetTestEventAsync(StoredTestEventId id) {
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getTestEventOperator().get(this.instanceUuid, id.toString(), this.readAttrs));
        return future.thenApply(e -> {
            try {
                return e != null ? e.toStoredTestEventWrapper() : null;
            }
            catch (Exception error) {
                throw new CompletionException("Could not get test event", error);
            }
        });
    }

    protected Iterable<StoredTestEventWrapper> doGetCompleteTestEvents(Set<StoredTestEventId> ids) throws IOException {
        try {
            return this.doGetCompleteTestEventsAsync(ids).get();
        }
        catch (Exception e) {
            throw new IOException("Could not get test events", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventWrapper>> doGetCompleteTestEventsAsync(Set<StoredTestEventId> id) {
        return new AsyncOperator(this.semaphore).getFuture(() -> this.completeEventsGetter.get(id, "get test events " + id));
    }

    public TestEventsMessagesLinker getTestEventsMessagesLinker() {
        return this.testEventsMessagesLinker;
    }

    public IntervalsWorker getIntervalsWorker() {
        return this.intervalsWorker;
    }

    protected Iterable<StoredMessage> doGetMessages(StoredMessageFilter filter) throws IOException {
        try {
            return this.doGetMessagesAsync(filter).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting messages filtered by " + filter, e);
        }
    }

    protected CompletableFuture<Iterable<StoredMessage>> doGetMessagesAsync(StoredMessageFilter filter) {
        String queryInfo = "get messages filtered by " + filter;
        return this.doGetDetailedMessageBatchEntities(filter, queryInfo).thenApply(it -> new MessagesIteratorAdapter(filter, (MappedAsyncPagingIterable<DetailedMessageBatchEntity>)it, this.pagingSupplies, this.ops.getMessageBatchConverter(), queryInfo));
    }

    protected Iterable<StoredMessageBatch> doGetMessagesBatches(StoredMessageFilter filter) throws IOException {
        try {
            return this.doGetMessagesBatchesAsync(filter).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting messages filtered by " + filter, e);
        }
    }

    protected CompletableFuture<Iterable<StoredMessageBatch>> doGetMessagesBatchesAsync(StoredMessageFilter filter) {
        String queryInfo = "get message batches filtered by " + filter;
        return this.doGetDetailedMessageBatchEntities(filter, queryInfo).thenApply(it -> new StoredMessageBatchAdapter((MappedAsyncPagingIterable<DetailedMessageBatchEntity>)it, this.pagingSupplies, this.ops.getMessageBatchConverter(), queryInfo, this.objectsFactory, filter == null ? 0 : filter.getLimit()));
    }

    private CompletableFuture<MappedAsyncPagingIterable<DetailedMessageBatchEntity>> doGetDetailedMessageBatchEntities(StoredMessageFilter filter, String queryInfo) {
        MessageBatchOperator mbOp = this.ops.getMessageBatchOperator();
        TimeMessageOperator tmOp = this.ops.getTimeMessageOperator();
        return new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeMultiRowResultQuery(() -> mbOp.filterMessages(this.instanceUuid, filter, mbOp, tmOp, this.readAttrs), this.ops.getMessageBatchConverter(), queryInfo));
    }

    protected Iterable<StoredTestEventMetadata> doGetRootTestEvents(Instant from, Instant to, Order order) throws CradleStorageException, IOException {
        try {
            return this.doGetRootTestEventsAsync(from, to, order).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting root test events", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetRootTestEventsAsync(Instant from, Instant to, Order order) throws CradleStorageException {
        LocalDateTime fromDateTime = LocalDateTime.ofInstant(from, TIMEZONE_OFFSET);
        LocalDateTime toDateTime = LocalDateTime.ofInstant(to, TIMEZONE_OFFSET);
        this.checkTimeBoundaries(fromDateTime, toDateTime, from, to);
        LocalTime fromTime = fromDateTime.toLocalTime();
        LocalTime toTime = toDateTime.toLocalTime();
        String queryInfo = "get root test events from range " + from + ".." + to + " in " + order + " order";
        RootTestEventOperator op = this.ops.getRootTestEventOperator();
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeMultiRowResultQuery(order == Order.DIRECT ? () -> op.getTestEventsDirect(this.instanceUuid, fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs) : () -> op.getTestEventsReverse(this.instanceUuid, fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs), this.ops.getRootTestEventConverter(), queryInfo));
        return future.thenApply(result -> new RootTestEventsMetadataIteratorAdapter((MappedAsyncPagingIterable<RootTestEventEntity>)result, this.pagingSupplies, this.ops.getRootTestEventConverter(), queryInfo));
    }

    protected Iterable<StoredTestEventMetadata> doGetTestEvents(StoredTestEventId parentId, Instant from, Instant to, Order order) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsAsync(parentId, from, to, order).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting child test events", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetTestEventsAsync(StoredTestEventId parentId, Instant from, Instant to, Order order) throws CradleStorageException {
        LocalDateTime fromDateTime = LocalDateTime.ofInstant(from, TIMEZONE_OFFSET);
        LocalDateTime toDateTime = LocalDateTime.ofInstant(to, TIMEZONE_OFFSET);
        this.checkTimeBoundaries(fromDateTime, toDateTime, from, to);
        LocalTime fromTime = fromDateTime.toLocalTime();
        LocalTime toTime = toDateTime.toLocalTime();
        String queryInfo = "get child test events of " + parentId + " from range " + from + ".." + to + " in " + order + " order";
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeMultiRowResultQuery(() -> order == Order.DIRECT ? this.ops.getTestEventChildrenOperator().getTestEventsDirect(this.instanceUuid, parentId.toString(), fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs) : this.ops.getTestEventChildrenOperator().getTestEventsReverse(this.instanceUuid, parentId.toString(), fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs), this.ops.getTestEventChildConverter(), queryInfo));
        return future.thenApply(result -> new TestEventChildrenMetadataIteratorAdapter((MappedAsyncPagingIterable<TestEventChildEntity>)result, this.pagingSupplies, this.ops.getTestEventChildConverter(), queryInfo));
    }

    protected Iterable<StoredTestEventMetadata> doGetTestEvents(Instant from, Instant to, Order order) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsAsync(from, to, order).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting test events", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetTestEventsAsync(Instant from, Instant to, Order order) throws CradleStorageException {
        LocalDateTime fromDateTime = LocalDateTime.ofInstant(from, TIMEZONE_OFFSET);
        LocalDateTime toDateTime = LocalDateTime.ofInstant(to, TIMEZONE_OFFSET);
        this.checkTimeBoundaries(fromDateTime, toDateTime, from, to);
        LocalTime fromTime = fromDateTime.toLocalTime();
        LocalTime toTime = toDateTime.toLocalTime();
        String queryInfo = "get test events from range " + from + ".." + to + " in " + order + " order";
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeMultiRowResultQuery(() -> order == Order.DIRECT ? this.ops.getTimeTestEventOperator().getTestEventsDirect(this.instanceUuid, fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs) : this.ops.getTimeTestEventOperator().getTestEventsReverse(this.instanceUuid, fromDateTime.toLocalDate(), fromTime, toTime, this.readAttrs), this.ops.getTimeTestEventConverter(), queryInfo));
        return future.thenApply(result -> new TimeTestEventsMetadataIteratorAdapter((MappedAsyncPagingIterable<TimeTestEventEntity>)result, this.pagingSupplies, this.ops.getTimeTestEventConverter(), queryInfo));
    }

    protected Collection<String> doGetStreams() throws IOException {
        ArrayList<String> result = new ArrayList<String>();
        for (StreamEntity entity : this.ops.getMessageBatchOperator().getStreams(this.readAttrs)) {
            if (!this.instanceUuid.equals(entity.getInstanceId())) continue;
            result.add(entity.getStreamName());
        }
        result.sort(null);
        return result;
    }

    protected Collection<Instant> doGetRootTestEventsDates() throws IOException {
        ArrayList<Instant> result = new ArrayList<Instant>();
        for (RootTestEventDateEntity entity : this.ops.getRootTestEventOperator().getDates(this.readAttrs)) {
            if (!this.instanceUuid.equals(entity.getInstanceId())) continue;
            result.add(entity.getStartDate().atStartOfDay(TIMEZONE_OFFSET).toInstant());
        }
        result.sort(null);
        return result;
    }

    protected Collection<Instant> doGetTestEventsDates(StoredTestEventId parentId) throws IOException {
        ArrayList<Instant> result = new ArrayList<Instant>();
        for (TestEventChildDateEntity entity : this.ops.getTestEventChildrenDatesOperator().get(this.instanceUuid, parentId.toString(), this.readAttrs)) {
            result.add(entity.getStartDate().atStartOfDay(TIMEZONE_OFFSET).toInstant());
        }
        return result;
    }

    public CradleObjectsFactory getObjectsFactory() {
        return this.objectsFactory;
    }

    protected void createTables() throws IOException {
        new TablesCreator(this.exec, this.settings).createAll();
    }

    protected CassandraOperators createOperators(CassandraDataMapper dataMapper, CassandraStorageSettings settings) {
        return new CassandraOperators(dataMapper, settings);
    }

    protected CassandraStorageSettings getSettings() {
        return this.settings;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getWriteAttrs() {
        return this.writeAttrs;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getReadAttrs() {
        return this.readAttrs;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getStrictReadAttrs() {
        return this.strictReadAttrs;
    }

    protected UUID getInstanceId(String instanceName) throws IOException {
        UUID id;
        Select selectFrom = (Select)QueryBuilder.selectFrom((String)this.settings.getKeyspace(), (String)"instances").column("id").whereColumn("name").isEqualTo((Term)QueryBuilder.literal((Object)instanceName));
        Row resultRow = (Row)this.exec.executeQuery(selectFrom.asCql(), false).one();
        if (resultRow != null) {
            id = (UUID)resultRow.get("id", GenericType.UUID);
        } else {
            id = UUID.randomUUID();
            Insert insert = QueryBuilder.insertInto((String)this.settings.getKeyspace(), (String)"instances").value("id", (Term)QueryBuilder.literal((Object)id)).value("name", (Term)QueryBuilder.literal((Object)instanceName)).ifNotExists();
            this.exec.executeQuery(insert.asCql(), true);
        }
        return id;
    }

    protected QueryExecutor getQueryExecutor() {
        return this.exec;
    }

    protected CassandraSemaphore getSemaphore() {
        return this.semaphore;
    }

    private CompletableFuture<Void> writeMessage(StoredMessageBatch batch, boolean rawMessage) {
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> {
            DetailedMessageBatchEntity entity;
            try {
                entity = new DetailedMessageBatchEntity(batch, this.instanceUuid);
            }
            catch (IOException e) {
                CompletableFuture error = new CompletableFuture();
                error.completeExceptionally(e);
                return error;
            }
            this.logger.trace("Executing message batch storing query");
            MessageBatchOperator op = rawMessage ? this.ops.getMessageBatchOperator() : this.ops.getProcessedMessageBatchOperator();
            return op.writeMessageBatch(entity, this.writeAttrs);
        });
        return future.thenAccept(e -> {});
    }

    private CompletableFuture<DetailedMessageBatchEntity> readMessageBatchEntity(StoredMessageId messageId, boolean rawMessage) {
        MessageBatchOperator op = rawMessage ? this.ops.getMessageBatchOperator() : this.ops.getProcessedMessageBatchOperator();
        return new AsyncOperator(this.semaphore).getFuture(() -> CassandraMessageUtils.getMessageBatch(messageId, op, this.instanceUuid, this.readAttrs));
    }

    private CompletableFuture<StoredMessage> readMessage(StoredMessageId id, boolean rawMessage) {
        CompletableFuture<DetailedMessageBatchEntity> entityFuture = this.readMessageBatchEntity(id, rawMessage);
        return entityFuture.thenApply(entity -> {
            if (entity == null) {
                return null;
            }
            try {
                return MessageUtils.bytesToOneMessage((ByteBuffer)entity.getContent(), (boolean)entity.isCompressed(), (StoredMessageId)id);
            }
            catch (IOException e) {
                throw new CompletionException("Error while reading message", e);
            }
        });
    }

    private void checkTimeBoundaries(LocalDateTime fromDateTime, LocalDateTime toDateTime, Instant originalFrom, Instant originalTo) throws CradleStorageException {
        LocalDate toDate;
        LocalDate fromDate = fromDateTime.toLocalDate();
        if (!fromDate.equals(toDate = toDateTime.toLocalDate())) {
            throw new CradleStorageException("Left and right boundaries should be of the same date, but got '" + originalFrom + "' and '" + originalTo + "'");
        }
    }

    private long getFirstIndex(MessageBatchOperator op, String streamName, Direction direction) throws IOException {
        String queryInfo = "get first message for stream '" + streamName + " and direction '" + direction + "'";
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeSingleRowResultQuery(() -> op.getFirstIndex(this.instanceUuid, streamName, direction.getLabel(), this.readAttrs), r -> r, queryInfo));
        try {
            Row row = (Row)future.get();
            return row == null ? -1L : row.getLong("message_index");
        }
        catch (Exception e) {
            throw new IOException("Error while getting index of the first message for stream '" + streamName + " and direction '" + direction + "'", e);
        }
    }

    private long getLastIndex(MessageBatchOperator op, String streamName, Direction direction) throws IOException {
        String queryInfo = "get last message for stream '" + streamName + " and direction '" + direction + "'";
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> this.selectExecutor.executeSingleRowResultQuery(() -> op.getLastIndex(this.instanceUuid, streamName, direction.getLabel(), this.readAttrs), r -> r, queryInfo));
        try {
            Row row = (Row)future.get();
            return row == null ? -1L : row.getLong("last_message_index");
        }
        catch (Exception e) {
            throw new IOException("Error while getting index of the last message for stream '" + streamName + " and direction '" + direction + "'", e);
        }
    }

    protected CompletableFuture<DetailedTestEventEntity> storeEvent(StoredTestEvent event) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            DetailedTestEventEntity entity;
            try {
                entity = new DetailedTestEventEntity(event, this.instanceUuid);
            }
            catch (IOException e) {
                CompletableFuture error = new CompletableFuture();
                error.completeExceptionally(e);
                return error;
            }
            this.logger.trace("Executing test event storing query");
            return this.ops.getTestEventOperator().write(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<TimeTestEventEntity> storeTimeEvent(StoredTestEvent event) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            TimeTestEventEntity timeEntity;
            try {
                timeEntity = new TimeTestEventEntity(event, this.instanceUuid);
            }
            catch (IOException e) {
                CompletableFuture error = new CompletableFuture();
                error.completeExceptionally(e);
                return error;
            }
            this.logger.trace("Executing time/event storing query");
            return this.ops.getTimeTestEventOperator().writeTestEvent(timeEntity, this.writeAttrs);
        });
    }

    protected CompletableFuture<RootTestEventEntity> storeRootEvent(StoredTestEvent event) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            RootTestEventEntity entity = new RootTestEventEntity(event, this.instanceUuid);
            this.logger.trace("Executing root event storing query");
            return this.ops.getRootTestEventOperator().writeTestEvent(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<TestEventChildEntity> storeEventInParent(StoredTestEvent event) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            TestEventChildEntity entity;
            try {
                entity = new TestEventChildEntity(event, this.instanceUuid);
            }
            catch (IOException e) {
                CompletableFuture error = new CompletableFuture();
                error.completeExceptionally(e);
                return error;
            }
            this.logger.trace("Executing parent/event storing query");
            return this.ops.getTestEventChildrenOperator().writeTestEvent(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<TestEventChildDateEntity> storeEventDateInParent(StoredTestEvent event) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            TestEventChildDateEntity entity = new TestEventChildDateEntity(event, this.instanceUuid);
            this.logger.trace("Executing parent/event date storing query");
            return this.ops.getTestEventChildrenDatesOperator().writeTestEventDate(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<Void> storeMessagesOfTestEvent(String eventId, List<String> messageIds) {
        ArrayList futures = new ArrayList();
        TestEventMessagesOperator op = this.ops.getTestEventMessagesOperator();
        int msgsSize = messageIds.size();
        for (int left = 0; left < msgsSize; ++left) {
            int right = Math.min(left + 10, msgsSize);
            HashSet<String> curMsgsIds = new HashSet<String>(messageIds.subList(left, right));
            this.logger.trace("Linking {} message(s) to test event {}", (Object)curMsgsIds.size(), (Object)eventId);
            TestEventMessagesEntity entity = new TestEventMessagesEntity();
            entity.setInstanceId(this.getInstanceUuid());
            entity.setEventId(eventId);
            entity.setMessageIds(curMsgsIds);
            futures.add(new AsyncOperator(this.semaphore).getFuture(() -> op.writeMessages(entity, this.writeAttrs)));
            left = right - 1;
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    protected CompletableFuture<Void> storeTestEventOfMessages(List<String> messageIds, String eventId, StoredTestEventId batchId) {
        String batchIdString = batchId != null ? batchId.toString() : null;
        ArrayList futures = new ArrayList();
        MessageTestEventOperator op = this.ops.getMessageTestEventOperator();
        for (String id : messageIds) {
            this.logger.trace("Linking test event {} to message {}", (Object)eventId, (Object)id);
            MessageTestEventEntity entity = new MessageTestEventEntity();
            entity.setInstanceId(this.getInstanceUuid());
            entity.setMessageId(id);
            entity.setEventId(eventId);
            if (batchIdString != null) {
                entity.setBatchId(batchIdString);
            }
            futures.add(new AsyncOperator(this.semaphore).getFuture(() -> op.writeTestEvent(entity, this.writeAttrs)));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    protected void doUpdateEventStatus(StoredTestEventWrapper event, boolean success) throws IOException {
        try {
            this.doUpdateEventStatusAsync(event, success).get();
        }
        catch (Exception e) {
            throw new IOException("Error while updating status of event " + event.getId(), e);
        }
    }

    protected CompletableFuture<Void> doUpdateEventStatusAsync(StoredTestEventWrapper event, boolean success) {
        String id = event.getId().toString();
        String parentId = event.getParentId() != null ? event.getParentId().toString() : null;
        LocalDateTime ldt = LocalDateTime.ofInstant(event.getStartTimestamp(), TIMEZONE_OFFSET);
        LocalDate ld = ldt.toLocalDate();
        LocalTime lt = ldt.toLocalTime();
        CompletableFuture result1 = new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getTestEventOperator().updateStatus(this.instanceUuid, id, success, this.writeAttrs));
        CompletableFuture result2 = new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getTimeTestEventOperator().updateStatus(this.instanceUuid, ld, lt, id, success, this.writeAttrs));
        CompletableFuture result3 = parentId != null ? new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getTestEventChildrenOperator().updateStatus(this.instanceUuid, parentId, ld, lt, id, success, this.writeAttrs)) : new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getRootTestEventOperator().updateStatus(this.instanceUuid, ld, lt, id, success, this.writeAttrs));
        return CompletableFuture.allOf(result1, result2, result3);
    }

    protected CompletableFuture<Void> failEventAndParents(StoredTestEventId eventId) {
        return this.getTestEventAsync(eventId).thenComposeAsync(event -> {
            if (event == null || !event.isSuccess()) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> update = this.doUpdateEventStatusAsync((StoredTestEventWrapper)event, false);
            if (event.getParentId() != null) {
                return update.thenComposeAsync(u -> this.failEventAndParents(event.getParentId()));
            }
            return update;
        });
    }
}

