/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.aerospike.client.policy.ClientPolicy;
import com.playtika.janusgraph.aerospike.AerospikePolicyProvider;
import com.playtika.janusgraph.aerospike.ConfigOptions;
import com.playtika.janusgraph.aerospike.operations.AerospikeOperations;
import com.playtika.janusgraph.aerospike.operations.BasicMutateOperations;
import com.playtika.janusgraph.aerospike.operations.BasicScanOperations;
import com.playtika.janusgraph.aerospike.operations.IdsCleanupOperations;
import com.playtika.janusgraph.aerospike.operations.MutateOperations;
import com.playtika.janusgraph.aerospike.operations.Operations;
import com.playtika.janusgraph.aerospike.operations.ReadOperations;
import com.playtika.janusgraph.aerospike.operations.ScanOperations;
import com.playtika.janusgraph.aerospike.operations.UnsupportedScanOperations;
import com.playtika.janusgraph.aerospike.operations.batch.BatchLocks;
import com.playtika.janusgraph.aerospike.operations.batch.BatchOperationsUtil;
import com.playtika.janusgraph.aerospike.operations.batch.BatchUpdates;
import com.playtika.janusgraph.aerospike.operations.batch.WalOperations;
import com.playtika.janusgraph.aerospike.util.NamedThreadFactory;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import nosql.batch.update.BatchOperations;
import nosql.batch.update.BatchUpdater;
import nosql.batch.update.aerospike.lock.AerospikeLock;
import nosql.batch.update.aerospike.wal.AerospikeExclusiveLocker;
import nosql.batch.update.wal.ExclusiveLocker;
import nosql.batch.update.wal.WriteAheadLogCompleter;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicOperations
implements Operations {
    private static final Logger logger = LoggerFactory.getLogger(BasicOperations.class);
    public static final String JANUS_GROUP_NAME = "janus";
    public static final String AEROSPIKE_PREFIX = "aerospike";
    public static final String BATCH_PREFIX = "batch";
    public static final String IDS_CLEANUP_PREFIX = "ids-cleanup";
    private final AerospikeOperations aerospikeOperations;
    private final MutateOperations mutateOperations;
    private final BatchUpdater<BatchLocks, BatchUpdates, AerospikeLock, Value> batchUpdater;
    private final WriteAheadLogCompleter<BatchLocks, BatchUpdates, AerospikeLock, Value> writeAheadLogCompleter;
    private final ReadOperations readOperations;
    private final ScanOperations scanOperations;
    private final IdsCleanupOperations idsCleanupOperations;

    public BasicOperations(Configuration configuration) {
        this.aerospikeOperations = this.buildAerospikeOperations(configuration);
        WalOperations walOperations = this.buildWalOperations(configuration, this.aerospikeOperations);
        this.mutateOperations = this.buildMutateOperations(this.aerospikeOperations);
        BatchOperations<BatchLocks, BatchUpdates, AerospikeLock, Value> batchOperations = this.buildBatchOperations(this.aerospikeOperations, walOperations, this.getClock(), this.aerospikeOperations.getAerospikeExecutor(), this.aerospikeOperations.getBatchExecutor());
        this.batchUpdater = new BatchUpdater(batchOperations);
        this.writeAheadLogCompleter = (Boolean)configuration.get(ConfigOptions.START_WAL_COMPLETER, new String[0]) != false ? this.buildWriteAheadLogCompleter(walOperations, batchOperations) : null;
        this.readOperations = this.buildReadOperations(configuration, this.aerospikeOperations);
        this.scanOperations = this.buildScanOperations(configuration, this.aerospikeOperations);
        this.idsCleanupOperations = new IdsCleanupOperations((String)configuration.get(GraphDatabaseConfiguration.IDS_STORE_NAME, new String[0]), this.readOperations, this.mutateOperations, (Long)configuration.get(ConfigOptions.IDS_BLOCK_TTL, new String[0]), (TimestampProvider)configuration.get(GraphDatabaseConfiguration.TIMESTAMP_PROVIDER, new String[0]), BasicOperations.executorService(1, IDS_CLEANUP_PREFIX));
    }

    protected BatchOperations<BatchLocks, BatchUpdates, AerospikeLock, Value> buildBatchOperations(AerospikeOperations aerospikeOperations, WalOperations walOperations, Clock clock, ExecutorService executorService, ExecutorService batchExecutorService) {
        return BatchOperationsUtil.batchOperations(aerospikeOperations, walOperations.getWalNamespace(), walOperations.getWalSetName(), clock, executorService, batchExecutorService);
    }

    @Override
    public AerospikeOperations getAerospikeOperations() {
        return this.aerospikeOperations;
    }

    @Override
    public BatchUpdater<BatchLocks, BatchUpdates, AerospikeLock, Value> batchUpdater() {
        return this.batchUpdater;
    }

    @Override
    public WriteAheadLogCompleter<BatchLocks, BatchUpdates, AerospikeLock, Value> getWriteAheadLogCompleter() {
        return this.writeAheadLogCompleter;
    }

    @Override
    public MutateOperations mutateOperations() {
        return this.mutateOperations;
    }

    @Override
    public ReadOperations getReadOperations() {
        return this.readOperations;
    }

    @Override
    public ScanOperations getScanOperations() {
        return this.scanOperations;
    }

    protected AerospikePolicyProvider buildPolicyProvider(Configuration configuration) {
        return new AerospikePolicyProvider(configuration);
    }

    protected AerospikeOperations buildAerospikeOperations(Configuration configuration) {
        String namespace = (String)configuration.get(ConfigOptions.NAMESPACE, new String[0]);
        String idsNamespace = (String)configuration.get(ConfigOptions.IDS_NAMESPACE, new String[0]);
        String graphPrefix = (String)configuration.get(ConfigOptions.GRAPH_PREFIX, new String[0]);
        AerospikePolicyProvider policyProvider = this.buildPolicyProvider(configuration);
        ClientPolicy clientPolicy = policyProvider.clientPolicy();
        IAerospikeClient client = AerospikeOperations.buildAerospikeClient(configuration, clientPolicy);
        this.waitForClientToConnect(client);
        return new AerospikeOperations(graphPrefix, namespace, idsNamespace, (String)configuration.get(GraphDatabaseConfiguration.IDS_STORE_NAME, new String[0]), client, policyProvider, BasicOperations.executorService((Integer)configuration.get(ConfigOptions.AEROSPIKE_EXECUTOR_MAX_THREADS, new String[0]), AEROSPIKE_PREFIX), BasicOperations.executorService(8, 8, BATCH_PREFIX));
    }

    private void waitForClientToConnect(IAerospikeClient client) {
        while (!client.isConnected()) {
            logger.debug("Waiting for client [{}] to connect to Aerospike cluster", (Object)client);
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException();
            }
        }
    }

    protected WalOperations buildWalOperations(Configuration configuration, AerospikeOperations aerospikeOperations) {
        return new WalOperations(configuration, aerospikeOperations);
    }

    protected Clock getClock() {
        return Clock.systemUTC();
    }

    protected MutateOperations buildMutateOperations(AerospikeOperations aerospikeOperations) {
        return new BasicMutateOperations(aerospikeOperations);
    }

    protected WriteAheadLogCompleter<BatchLocks, BatchUpdates, AerospikeLock, Value> buildWriteAheadLogCompleter(WalOperations walOperations, BatchOperations<BatchLocks, BatchUpdates, AerospikeLock, Value> batchOperations) {
        return new WriteAheadLogCompleter(batchOperations, Duration.ofMillis(walOperations.getStaleTransactionLifetimeThresholdInMs()), walOperations.getMaxBatchSize().intValue(), (ExclusiveLocker)new AerospikeExclusiveLocker(walOperations.getAerospikeOperations().getClient(), walOperations.getWalNamespace(), walOperations.getWalSetName()), Executors.newScheduledThreadPool(1));
    }

    protected ReadOperations buildReadOperations(Configuration configuration, AerospikeOperations aerospikeOperations) {
        Integer parallelReadThreshold = (Integer)configuration.get(ConfigOptions.PARALLEL_READ_THRESHOLD, new String[0]);
        return new ReadOperations(aerospikeOperations, parallelReadThreshold);
    }

    protected ScanOperations buildScanOperations(Configuration configuration, AerospikeOperations aerospikeOperations) {
        Integer scanParallelism = (Integer)configuration.get(ConfigOptions.SCAN_PARALLELISM, new String[0]);
        if (scanParallelism > 0) {
            return new BasicScanOperations(aerospikeOperations, new NamedThreadFactory(JANUS_GROUP_NAME, "scan"));
        }
        return new UnsupportedScanOperations();
    }

    @Override
    public void close() {
        if (this.writeAheadLogCompleter != null) {
            this.writeAheadLogCompleter.shutdown();
        }
        this.aerospikeOperations.close();
    }

    @Override
    public IdsCleanupOperations getIdsCleanupOperations(String storeName) {
        return this.idsCleanupOperations.getIdsStoreName().equals(storeName) ? this.idsCleanupOperations : null;
    }

    public static ExecutorService executorService(int maxThreads, String prefix) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(JANUS_GROUP_NAME, prefix));
    }

    public static ExecutorService executorService(int maxThreads, int queueCapacity, String prefix) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), new NamedThreadFactory(JANUS_GROUP_NAME, prefix), new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

