/*
 * Decompiled with CFR 0.152.
 */
package nosql.batch.update.aerospike.wal;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import nosql.batch.update.BatchUpdate;
import nosql.batch.update.aerospike.lock.AerospikeBatchLocks;
import nosql.batch.update.aerospike.wal.AerospikeBatchUpdateSerde;
import nosql.batch.update.wal.WalRecord;
import nosql.batch.update.wal.WalTimeRange;
import nosql.batch.update.wal.WriteAheadLogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AerospikeWriteAheadLogManager<LOCKS extends AerospikeBatchLocks<EV>, UPDATES, EV>
implements WriteAheadLogManager<LOCKS, UPDATES, Value> {
    private static final Logger logger = LoggerFactory.getLogger(AerospikeWriteAheadLogManager.class);
    private static final String UUID_BIN_NAME = "uuid";
    private static final String TIMESTAMP_BIN_NAME = "timestamp";
    private final IAerospikeClient client;
    private final String walNamespace;
    private final String walSetName;
    private final WritePolicy writePolicy;
    private final WritePolicy deletePolicy;
    private final AerospikeBatchUpdateSerde<LOCKS, UPDATES, EV> batchSerializer;
    private final Clock clock;

    public AerospikeWriteAheadLogManager(IAerospikeClient client, String walNamespace, String walSetName, AerospikeBatchUpdateSerde<LOCKS, UPDATES, EV> batchSerializer, Clock clock) {
        this.client = client;
        this.walNamespace = walNamespace;
        this.walSetName = walSetName;
        this.deletePolicy = this.writePolicy = this.configureWritePolicy(client.getWritePolicyDefault());
        this.batchSerializer = batchSerializer;
        this.clock = clock;
        this.createSecondaryIndexOnTimestamp();
    }

    private WritePolicy configureWritePolicy(WritePolicy writePolicyDefault) {
        WritePolicy writePolicy = new WritePolicy(writePolicyDefault);
        writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
        writePolicy.sendKey = true;
        writePolicy.expiration = -1;
        return writePolicy;
    }

    public Value writeBatch(BatchUpdate<LOCKS, UPDATES> batch) {
        Value batchId = AerospikeWriteAheadLogManager.generateBatchId();
        List<Bin> batchBins = this.batchSerializer.write(batch);
        ArrayList<Bin> bins = new ArrayList<Bin>(batchBins.size() + 1);
        bins.addAll(batchBins);
        bins.add(new Bin(UUID_BIN_NAME, batchId));
        bins.add(new Bin(TIMESTAMP_BIN_NAME, Value.get((long)this.clock.millis())));
        try {
            this.client.put(this.writePolicy, new Key(this.walNamespace, this.walSetName, batchId), bins.toArray(new Bin[0]));
            return batchId;
        }
        catch (AerospikeException ae) {
            if (ae.getResultCode() == 13) {
                logger.error("update data size to big: {}", (Object)batchBins.stream().mapToInt(bin -> bin.value.estimateSize()).sum());
            }
            throw ae;
        }
    }

    public static Value generateBatchId() {
        return Value.get((byte[])AerospikeWriteAheadLogManager.getBytesFromUUID(UUID.randomUUID()));
    }

    public boolean deleteBatch(Value batchId) {
        return this.client.delete(this.deletePolicy, new Key(this.walNamespace, this.walSetName, batchId));
    }

    public List<WalTimeRange> getTimeRanges(Duration staleThreshold, int batchSize) {
        Statement statement = AerospikeWriteAheadLogManager.staleBatchesStatement(staleThreshold, this.walNamespace, this.walSetName, this.clock);
        RecordSet recordSet = this.client.query(null, statement);
        ArrayList<Long> timestamps = new ArrayList<Long>();
        recordSet.iterator().forEachRemaining(keyRecord -> timestamps.add(keyRecord.record.getLong(TIMESTAMP_BIN_NAME)));
        Collections.sort(timestamps);
        return AerospikeWriteAheadLogManager.getTimeRangesForTimestamps(timestamps, batchSize);
    }

    public List<WalRecord<LOCKS, UPDATES, Value>> getStaleBatchesForRange(WalTimeRange timeRange) {
        Statement statement = AerospikeWriteAheadLogManager.staleBatchesStatement(this.walNamespace, this.walSetName, timeRange.getFromTimestamp(), timeRange.getToTimestamp());
        RecordSet recordSet = this.client.query(null, statement);
        ArrayList<WalRecord<LOCKS, UPDATES, Value>> staleTransactions = new ArrayList<WalRecord<LOCKS, UPDATES, Value>>();
        recordSet.iterator().forEachRemaining(keyRecord -> {
            Record record = keyRecord.record;
            staleTransactions.add(new WalRecord((Object)Value.get((Object)record.getValue(UUID_BIN_NAME)), record.getLong(TIMESTAMP_BIN_NAME), this.batchSerializer.read(record.bins)));
        });
        Collections.sort(staleTransactions);
        return staleTransactions;
    }

    public static Statement staleBatchesStatement(Duration staleThreshold, String walNamespace, String walSetName, Clock clock) {
        Statement statement = new Statement();
        statement.setNamespace(walNamespace);
        statement.setSetName(walSetName);
        statement.setFilter(Filter.range((String)TIMESTAMP_BIN_NAME, (long)0L, (long)Math.max(clock.millis() - staleThreshold.toMillis(), 0L), (CTX[])new CTX[0]));
        return statement;
    }

    public static Statement staleBatchesStatement(String walNamespace, String walSetName, long begin, long end) {
        Statement statement = new Statement();
        statement.setNamespace(walNamespace);
        statement.setSetName(walSetName);
        statement.setFilter(Filter.range((String)TIMESTAMP_BIN_NAME, (long)begin, (long)end, (CTX[])new CTX[0]));
        return statement;
    }

    public static List<WalTimeRange> getTimeRangesForTimestamps(List<Long> timestamps, int batchSize) {
        ArrayList<WalTimeRange> walTimeRanges = new ArrayList<WalTimeRange>();
        int fromIdx = 0;
        int size = timestamps.size();
        int toIdx = Math.min(batchSize, size) - 1;
        while (fromIdx < size) {
            long fromTimestamp = timestamps.get(fromIdx);
            long toTimestamp = timestamps.get(toIdx);
            walTimeRanges.add(new WalTimeRange(fromTimestamp, toTimestamp));
            for (fromIdx = toIdx; fromIdx < size && timestamps.get(fromIdx) == toTimestamp; ++fromIdx) {
            }
            toIdx = Math.min(fromIdx + batchSize, size) - 1;
        }
        return walTimeRanges;
    }

    static byte[] getBytesFromUUID(UUID uuid) {
        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return bb.array();
    }

    private void createSecondaryIndexOnTimestamp() {
        try {
            String indexName = this.walSetName + "_timestamp";
            this.client.createIndex(null, this.walNamespace, this.walSetName, indexName, TIMESTAMP_BIN_NAME, IndexType.NUMERIC).waitTillComplete(200, 0);
        }
        catch (AerospikeException ae) {
            if (ae.getResultCode() == 200) {
                logger.info("Will not create WAL secondary index as it already exists");
            }
            throw ae;
        }
    }

    public String getWalNamespace() {
        return this.walNamespace;
    }

    public String getWalSetName() {
        return this.walSetName;
    }

    public IAerospikeClient getClient() {
        return this.client;
    }
}

