/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.cradle.cassandra;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.term.Term;
import com.exactpro.cradle.CradleObjectsFactory;
import com.exactpro.cradle.CradleStorage;
import com.exactpro.cradle.Direction;
import com.exactpro.cradle.TimeRelation;
import com.exactpro.cradle.cassandra.CassandraSemaphore;
import com.exactpro.cradle.cassandra.CassandraStorageSettings;
import com.exactpro.cradle.cassandra.TablesCreator;
import com.exactpro.cradle.cassandra.connection.CassandraConnection;
import com.exactpro.cradle.cassandra.connection.CassandraConnectionSettings;
import com.exactpro.cradle.cassandra.dao.AsyncOperator;
import com.exactpro.cradle.cassandra.dao.CassandraDataMapper;
import com.exactpro.cradle.cassandra.dao.CassandraDataMapperBuilder;
import com.exactpro.cradle.cassandra.dao.CassandraOperators;
import com.exactpro.cradle.cassandra.dao.intervals.CassandraIntervalsWorker;
import com.exactpro.cradle.cassandra.dao.intervals.IntervalSupplies;
import com.exactpro.cradle.cassandra.dao.messages.DetailedMessageBatchEntity;
import com.exactpro.cradle.cassandra.dao.messages.MessageBatchOperator;
import com.exactpro.cradle.cassandra.dao.messages.StreamEntity;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageEntity;
import com.exactpro.cradle.cassandra.dao.messages.TimeMessageOperator;
import com.exactpro.cradle.cassandra.dao.messages.converters.TimeMessageConverter;
import com.exactpro.cradle.cassandra.dao.testevents.ChildrenDatesEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.DateEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.DateTimeEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.DetailedTestEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventEntity;
import com.exactpro.cradle.cassandra.dao.testevents.TestEventMetadataEntity;
import com.exactpro.cradle.cassandra.dao.testevents.converters.DateEventEntityConverter;
import com.exactpro.cradle.cassandra.iterators.MessagesIteratorAdapter;
import com.exactpro.cradle.cassandra.iterators.PagedIterator;
import com.exactpro.cradle.cassandra.iterators.StoredMessageBatchAdapter;
import com.exactpro.cradle.cassandra.iterators.TestEventDataIteratorAdapter;
import com.exactpro.cradle.cassandra.iterators.TestEventMetadataIteratorAdapter;
import com.exactpro.cradle.cassandra.retries.FixedNumberRetryPolicy;
import com.exactpro.cradle.cassandra.retries.PageSizeAdjustingPolicy;
import com.exactpro.cradle.cassandra.retries.PagingSupplies;
import com.exactpro.cradle.cassandra.retries.SelectExecutionPolicy;
import com.exactpro.cradle.cassandra.retries.SelectQueryExecutor;
import com.exactpro.cradle.cassandra.utils.CassandraMessageUtils;
import com.exactpro.cradle.cassandra.utils.QueryExecutor;
import com.exactpro.cradle.intervals.IntervalsWorker;
import com.exactpro.cradle.messages.StoredMessage;
import com.exactpro.cradle.messages.StoredMessageBatch;
import com.exactpro.cradle.messages.StoredMessageFilter;
import com.exactpro.cradle.messages.StoredMessageId;
import com.exactpro.cradle.testevents.StoredTestEvent;
import com.exactpro.cradle.testevents.StoredTestEventId;
import com.exactpro.cradle.testevents.StoredTestEventMetadata;
import com.exactpro.cradle.testevents.StoredTestEventWrapper;
import com.exactpro.cradle.utils.CradleStorageException;
import com.exactpro.cradle.utils.MessageUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraCradleStorage
extends CradleStorage {
    public static final long EMPTY_MESSAGE_INDEX = -1L;
    private Logger logger = LoggerFactory.getLogger(CassandraCradleStorage.class);
    public static final ZoneOffset TIMEZONE_OFFSET = ZoneOffset.UTC;
    private final CassandraConnection connection;
    private final CassandraStorageSettings settings;
    private final CassandraSemaphore semaphore;
    private final CradleObjectsFactory objectsFactory;
    private CassandraOperators ops;
    private UUID instanceUuid;
    private Function<BoundStatementBuilder, BoundStatementBuilder> writeAttrs;
    private Function<BoundStatementBuilder, BoundStatementBuilder> readAttrs;
    private Function<BoundStatementBuilder, BoundStatementBuilder> strictReadAttrs;
    private int resultPageSize;
    private SelectExecutionPolicy multiRowResultExecPolicy;
    private SelectExecutionPolicy singleRowResultExecPolicy;
    private QueryExecutor exec;
    private SelectQueryExecutor selectExecutor;
    private PagingSupplies pagingSupplies;
    private IntervalsWorker intervalsWorker;

    public CassandraCradleStorage(CassandraConnection connection, CassandraStorageSettings settings) {
        CassandraConnectionSettings conSettings = connection.getSettings();
        this.connection = connection;
        this.settings = settings;
        this.semaphore = new CassandraSemaphore(conSettings.getMaxParallelQueries());
        this.objectsFactory = new CradleObjectsFactory(settings.getMaxMessageBatchSize(), settings.getMaxTestEventBatchSize());
        this.resultPageSize = conSettings.getResultPageSize();
        this.multiRowResultExecPolicy = conSettings.getSelectExecutionPolicy();
        if (this.multiRowResultExecPolicy == null) {
            this.multiRowResultExecPolicy = new PageSizeAdjustingPolicy(this.resultPageSize == 0 ? 5000 : this.resultPageSize, 2);
        }
        this.singleRowResultExecPolicy = conSettings.getSingleRowResultExecutionPolicy();
        if (this.singleRowResultExecPolicy == null) {
            this.singleRowResultExecPolicy = new FixedNumberRetryPolicy(5);
        }
    }

    public UUID getInstanceUuid() {
        return this.instanceUuid;
    }

    protected String doInit(String instanceName, boolean prepareStorage) throws CradleStorageException {
        this.logger.info("Connecting to Cassandra...");
        try {
            this.connection.start();
        }
        catch (Exception e) {
            throw new CradleStorageException("Could not open Cassandra connection", (Throwable)e);
        }
        try {
            CqlSession session = this.connection.getSession();
            this.exec = new QueryExecutor(session, this.settings.getTimeout(), this.settings.getWriteConsistencyLevel(), this.settings.getReadConsistencyLevel());
            this.selectExecutor = new SelectQueryExecutor(session, this.semaphore, this.multiRowResultExecPolicy, this.singleRowResultExecPolicy);
            this.pagingSupplies = new PagingSupplies(session, this.multiRowResultExecPolicy);
            if (prepareStorage) {
                this.logger.info("Creating/updating schema...");
                this.createTables();
                this.logger.info("All needed tables created");
            } else {
                this.logger.info("Schema creation/update skipped");
            }
            this.instanceUuid = this.getInstanceId(instanceName);
            CassandraDataMapper dataMapper = new CassandraDataMapperBuilder(session).build();
            this.ops = this.createOperators(dataMapper, this.settings);
            Duration timeout = Duration.ofMillis(this.settings.getTimeout());
            this.writeAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(this.settings.getWriteConsistencyLevel())).setTimeout(timeout);
            this.readAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(this.settings.getReadConsistencyLevel())).setTimeout(timeout)).setPageSize(this.resultPageSize);
            this.strictReadAttrs = builder -> (BoundStatementBuilder)((BoundStatementBuilder)((BoundStatementBuilder)builder.setConsistencyLevel(ConsistencyLevel.ALL)).setTimeout(timeout)).setPageSize(this.resultPageSize);
            IntervalSupplies intervalSupplies = new IntervalSupplies(this.ops.getIntervalOperator(), this.ops.getIntervalConverter(), this.pagingSupplies);
            this.intervalsWorker = new CassandraIntervalsWorker(this.semaphore, this.instanceUuid, this.writeAttrs, this.readAttrs, intervalSupplies);
            return this.instanceUuid.toString();
        }
        catch (IOException e) {
            throw new CradleStorageException("Could not initialize storage", (Throwable)e);
        }
    }

    protected void doDispose() {
        this.logger.info("Disconnecting from Cassandra...");
        try {
            this.connection.stop();
        }
        catch (Exception e) {
            this.logger.error("Error while closing Cassandra connection", (Throwable)e);
        }
    }

    protected void doStoreMessageBatch(StoredMessageBatch batch) throws IOException {
        try {
            this.doStoreMessageBatchAsync(batch).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing message batch " + batch.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreMessageBatchAsync(StoredMessageBatch batch) {
        return this.writeMessage(batch, true);
    }

    protected void doStoreTimeMessage(StoredMessage message) throws IOException {
        try {
            this.doStoreTimeMessageAsync(message).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing time/message data for message " + message.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreTimeMessageAsync(StoredMessage message) {
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> {
            TimeMessageEntity timeEntity = new TimeMessageEntity(message, this.instanceUuid);
            this.logger.trace("Executing time/message storing query for message {}", (Object)message.getId());
            return this.ops.getTimeMessageOperator().writeMessage(timeEntity, this.writeAttrs);
        });
        return future.thenAcceptAsync(e -> {});
    }

    protected void doStoreProcessedMessageBatch(StoredMessageBatch batch) throws IOException {
        try {
            this.doStoreProcessedMessageBatchAsync(batch).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing processed message batch " + batch.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreProcessedMessageBatchAsync(StoredMessageBatch batch) {
        return this.writeMessage(batch, false);
    }

    protected void doStoreTestEvent(StoredTestEvent event) throws IOException {
        try {
            this.doStoreTestEventAsync(event).get();
        }
        catch (Exception e) {
            throw new IOException("Error while storing test event " + event.getId(), e);
        }
    }

    protected CompletableFuture<Void> doStoreTestEventAsync(StoredTestEvent event) {
        ArrayList futures = new ArrayList();
        try {
            futures.add(this.storeTimeEvent(new DetailedTestEventEntity(event, this.instanceUuid)));
            futures.add(this.storeDateTime(new DateTimeEventEntity(event, this.instanceUuid)));
            futures.add(this.storeChildrenDates(new ChildrenDatesEventEntity(event, this.instanceUuid)));
        }
        catch (IOException e) {
            CompletableFuture error = new CompletableFuture();
            error.completeExceptionally(e);
            futures.add(error);
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    protected void doUpdateParentTestEvents(StoredTestEvent event) throws IOException {
        if (event.isSuccess()) {
            return;
        }
        try {
            this.doUpdateParentTestEventsAsync(event).get();
        }
        catch (Exception e) {
            throw new IOException("Error while updating parents of " + event.getId() + " test event", e);
        }
    }

    protected CompletableFuture<Void> doUpdateParentTestEventsAsync(StoredTestEvent event) {
        if (event.isSuccess()) {
            return CompletableFuture.completedFuture(null);
        }
        return this.failEventAndParents(event.getParentId());
    }

    protected StoredMessage doGetMessage(StoredMessageId id) throws IOException {
        try {
            return this.doGetMessageAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting message " + id, e);
        }
    }

    protected CompletableFuture<StoredMessage> doGetMessageAsync(StoredMessageId id) {
        return this.readMessage(id, true);
    }

    protected Collection<StoredMessage> doGetMessageBatch(StoredMessageId id) throws IOException {
        try {
            return this.doGetMessageBatchAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting message batch " + id, e);
        }
    }

    protected CompletableFuture<Collection<StoredMessage>> doGetMessageBatchAsync(StoredMessageId id) {
        CompletableFuture<DetailedMessageBatchEntity> entityFuture = this.readMessageBatchEntity(id, true);
        return entityFuture.thenApplyAsync(entity -> {
            if (entity == null) {
                return null;
            }
            try {
                return MessageUtils.bytesToMessages((ByteBuffer)entity.getContent(), (boolean)entity.isCompressed());
            }
            catch (IOException e) {
                throw new CompletionException("Error while reading message batch", e);
            }
        });
    }

    protected StoredMessage doGetProcessedMessage(StoredMessageId id) throws IOException {
        try {
            return this.doGetProcessedMessageAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting processed message " + id, e);
        }
    }

    protected CompletableFuture<StoredMessage> doGetProcessedMessageAsync(StoredMessageId id) {
        return this.readMessage(id, false);
    }

    protected long doGetFirstMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getFirstIndex(this.ops.getMessageBatchOperator(), streamName, direction);
    }

    protected long doGetLastMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getLastIndex(this.ops.getMessageBatchOperator(), streamName, direction);
    }

    protected long doGetFirstProcessedMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getFirstIndex(this.ops.getProcessedMessageBatchOperator(), streamName, direction);
    }

    protected long doGetLastProcessedMessageIndex(String streamName, Direction direction) throws IOException {
        return this.getLastIndex(this.ops.getProcessedMessageBatchOperator(), streamName, direction);
    }

    protected StoredMessageId doGetNearestMessageId(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) throws IOException {
        try {
            return this.doGetNearestMessageIdAsync(streamName, direction, timestamp, timeRelation).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting nearest message ID", e);
        }
    }

    protected CompletableFuture<StoredMessageId> doGetNearestMessageIdAsync(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) {
        CompletableFuture<TimeMessageEntity> timeMessageEntityFuture = this.readTimeMessageEntity(streamName, direction, timestamp, timeRelation);
        return timeMessageEntityFuture.thenCompose(entity -> {
            if (entity == null) {
                return CompletableFuture.completedFuture(null);
            }
            return CompletableFuture.completedFuture(entity.createMessageId());
        });
    }

    private CompletableFuture<TimeMessageEntity> readTimeMessageEntity(String streamName, Direction direction, Instant timestamp, TimeRelation timeRelation) {
        LocalDateTime messageDateTime = LocalDateTime.ofInstant(timestamp, TIMEZONE_OFFSET);
        TimeMessageOperator tmOperator = this.ops.getTimeMessageOperator();
        TimeMessageConverter converter = this.ops.getTimeMessageConverter();
        return timeRelation == TimeRelation.BEFORE ? this.selectExecutor.executeSingleRowResultQuery(() -> tmOperator.getNearestMessageBefore(this.instanceUuid, streamName, messageDateTime.toLocalDate(), direction.getLabel(), messageDateTime.toLocalTime(), this.readAttrs), converter, "getting nearest message time before " + timestamp) : this.selectExecutor.executeSingleRowResultQuery(() -> tmOperator.getNearestMessageAfter(this.instanceUuid, streamName, messageDateTime.toLocalDate(), direction.getLabel(), messageDateTime.toLocalTime(), this.readAttrs), converter, "getting nearest message time after " + timestamp);
    }

    protected StoredTestEventWrapper doGetTestEvent(StoredTestEventId id) throws IOException {
        try {
            return this.doGetTestEventAsync(id).get();
        }
        catch (Exception e) {
            throw new IOException("Could not get test event", e);
        }
    }

    protected CompletableFuture<StoredTestEventWrapper> doGetTestEventAsync(StoredTestEventId id) {
        String dtQueryInfo = String.format("getting date/time for test event '%s'", id);
        CompletableFuture<DateTimeEventEntity> future = this.selectExecutor.executeSingleRowResultQuery(() -> this.ops.getTestEventOperator().get(this.instanceUuid, id.toString(), this.readAttrs), this.ops.getDateTimeEventEntityConverter(), dtQueryInfo);
        return future.thenCompose(dtEntity -> {
            if (dtEntity == null) {
                return CompletableFuture.completedFuture(null);
            }
            String steQueryInfo = String.format("getting full test event by id '%s'", id);
            return this.selectExecutor.executeSingleRowResultQuery(() -> this.ops.getTimeTestEventOperator().get(this.instanceUuid, dtEntity.getStartDate(), dtEntity.getStartTime(), id.toString(), this.readAttrs), this.ops.getDetailedTestEventConverter(), steQueryInfo).thenApply(entity -> {
                try {
                    return entity != null ? entity.toStoredTestEventWrapper(this.objectsFactory) : null;
                }
                catch (Exception error) {
                    throw new CompletionException("Error while converting data into test event", error);
                }
            });
        });
    }

    public IntervalsWorker getIntervalsWorker() {
        return this.intervalsWorker;
    }

    protected Iterable<StoredMessage> doGetMessages(StoredMessageFilter filter) throws IOException {
        try {
            return this.doGetMessagesAsync(filter).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting messages filtered by " + filter, e);
        }
    }

    protected CompletableFuture<Iterable<StoredMessage>> doGetMessagesAsync(StoredMessageFilter filter) {
        String queryInfo = "getting messages filtered by " + filter;
        return this.doGetDetailedMessageBatchEntities(filter, queryInfo).thenApply(it -> new MessagesIteratorAdapter(filter, (MappedAsyncPagingIterable<DetailedMessageBatchEntity>)it, this.pagingSupplies, this.ops.getMessageBatchConverter(), queryInfo));
    }

    protected Iterable<StoredMessageBatch> doGetMessagesBatches(StoredMessageFilter filter) throws IOException {
        try {
            return this.doGetMessagesBatchesAsync(filter).get();
        }
        catch (Exception e) {
            throw new IOException("Error while getting messages filtered by " + filter, e);
        }
    }

    protected CompletableFuture<Iterable<StoredMessageBatch>> doGetMessagesBatchesAsync(StoredMessageFilter filter) {
        String queryInfo = "getting message batches filtered by " + filter;
        return this.doGetDetailedMessageBatchEntities(filter, queryInfo).thenApply(it -> new StoredMessageBatchAdapter((MappedAsyncPagingIterable<DetailedMessageBatchEntity>)it, this.pagingSupplies, this.ops.getMessageBatchConverter(), queryInfo, filter == null ? 0 : filter.getLimit()));
    }

    private CompletableFuture<MappedAsyncPagingIterable<DetailedMessageBatchEntity>> doGetDetailedMessageBatchEntities(StoredMessageFilter filter, String queryInfo) {
        MessageBatchOperator mbOp = this.ops.getMessageBatchOperator();
        TimeMessageOperator tmOp = this.ops.getTimeMessageOperator();
        return this.selectExecutor.executeMultiRowResultQuery(() -> mbOp.filterMessages(this.instanceUuid, filter, mbOp, tmOp, this.readAttrs), this.ops.getMessageBatchConverter(), queryInfo);
    }

    protected Iterable<StoredTestEventWrapper> doGetRootTestEvents(Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetRootTestEventsAsync(from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting root test events", e);
        }
    }

    protected Iterable<StoredTestEventMetadata> doGetRootTestEventsMetadata(Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetRootTestEventsMetadataAsync(from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting root test events' metadata", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventWrapper>> doGetRootTestEventsAsync(Instant from, Instant to) throws CradleStorageException {
        return this.doGetTestEventsAsync(null, from, to);
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetRootTestEventsMetadataAsync(Instant from, Instant to) throws CradleStorageException {
        return this.doGetTestEventsMetadataAsync(null, from, to);
    }

    protected Iterable<StoredTestEventWrapper> doGetTestEvents(StoredTestEventId parentId, Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsAsync(parentId, from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting child test events", e);
        }
    }

    protected Iterable<StoredTestEventMetadata> doGetTestEventsMetadata(StoredTestEventId parentId, Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsMetadataAsync(parentId, from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting child test events' metadata", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventWrapper>> doGetTestEventsAsync(StoredTestEventId parentId, Instant from, Instant to) throws CradleStorageException {
        TestEventsQueryParams params = new TestEventsQueryParams(parentId, from, to);
        String queryInfo = String.format("get %s from range %s..%s", parentId == null ? "root" : "child test events of '" + parentId + "'", from, to);
        return this.selectExecutor.executeMultiRowResultQuery(() -> this.ops.getTimeTestEventOperator().getTestEvents(this.instanceUuid, params.getParentId(), params.getFromDate(), params.getFromTime(), params.getToTime(), this.readAttrs), this.ops.getTestEventConverter(), queryInfo).thenApply(r -> new TestEventDataIteratorAdapter((MappedAsyncPagingIterable<TestEventEntity>)r, this.objectsFactory, this.pagingSupplies, this.ops.getTestEventConverter(), queryInfo));
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetTestEventsMetadataAsync(StoredTestEventId parentId, Instant from, Instant to) throws CradleStorageException {
        TestEventsQueryParams params = new TestEventsQueryParams(parentId, from, to);
        String queryInfo = String.format("get %s from range %s..%s", parentId == null ? "root" : "child test events' metadata of '" + parentId + "'", from, to);
        return this.selectExecutor.executeMultiRowResultQuery(() -> this.ops.getTimeTestEventOperator().getTestEventsMetadata(this.instanceUuid, params.getParentId(), params.getFromDate(), params.getFromTime(), params.getToTime(), this.readAttrs), this.ops.getTestEventMetadataConverter(), queryInfo).thenApply(r -> new TestEventMetadataIteratorAdapter((MappedAsyncPagingIterable<TestEventMetadataEntity>)r, this.pagingSupplies, this.ops.getTestEventMetadataConverter(), queryInfo));
    }

    protected Iterable<StoredTestEventWrapper> doGetTestEvents(Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsAsync(from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting test events", e);
        }
    }

    protected Iterable<StoredTestEventMetadata> doGetTestEventsMetadata(Instant from, Instant to) throws CradleStorageException, IOException {
        try {
            return this.doGetTestEventsMetadataAsync(from, to).get();
        }
        catch (CradleStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Error while getting test events' metadata", e);
        }
    }

    protected CompletableFuture<Iterable<StoredTestEventWrapper>> doGetTestEventsAsync(Instant from, Instant to) throws CradleStorageException {
        TestEventsQueryParams params = new TestEventsQueryParams(from, to);
        String queryInfo = String.format("get test events from range %s..%s", from, to);
        return this.selectExecutor.executeMultiRowResultQuery(() -> this.ops.getTimeTestEventOperator().getTestEvents(this.instanceUuid, params.getFromDate(), params.getFromTime(), params.getToTime(), this.readAttrs), this.ops.getTestEventConverter(), queryInfo).thenApply(entity -> new TestEventDataIteratorAdapter((MappedAsyncPagingIterable<TestEventEntity>)entity, this.objectsFactory, this.pagingSupplies, this.ops.getTestEventConverter(), queryInfo));
    }

    protected CompletableFuture<Iterable<StoredTestEventMetadata>> doGetTestEventsMetadataAsync(Instant from, Instant to) throws CradleStorageException {
        TestEventsQueryParams params = new TestEventsQueryParams(from, to);
        String queryInfo = String.format("get test events' metadata from range %s..%s", from, to);
        return this.selectExecutor.executeMultiRowResultQuery(() -> this.ops.getTimeTestEventOperator().getTestEventsMetadata(this.instanceUuid, params.getFromDate(), params.getFromTime(), params.getToTime(), this.readAttrs), this.ops.getTestEventMetadataConverter(), queryInfo).thenApply(entity -> new TestEventMetadataIteratorAdapter((MappedAsyncPagingIterable<TestEventMetadataEntity>)entity, this.pagingSupplies, this.ops.getTestEventMetadataConverter(), queryInfo));
    }

    protected Collection<String> doGetStreams() {
        ArrayList<String> result = new ArrayList<String>();
        for (StreamEntity entity : this.ops.getMessageBatchOperator().getStreams(this.readAttrs)) {
            if (!this.instanceUuid.equals(entity.getInstanceId())) continue;
            result.add(entity.getStreamName());
        }
        result.sort(null);
        return result;
    }

    protected Collection<Instant> doGetRootTestEventsDates() throws IOException {
        return this.doGetTestEventsDates(new StoredTestEventId(""));
    }

    protected Collection<Instant> doGetTestEventsDates(StoredTestEventId parentId) throws IOException {
        DateEventEntityConverter converter = this.ops.getDateEventEntityConverter();
        String queryInfo = String.format("getting %s event dates", "".equals(parentId.toString()) ? "root" : "children of " + parentId);
        CompletionStage future = this.selectExecutor.executeMultiRowResultQuery(() -> this.ops.getTestEventChildrenDatesOperator().get(this.instanceUuid, parentId.toString(), this.readAttrs), converter, queryInfo).thenApply(rows -> new PagedIterator<DateEventEntity>((MappedAsyncPagingIterable<DateEventEntity>)rows, this.pagingSupplies, converter, queryInfo));
        try {
            TreeSet<Instant> result = new TreeSet<Instant>();
            ((PagedIterator)((CompletableFuture)future).get()).forEachRemaining(entity -> result.add(entity.getStartDate().atStartOfDay(TIMEZONE_OFFSET).toInstant()));
            return result;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error occurred while " + queryInfo, e);
        }
    }

    public CradleObjectsFactory getObjectsFactory() {
        return this.objectsFactory;
    }

    protected void createTables() throws IOException {
        new TablesCreator(this.exec, this.settings).createAll();
    }

    protected CassandraOperators createOperators(CassandraDataMapper dataMapper, CassandraStorageSettings settings) {
        return new CassandraOperators(dataMapper, settings);
    }

    protected CassandraStorageSettings getSettings() {
        return this.settings;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getWriteAttrs() {
        return this.writeAttrs;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getReadAttrs() {
        return this.readAttrs;
    }

    public Function<BoundStatementBuilder, BoundStatementBuilder> getStrictReadAttrs() {
        return this.strictReadAttrs;
    }

    protected UUID getInstanceId(String instanceName) throws IOException {
        UUID id;
        Select selectFrom = (Select)QueryBuilder.selectFrom((String)this.settings.getKeyspace(), (String)"instances").column("id").whereColumn("name").isEqualTo((Term)QueryBuilder.literal((Object)instanceName));
        Row resultRow = (Row)this.exec.executeQuery(selectFrom.asCql(), false).one();
        if (resultRow != null) {
            id = (UUID)resultRow.get("id", GenericType.UUID);
        } else {
            id = UUID.randomUUID();
            Insert insert = QueryBuilder.insertInto((String)this.settings.getKeyspace(), (String)"instances").value("id", (Term)QueryBuilder.literal((Object)id)).value("name", (Term)QueryBuilder.literal((Object)instanceName)).ifNotExists();
            this.exec.executeQuery(insert.asCql(), true);
        }
        return id;
    }

    protected QueryExecutor getQueryExecutor() {
        return this.exec;
    }

    protected CassandraSemaphore getSemaphore() {
        return this.semaphore;
    }

    private CompletableFuture<Void> writeMessage(StoredMessageBatch batch, boolean rawMessage) {
        CompletableFuture future = new AsyncOperator(this.semaphore).getFuture(() -> {
            DetailedMessageBatchEntity entity;
            try {
                entity = new DetailedMessageBatchEntity(batch, this.instanceUuid);
            }
            catch (IOException e) {
                CompletableFuture error = new CompletableFuture();
                error.completeExceptionally(e);
                return error;
            }
            this.logger.trace("Executing message batch storing query");
            MessageBatchOperator op = rawMessage ? this.ops.getMessageBatchOperator() : this.ops.getProcessedMessageBatchOperator();
            return op.writeMessageBatch(entity, this.writeAttrs);
        });
        return future.thenAcceptAsync(e -> {});
    }

    private CompletableFuture<DetailedMessageBatchEntity> readMessageBatchEntity(StoredMessageId messageId, boolean rawMessage) {
        MessageBatchOperator op = rawMessage ? this.ops.getMessageBatchOperator() : this.ops.getProcessedMessageBatchOperator();
        return this.selectExecutor.executeSingleRowResultQuery(() -> CassandraMessageUtils.getMessageBatch(messageId, op, this.instanceUuid, this.readAttrs), this.ops.getMessageBatchConverter(), "getting message batch by id " + messageId);
    }

    private CompletableFuture<StoredMessage> readMessage(StoredMessageId id, boolean rawMessage) {
        CompletableFuture<DetailedMessageBatchEntity> entityFuture = this.readMessageBatchEntity(id, rawMessage);
        return entityFuture.thenApply(entity -> {
            if (entity == null) {
                return null;
            }
            try {
                return MessageUtils.bytesToOneMessage((ByteBuffer)entity.getContent(), (boolean)entity.isCompressed(), (StoredMessageId)id);
            }
            catch (IOException e) {
                throw new CompletionException("Error while reading message", e);
            }
        });
    }

    private long getFirstIndex(MessageBatchOperator op, String streamName, Direction direction) throws IOException {
        String queryInfo = String.format("getting first message for stream '%s' and direction '%s'", streamName, direction);
        CompletableFuture<Row> future = this.selectExecutor.executeSingleRowResultQuery(() -> op.getFirstIndex(this.instanceUuid, streamName, direction.getLabel(), this.readAttrs), r -> r, queryInfo);
        try {
            Row row = future.get();
            return row == null ? -1L : row.getLong("message_index");
        }
        catch (Exception e) {
            throw new IOException("Error occurred while " + queryInfo, e);
        }
    }

    private long getLastIndex(MessageBatchOperator op, String streamName, Direction direction) throws IOException {
        String queryInfo = String.format("getting last message for stream '%s' and direction '%s'", streamName, direction);
        CompletableFuture<Row> future = this.selectExecutor.executeSingleRowResultQuery(() -> op.getLastIndex(this.instanceUuid, streamName, direction.getLabel(), this.readAttrs), r -> r, queryInfo);
        try {
            Row row = future.get();
            return row == null ? -1L : row.getLong("last_message_index");
        }
        catch (Exception e) {
            throw new IOException("Error occurred while " + queryInfo, e);
        }
    }

    protected CompletableFuture<Void> storeDateTime(DateTimeEventEntity entity) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            this.logger.trace("Executing test event storing query");
            return this.ops.getTestEventOperator().write(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<Void> storeChildrenDates(ChildrenDatesEventEntity entity) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            this.logger.trace("Executing date/child event storing query");
            return this.ops.getTestEventChildrenDatesOperator().writeTestEventDate(entity, this.writeAttrs);
        });
    }

    protected CompletableFuture<Void> storeTimeEvent(DetailedTestEventEntity entity) {
        return new AsyncOperator(this.semaphore).getFuture(() -> {
            this.logger.trace("Executing time/event storing query");
            return this.ops.getTimeTestEventOperator().writeTestEvent(entity, this.writeAttrs);
        });
    }

    protected void doUpdateEventStatus(StoredTestEventWrapper event, boolean success) throws IOException {
        try {
            this.doUpdateEventStatusAsync(event, success).get();
        }
        catch (Exception e) {
            throw new IOException("Error while updating status of event " + event.getId(), e);
        }
    }

    protected CompletableFuture<Void> doUpdateEventStatusAsync(StoredTestEventWrapper event, boolean success) {
        String id = event.getId().toString();
        LocalDateTime ldt = LocalDateTime.ofInstant(event.getStartTimestamp(), TIMEZONE_OFFSET);
        LocalDate ld = ldt.toLocalDate();
        LocalTime lt = ldt.toLocalTime();
        return new AsyncOperator(this.semaphore).getFuture(() -> this.ops.getTimeTestEventOperator().updateStatus(this.instanceUuid, ld, lt, id, success, this.writeAttrs));
    }

    protected CompletableFuture<Void> failEventAndParents(StoredTestEventId eventId) {
        return this.getTestEventAsync(eventId).thenComposeAsync(event -> {
            if (event == null || !event.isSuccess()) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> update = this.doUpdateEventStatusAsync((StoredTestEventWrapper)event, false);
            if (event.getParentId() != null) {
                return update.thenComposeAsync(u -> this.failEventAndParents(event.getParentId()));
            }
            return update;
        });
    }

    private static class TestEventsQueryParams {
        private final LocalDateTime fromDateTime;
        private final LocalDateTime toDateTime;
        private final String parentId;

        public TestEventsQueryParams(StoredTestEventId parentId, Instant from, Instant to) throws CradleStorageException {
            this.fromDateTime = LocalDateTime.ofInstant(from, TIMEZONE_OFFSET);
            this.toDateTime = LocalDateTime.ofInstant(to, TIMEZONE_OFFSET);
            this.parentId = parentId == null ? "" : parentId.toString();
            this.checkTimeBoundaries(this.fromDateTime, this.toDateTime, from, to);
        }

        public TestEventsQueryParams(Instant from, Instant to) throws CradleStorageException {
            this(null, from, to);
        }

        private void checkTimeBoundaries(LocalDateTime fromDateTime, LocalDateTime toDateTime, Instant originalFrom, Instant originalTo) throws CradleStorageException {
            LocalDate toDate;
            LocalDate fromDate = fromDateTime.toLocalDate();
            if (!fromDate.equals(toDate = toDateTime.toLocalDate())) {
                throw new CradleStorageException("Left and right boundaries should be of the same date, but got '" + originalFrom + "' and '" + originalTo + "'");
            }
        }

        public String getParentId() {
            return this.parentId;
        }

        public LocalDate getFromDate() {
            return this.fromDateTime.toLocalDate();
        }

        public LocalTime getFromTime() {
            return this.fromDateTime.toLocalTime();
        }

        public LocalTime getToTime() {
            return this.toDateTime.toLocalTime();
        }
    }
}

