/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerQueryBuilder;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransaction;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransactionCache;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.SingleFileStoreConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.function.SerializablePredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfinispanLogMinerEventProcessor
extends AbstractLogMinerEventProcessor<InfinispanTransaction> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanLogMinerEventProcessor.class);
    private final OracleConnection jdbcConnection;
    private final OracleStreamingChangeEventSourceMetrics metrics;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<TableId> dispatcher;
    private final ChangeEventSource.ChangeEventSourceContext context;
    private final InfinispanTransactionCache transactionCache;
    private final Cache<String, LogMinerEvent> eventCache;
    private final Cache<String, String> recentlyCommittedTransactionsCache;
    private final Cache<String, String> rollbackTransactionsCache;
    private final Cache<String, String> schemaChangesCache;
    private Scn currentOffsetScn = Scn.NULL;
    private Scn currentOffsetCommitScn = Scn.NULL;
    private Scn lastCommittedScn = Scn.NULL;
    private Scn maxCommittedScn = Scn.NULL;

    public InfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, OracleStreamingChangeEventSourceMetrics metrics) {
        super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics);
        this.jdbcConnection = jdbcConnection;
        this.metrics = metrics;
        this.partition = partition;
        this.offsetContext = offsetContext;
        this.dispatcher = dispatcher;
        this.context = context;
        DefaultCacheManager manager = new DefaultCacheManager();
        this.transactionCache = new InfinispanTransactionCache(this.createCache((EmbeddedCacheManager)manager, connectorConfig, "transactions"));
        this.eventCache = this.createCache((EmbeddedCacheManager)manager, connectorConfig, "events");
        this.recentlyCommittedTransactionsCache = this.createCache((EmbeddedCacheManager)manager, connectorConfig, "committed-transactions");
        this.rollbackTransactionsCache = this.createCache((EmbeddedCacheManager)manager, connectorConfig, "rollback-transactions");
        this.schemaChangesCache = this.createCache((EmbeddedCacheManager)manager, connectorConfig, "schema-changes");
        this.displayCacheStatistics();
    }

    private void displayCacheStatistics() {
        LOGGER.info("Cache Statistics:");
        LOGGER.info("\tTransactions   : {}", (Object)this.transactionCache.size());
        LOGGER.info("\tCommitted Trxs : {}", (Object)this.recentlyCommittedTransactionsCache.size());
        LOGGER.info("\tRollback Trxs  : {}", (Object)this.rollbackTransactionsCache.size());
        LOGGER.info("\tSchema Changes : {}", (Object)this.schemaChangesCache.size());
        LOGGER.info("\tEvents         : {}", (Object)this.eventCache.size());
        if (!this.eventCache.isEmpty()) {
            for (String eventKey : this.eventCache.keySet()) {
                LOGGER.debug("\t\tFound Key: {}", (Object)eventKey);
            }
        }
    }

    private <K, V> Cache<K, V> createCache(EmbeddedCacheManager manager, OracleConnectorConfig connectorConfig, String name) {
        Configuration config = ((SingleFileStoreConfigurationBuilder)((SingleFileStoreConfigurationBuilder)((SingleFileStoreConfigurationBuilder)((SingleFileStoreConfigurationBuilder)((SingleFileStoreConfigurationBuilder)new ConfigurationBuilder().persistence().passivation(false).addSingleFileStore().segmented(false)).preload(true)).shared(false)).fetchPersistentState(true)).ignoreModifications(false)).location(connectorConfig.getLogMiningBufferLocation()).build();
        manager.defineConfiguration(name, config);
        return manager.getCache(name);
    }

    @Override
    protected TransactionCache<InfinispanTransaction, ?> getTransactionCache() {
        return this.transactionCache;
    }

    @Override
    protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
        return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime());
    }

    @Override
    protected void removeEventWithRowId(LogMinerEventRow row) {
        for (String eventKey : (List)this.eventCache.keySet().stream().filter((SerializablePredicate & Serializable)k -> k.startsWith(row.getTransactionId() + "-")).collect(Collectors.toList())) {
            LogMinerEvent event = (LogMinerEvent)this.eventCache.get((Object)eventKey);
            if (event == null || !event.getRowId().equals(row.getRowId())) continue;
            LOGGER.trace("Undo applied for event {}.", (Object)event);
            this.eventCache.remove((Object)eventKey);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.getConfig().isLogMiningBufferDropOnStop()) {
            this.eventCache.clear();
            this.transactionCache.clear();
            this.recentlyCommittedTransactionsCache.clear();
            this.rollbackTransactionsCache.clear();
            this.schemaChangesCache.clear();
        }
        this.recentlyCommittedTransactionsCache.stop();
        this.rollbackTransactionsCache.stop();
        this.schemaChangesCache.stop();
        this.eventCache.stop();
        this.transactionCache.close();
    }

    @Override
    public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
        this.counters.reset();
        try (PreparedStatement statement = this.createQueryStatement();){
            Scn scn;
            block14: {
                LOGGER.debug("Fetching results for SCN [{}, {}]", (Object)startScn, (Object)endScn);
                statement.setFetchSize(this.getConfig().getMaxQueueSize());
                statement.setFetchDirection(1000);
                statement.setString(1, startScn.toString());
                statement.setString(2, endScn.toString());
                Instant queryStart = Instant.now();
                ResultSet resultSet = statement.executeQuery();
                try {
                    this.metrics.setLastDurationOfBatchCapturing(Duration.between(queryStart, Instant.now()));
                    Instant startProcessTime = Instant.now();
                    this.processResults(resultSet);
                    Duration totalTime = Duration.between(startProcessTime, Instant.now());
                    this.metrics.setLastCapturedDmlCount(this.counters.dmlCount);
                    this.metrics.setLastDurationOfBatchCapturing(totalTime);
                    if (this.counters.dmlCount > 0 || this.counters.commitCount > 0 || this.counters.rollbackCount > 0) {
                        this.warnPotentiallyStuckScn(this.currentOffsetScn, this.currentOffsetCommitScn);
                        this.currentOffsetScn = this.offsetContext.getScn();
                        if (this.offsetContext.getCommitScn() != null) {
                            this.currentOffsetCommitScn = this.offsetContext.getCommitScn();
                        }
                    }
                    LOGGER.debug("{}.", (Object)this.counters);
                    LOGGER.debug("Processed in {} ms. Log: {}. Offset SCN: {}, Offset Commit SCN: {}, Active Transactions: {}, Sleep: {}", new Object[]{totalTime.toMillis(), this.metrics.getLagFromSourceInMilliseconds(), this.offsetContext.getScn(), this.offsetContext.getCommitScn(), this.metrics.getNumberOfActiveTransactions(), this.metrics.getMillisecondToSleepBetweenMiningQuery()});
                    this.metrics.addProcessedRows(this.counters.rows);
                    scn = this.calculateNewStartScn(endScn);
                    if (resultSet == null) break block14;
                }
                catch (Throwable throwable) {
                    if (resultSet != null) {
                        try {
                            resultSet.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                resultSet.close();
            }
            return scn;
        }
    }

    @Override
    protected void processRow(LogMinerEventRow row) throws SQLException, InterruptedException {
        String transactionId = row.getTransactionId();
        if (this.recentlyCommittedTransactionsCache.containsKey((Object)transactionId)) {
            LOGGER.trace("Transaction {} has been seen by connector, skipped.", (Object)transactionId);
            return;
        }
        super.processRow(row);
    }

    @Override
    public void abandonTransactions(Duration retention) {
    }

    @Override
    protected boolean isTransactionIdAllowed(String transactionId) {
        if (this.rollbackTransactionsCache.containsKey((Object)transactionId)) {
            LOGGER.warn("Event for transaction {} skipped as transaction is marked for rollback.", (Object)transactionId);
            return false;
        }
        if (this.recentlyCommittedTransactionsCache.containsKey((Object)transactionId)) {
            LOGGER.warn("Event for transaction {} skipped as transaction was recently committed.", (Object)transactionId);
            return false;
        }
        return true;
    }

    @Override
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
        return this.schemaChangesCache.containsKey((Object)row.getScn().toString());
    }

    @Override
    protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
        final String transactionId = row.getTransactionId();
        if (this.recentlyCommittedTransactionsCache.containsKey((Object)transactionId)) {
            return;
        }
        final InfinispanTransaction transaction = this.transactionCache.remove(transactionId);
        if (transaction == null) {
            LOGGER.trace("Transaction {} not found.", (Object)transactionId);
            return;
        }
        final Scn smallestScn = this.transactionCache.getMinimumScn();
        this.metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
        final Scn commitScn = row.getScn();
        Scn offsetCommitScn = this.offsetContext.getCommitScn();
        if (offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) > 0 || this.lastCommittedScn.compareTo(commitScn) > 0) {
            LOGGER.debug("Transaction {} has already been processed. Commit SCN in offset is {} while commit SCN of transaction is {} and last seen committed SCN is {}.", new Object[]{transactionId, offsetCommitScn, commitScn, this.lastCommittedScn});
            this.transactionCache.remove(transactionId);
            this.metrics.setActiveTransactions(this.transactionCache.size());
            this.removeEventsWithTransaction(transaction);
            return;
        }
        ++this.counters.commitCount;
        Instant start = Instant.now();
        int numEvents = this.getTransactionEventCount(transaction);
        LOGGER.trace("Commit: (smallest SCN {}) {}", (Object)smallestScn, (Object)row);
        LOGGER.trace("Transaction {} has {} events", (Object)transactionId, (Object)numEvents);
        BlockingConsumer<LogMinerEvent> delegate = new BlockingConsumer<LogMinerEvent>(){
            private int numEvents;
            {
                this.numEvents = InfinispanLogMinerEventProcessor.this.getTransactionEventCount(transaction);
            }

            public void accept(LogMinerEvent event) throws InterruptedException {
                if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
                    InfinispanLogMinerEventProcessor.this.offsetContext.setScn(event.getScn());
                    InfinispanLogMinerEventProcessor.this.metrics.setOldestScn(event.getScn());
                }
                InfinispanLogMinerEventProcessor.this.offsetContext.setTransactionId(transactionId);
                InfinispanLogMinerEventProcessor.this.offsetContext.setSourceTime(event.getChangeTime());
                InfinispanLogMinerEventProcessor.this.offsetContext.setTableId(event.getTableId());
                if (--this.numEvents == 0) {
                    InfinispanLogMinerEventProcessor.this.offsetContext.setCommitScn(commitScn);
                }
                DmlEvent dmlEvent = (DmlEvent)event;
                InfinispanLogMinerEventProcessor.this.dispatcher.dispatchDataChangeEvent((DataCollectionId)event.getTableId(), (ChangeRecordEmitter)new LogMinerChangeRecordEmitter(InfinispanLogMinerEventProcessor.this.partition, InfinispanLogMinerEventProcessor.this.offsetContext, dmlEvent.getEventType(), dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), InfinispanLogMinerEventProcessor.this.getSchema().tableFor(event.getTableId()), Clock.system()));
            }
        };
        int eventCount = 0;
        try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, this.getConfig(), this.getSchema());){
            for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
                if (!this.context.isRunning()) {
                    return;
                }
                LogMinerEvent event = (LogMinerEvent)this.eventCache.get((Object)transaction.getEventId(i));
                if (event == null) continue;
                ++eventCount;
                LOGGER.trace("Dispatching event {} {}", (Object)transaction.getEventId(i), (Object)event.getEventType());
                commitConsumer.accept(event);
            }
        }
        this.lastCommittedScn = Scn.valueOf(commitScn.longValue());
        if (transaction.getNumberOfEvents() > 0) {
            this.dispatcher.dispatchTransactionCommittedEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
        } else {
            this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
        }
        this.metrics.calculateLagMetrics(row.getChangeTime());
        if (this.lastCommittedScn.compareTo(this.maxCommittedScn) > 0) {
            this.maxCommittedScn = this.lastCommittedScn;
        }
        this.recentlyCommittedTransactionsCache.put((Object)transactionId, (Object)commitScn.toString());
        this.removeEventsWithTransaction(transaction);
        this.metrics.incrementCommittedTransactions();
        this.metrics.setActiveTransactions(this.transactionCache.size());
        this.metrics.incrementCommittedDmlCount(eventCount);
        this.metrics.setCommittedScn(commitScn);
        this.metrics.setOffsetScn(this.offsetContext.getScn());
        this.metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
    }

    @Override
    protected void handleRollback(LogMinerEventRow row) {
        InfinispanTransaction transaction = this.transactionCache.get(row.getTransactionId());
        if (transaction != null) {
            this.removeEventsWithTransaction(transaction);
            this.transactionCache.remove(row.getTransactionId());
            this.rollbackTransactionsCache.put((Object)row.getTransactionId(), (Object)row.getScn().toString());
            this.metrics.setActiveTransactions(this.transactionCache.size());
            this.metrics.incrementRolledBackTransactions();
            this.metrics.addRolledBackTransactionId(row.getTransactionId());
            ++this.counters.rollbackCount;
        }
    }

    @Override
    protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
        super.handleSchemaChange(row);
        if (row.getTableName() != null) {
            this.schemaChangesCache.put((Object)row.getScn().toString(), (Object)row.getTableId().identifier());
        }
    }

    @Override
    protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
        if (this.isTransactionIdAllowed(transactionId)) {
            String eventKey;
            InfinispanTransaction transaction = this.getTransactionCache().get(transactionId);
            if (transaction == null) {
                LOGGER.trace("Transaction {} is not in cache, creating.", (Object)transactionId);
                transaction = this.createTransaction(row);
            }
            if (!this.eventCache.containsKey((Object)(eventKey = transaction.getEventId(transaction.getNextEventId())))) {
                LOGGER.trace("Transaction {}, adding event reference at key {}", (Object)transactionId, (Object)eventKey);
                this.eventCache.put((Object)eventKey, (Object)eventSupplier.get());
                this.metrics.calculateLagMetrics(row.getChangeTime());
            }
            this.getTransactionCache().put(transactionId, transaction);
            this.metrics.setActiveTransactions(this.getTransactionCache().size());
        }
    }

    @Override
    protected int getTransactionEventCount(InfinispanTransaction transaction) {
        return (int)this.eventCache.keySet().parallelStream().filter((SerializablePredicate & Serializable)k -> k.startsWith(transaction.getTransactionId() + "-")).count();
    }

    private PreparedStatement createQueryStatement() throws SQLException {
        String query = LogMinerQueryBuilder.build(this.getConfig(), this.getSchema());
        return this.jdbcConnection.connection().prepareStatement(query, 1003, 1007, 1);
    }

    private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
        Scn minCacheScn = this.transactionCache.getMinimumScn();
        if (!minCacheScn.isNull()) {
            this.recentlyCommittedTransactionsCache.entrySet().removeIf(entry -> Scn.valueOf((String)entry.getValue()).compareTo(minCacheScn) < 0);
            this.rollbackTransactionsCache.entrySet().removeIf(entry -> Scn.valueOf((String)entry.getValue()).compareTo(minCacheScn) < 0);
            this.schemaChangesCache.entrySet().removeIf(entry -> Scn.valueOf((String)entry.getKey()).compareTo(minCacheScn) < 0);
        } else {
            this.recentlyCommittedTransactionsCache.clear();
            this.rollbackTransactionsCache.clear();
            this.schemaChangesCache.clear();
        }
        if (this.getConfig().isLobEnabled()) {
            if (this.transactionCache.isEmpty() && !this.maxCommittedScn.isNull()) {
                this.offsetContext.setScn(this.maxCommittedScn);
                this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
            } else if (!minCacheScn.isNull()) {
                this.offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
                this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
            }
            return this.offsetContext.getScn();
        }
        if (!this.getLastProcessedScn().isNull() && this.getLastProcessedScn().compareTo(endScn) < 0) {
            endScn = this.getLastProcessedScn();
        }
        this.offsetContext.setScn(endScn);
        this.metrics.setOldestScn(endScn);
        this.metrics.setOffsetScn(endScn);
        this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
        return endScn;
    }

    private void removeEventsWithTransaction(InfinispanTransaction transaction) {
        for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
            this.eventCache.remove((Object)transaction.getEventId(i));
        }
    }
}

