/*
 * Decompiled with CFR 0.152.
 */
package com.feedzai.commons.sql.abstraction.batch.impl;

import com.feedzai.commons.sql.abstraction.batch.AbstractPdbBatch;
import com.feedzai.commons.sql.abstraction.batch.BatchEntry;
import com.feedzai.commons.sql.abstraction.batch.PdbBatch;
import com.feedzai.commons.sql.abstraction.batch.impl.MultithreadedBatchConfig;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.listeners.BatchListener;
import com.feedzai.commons.sql.abstraction.listeners.MetricsListener;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class MultithreadedBatch
extends AbstractPdbBatch
implements PdbBatch {
    private static final Logger logger = LoggerFactory.getLogger(MultithreadedBatch.class);
    private final Logger confidentialLogger;
    private static final Marker DEV = MarkerFactory.getMarker((String)"DEV");
    private static final int SALT = 100;
    private final Map<Long, DatabaseEngine> dbEnginesMap = new ConcurrentHashMap<Long, DatabaseEngine>();
    private final Supplier<DatabaseEngine> dbEngineSupplier;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService flusher;
    private final long maxAwaitTimeShutdownMs;
    protected final int batchSize;
    protected final long batchTimeoutMs;
    protected volatile long lastFlush;
    protected BlockingQueue<BatchEntry> buffer;
    protected String name;
    protected final BatchListener batchListener;
    private final MetricsListener metricsListener;
    protected final int maxFlushRetries;
    protected final long flushRetryDelayMs;
    private final Set<CompletableFuture<Void>> pendingFlushFutures = ConcurrentHashMap.newKeySet();

    @Inject
    public MultithreadedBatch(DatabaseEngine dbEngine, MultithreadedBatchConfig batchConfig) {
        Objects.requireNonNull(dbEngine, "dbEngine can't be null.");
        Objects.requireNonNull(batchConfig, "batchConfig can't be null.");
        int numberOfThreads = batchConfig.getNumberOfThreads();
        logger.info("Running MultithreadedBatch with {} threads.", (Object)numberOfThreads);
        Properties properties = new Properties();
        properties.setProperty("pdb.schema_policy", "none");
        this.dbEngineSupplier = () -> {
            try {
                return dbEngine.duplicate(properties, true);
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
        this.batchSize = batchConfig.getBatchSize();
        this.buffer = new LinkedBlockingQueue<BatchEntry>(this.batchSize);
        this.batchTimeoutMs = batchConfig.getBatchTimeout().toMillis();
        this.lastFlush = System.currentTimeMillis();
        this.name = batchConfig.getName();
        this.maxAwaitTimeShutdownMs = Optional.ofNullable(batchConfig.getMaxAwaitTimeShutdown()).map(Duration::toMillis).orElse(dbEngine.getProperties().getMaximumAwaitTimeBatchShutdown());
        this.batchListener = batchConfig.getBatchListener();
        this.metricsListener = batchConfig.getMetricsListener();
        this.maxFlushRetries = batchConfig.getMaxFlushRetries();
        this.flushRetryDelayMs = batchConfig.getFlushRetryDelay().toMillis();
        this.confidentialLogger = batchConfig.getConfidentialLogger().orElse(logger);
        this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("MultiThreadedBatch-scheduler-" + this.name + "-%d").setUncaughtExceptionHandler((thread, throwable) -> logger.error("Uncaught exception in scheduler worker thread.", throwable)).build());
        this.flusher = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(batchConfig.getExecutorCapacity()), new ThreadFactoryBuilder().setNameFormat("MultiThreadedBatch-" + this.name + "-%d").setUncaughtExceptionHandler((thread, throwable) -> logger.error("Uncaught exception in flusher worker thread.", throwable)).build());
        this.scheduler.scheduleAtFixedRate(this.periodicFlushTask(), 0L, this.batchTimeoutMs + 100L, TimeUnit.MILLISECONDS);
        logger.info("{} - MultithreadedBatch started", (Object)this.name);
    }

    @Override
    public void close() {
        logger.info("{} - MultithreadedBatch closing", (Object)this.name);
        long remainingTimeout = this.maxAwaitTimeShutdownMs;
        long start = System.currentTimeMillis();
        this.orderlyShutdownExecutor(this.scheduler, remainingTimeout);
        remainingTimeout = Math.max(this.maxAwaitTimeShutdownMs - (System.currentTimeMillis() - start), 1L);
        try {
            this.flushAsync().get(remainingTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception exception) {
            // empty catch block
        }
        remainingTimeout = Math.max(this.maxAwaitTimeShutdownMs - (System.currentTimeMillis() - start), 1L);
        this.orderlyShutdownExecutor(this.flusher, remainingTimeout);
        try {
            this.metricsListener.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        logger.trace("Closing internal database connections");
        this.dbEnginesMap.values().forEach(DatabaseEngine::close);
    }

    private void orderlyShutdownExecutor(ExecutorService executor, long shutdownTimeout) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
                logger.warn("Could not terminate batch within {}. Forcing shutdown.", (Object)DurationFormatUtils.formatDurationWords((long)shutdownTimeout, (boolean)true, (boolean)true));
                executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("Interrupted while waiting.", (Throwable)e);
        }
    }

    @Override
    public void add(BatchEntry batchEntry) throws InterruptedException {
        this.buffer.put(batchEntry);
        this.metricsListener.onEntryAdded();
        if (this.buffer.size() == this.batchSize) {
            this.flushAsync();
        }
    }

    @Override
    public void flush() throws ExecutionException, InterruptedException {
        this.flushAsync();
        CompletableFuture.allOf(this.pendingFlushFutures.toArray(new CompletableFuture[0])).get();
    }

    @Override
    public CompletableFuture<Void> flushAsync() {
        long flushTriggeredMs;
        this.metricsListener.onFlushTriggered();
        this.lastFlush = flushTriggeredMs = System.currentTimeMillis();
        if (this.buffer.isEmpty()) {
            this.onFlushFinished(flushTriggeredMs, Collections.emptyList(), Collections.emptyList());
            logger.trace("[{}] Batch empty, not flushing", (Object)this.name);
            return CompletableFuture.completedFuture(null);
        }
        LinkedList<BatchEntry> temp = new LinkedList<BatchEntry>();
        this.buffer.drainTo(temp);
        if (temp.isEmpty()) {
            this.onFlushFinished(flushTriggeredMs, Collections.emptyList(), Collections.emptyList());
            logger.trace("[{}] Batch empty, not flushing", (Object)this.name);
            return CompletableFuture.completedFuture(null);
        }
        try {
            CompletableFuture<Void> flushAsyncFuture = CompletableFuture.runAsync(() -> this.flush(flushTriggeredMs, temp), this.flusher);
            if (!flushAsyncFuture.isDone()) {
                this.pendingFlushFutures.add(flushAsyncFuture);
                flushAsyncFuture.whenComplete((unused, throwable) -> this.pendingFlushFutures.remove(flushAsyncFuture));
            }
            return flushAsyncFuture;
        }
        catch (RejectedExecutionException e) {
            logger.trace("[{}] Rejected execution while flushing batch", (Object)this.name);
            this.onFlushFinished(flushTriggeredMs, Collections.emptyList(), temp);
            CompletableFuture<Void> flushFuture = new CompletableFuture<Void>();
            flushFuture.completeExceptionally(e);
            return flushFuture;
        }
    }

    @Override
    public void flushUpsert() {
        logger.error("Flush ignoring not available for MultithreadedBatch.");
        throw new UnsupportedOperationException("Flushing pending batches upserting duplicated entries is not implemented using multiple threads/connections.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush(long flushTriggeredMs, List<BatchEntry> batchEntries) {
        block24: {
            DatabaseEngine de = this.dbEnginesMap.computeIfAbsent(Thread.currentThread().getId(), ignored -> this.dbEngineSupplier.get());
            this.metricsListener.onFlushStarted(flushTriggeredMs, batchEntries.size());
            try {
                this.processBatch(de, batchEntries);
                this.onFlushFinished(flushTriggeredMs, batchEntries, Collections.emptyList());
                logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", new Object[]{this.name, System.currentTimeMillis() - flushTriggeredMs, batchEntries.size()});
            }
            catch (Exception e) {
                String msg;
                int retryCount;
                if (this.maxFlushRetries > 0) {
                    String msg2 = "[{}] Error occurred while flushing. Retrying.";
                    this.confidentialLogger.warn(DEV, "[{}] Error occurred while flushing. Retrying.", (Object)this.name, (Object)e);
                    if (this.confidentialLogger != logger) {
                        logger.warn(DEV, "[{}] Error occurred while flushing. Retrying.", (Object)this.name);
                    }
                }
                boolean success = false;
                for (retryCount = 0; retryCount < this.maxFlushRetries && !success; ++retryCount) {
                    try {
                        Thread.sleep(this.flushRetryDelayMs);
                        if (de.checkConnection() && de.isTransactionActive()) {
                            de.rollback();
                        }
                        this.processBatch(de, batchEntries);
                        success = true;
                        continue;
                    }
                    catch (InterruptedException ex) {
                        logger.debug("Interrupted while trying to flush batch. Stopping retries.");
                        Thread.currentThread().interrupt();
                        break;
                    }
                    catch (Exception ex) {
                        msg = "[{}] Error occurred while flushing (retry attempt {}).";
                        this.confidentialLogger.warn(DEV, "[{}] Error occurred while flushing (retry attempt {}).", new Object[]{this.name, retryCount + 1, ex});
                        if (this.confidentialLogger == logger) continue;
                        logger.warn(DEV, "[{}] Error occurred while flushing (retry attempt {}).", (Object)this.name, (Object)(retryCount + 1));
                    }
                }
                if (!success) {
                    block25: {
                        try {
                            if (de.isTransactionActive()) {
                                de.rollback();
                            }
                        }
                        catch (Exception ee) {
                            ee.addSuppressed(e);
                            msg = "[{}] Batch failed to check the flush transaction state";
                            this.confidentialLogger.trace("[{}] Batch failed to check the flush transaction state", (Object)this.name, (Object)ee);
                            if (this.confidentialLogger == logger) break block25;
                            logger.trace("[{}] Batch failed to check the flush transaction state", (Object)this.name);
                        }
                    }
                    this.onFlushFinished(flushTriggeredMs, Collections.emptyList(), batchEntries);
                    String msg3 = "[{}] Error occurred while flushing. Aborting batch flush.";
                    this.confidentialLogger.error(DEV, "[{}] Error occurred while flushing. Aborting batch flush.", (Object)this.name, (Object)e);
                    if (this.confidentialLogger != logger) {
                        logger.error(DEV, "[{}] Error occurred while flushing. Aborting batch flush.", (Object)this.name);
                    }
                    break block24;
                }
                this.onFlushFinished(flushTriggeredMs, batchEntries, Collections.emptyList());
                logger.trace("[{}] Batch flushed. Took {} ms, {} retries, {} rows.", new Object[]{this.name, System.currentTimeMillis() - flushTriggeredMs, retryCount, batchEntries.size()});
            }
            finally {
                block26: {
                    try {
                        if (de.isTransactionActive()) {
                            de.rollback();
                        }
                    }
                    catch (Exception e) {
                        String msg = "[{}] Batch failed to check the flush transaction state";
                        this.confidentialLogger.trace("[{}] Batch failed to check the flush transaction state", (Object)this.name, (Object)e);
                        if (this.confidentialLogger == logger) break block26;
                        logger.trace("[{}] Batch failed to check the flush transaction state", (Object)this.name);
                    }
                }
            }
        }
    }

    private void onFlushFinished(long flushTriggeredMs, List<BatchEntry> successfulEntries, List<BatchEntry> failedEntries) {
        long elapsed = System.currentTimeMillis() - flushTriggeredMs;
        this.metricsListener.onFlushFinished(elapsed, successfulEntries.size(), failedEntries.size());
        if (!failedEntries.isEmpty()) {
            this.batchListener.onFailure(failedEntries.toArray(new BatchEntry[0]));
        }
        if (!successfulEntries.isEmpty()) {
            this.batchListener.onSuccess(successfulEntries.toArray(new BatchEntry[0]));
        }
    }

    private Runnable periodicFlushTask() {
        return () -> {
            try {
                if (System.currentTimeMillis() - this.lastFlush >= this.batchTimeoutMs) {
                    logger.trace("[{}] Flush timeout occurred", (Object)this.name);
                    this.flushAsync();
                }
            }
            catch (Exception e) {
                logger.error("[{}] Error during timeout-initiated flush", (Object)this.name, (Object)e);
            }
        };
    }
}

