/*
 * Decompiled with CFR 0.152.
 */
package org.jesterj.ingest.model.impl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Statement;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.jesterj.ingest.model.DocDestinationStatus;
import org.jesterj.ingest.model.Document;
import org.jesterj.ingest.model.Router;
import org.jesterj.ingest.model.Scanner;
import org.jesterj.ingest.model.Status;
import org.jesterj.ingest.model.impl.DocumentImpl;
import org.jesterj.ingest.model.impl.FTIQueryContext;
import org.jesterj.ingest.model.impl.StepImpl;
import org.jesterj.ingest.persistence.Cassandra;
import org.jesterj.ingest.persistence.CassandraSupport;
import org.jesterj.ingest.routers.RouterBase;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public abstract class ScannerImpl
extends StepImpl
implements Scanner {
    public static final String SCAN_ORIGIN = "SCAN";
    public static final String FTI_ORIGIN = "FTI";
    private static final Logger log = LogManager.getLogger();
    public static final int DEF_MAX_ERROR_RETRY = Integer.getInteger("org.jesterj.scanner.max_error_retry", 3);
    public static final int TIMEOUT = 600;
    static final String FIND_STRANDED_DOCS = "find_stranded_docs";
    static final String FIND_ERROR_DOCS = "find_error_docs";
    static final String FIND_HISTORY = "find_error_history";
    public static final String NEW_CONTENT_FOUND_MSG = "New content found by {}.";
    public static final int DDL_TIMEOUT = 30;
    private boolean hashing;
    private long interval;
    boolean remembering;
    private int retryErrors = DEF_MAX_ERROR_RETRY;
    protected final AtomicInteger activeScans = new AtomicInteger(0);
    private final ExecutorService exec = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue(), r -> {
        Thread scanner = new Thread(r);
        scanner.setName("jj-scan-" + this.getName() + "-" + System.nanoTime());
        scanner.setDaemon(true);
        return scanner;
    }){

        @Override
        @NotNull
        public Future<?> submit(@NotNull Runnable task) {
            Runnable originalTask = task;
            return super.submit(() -> {
                try {
                    ThreadContext.put((String)"JJ_PLAN_NAME", (String)ScannerImpl.this.getPlan().getName());
                    ThreadContext.put((String)"JJ_PLAN_VERSION", (String)String.valueOf(ScannerImpl.this.getPlan().getVersion()));
                    originalTask.run();
                }
                catch (Throwable t) {
                    t.printStackTrace();
                }
                finally {
                    ThreadContext.remove((String)"JJ_PLAN_NAME");
                    ThreadContext.remove((String)"JJ_PLAN_VERSION");
                }
            });
        }
    };
    private long nanoInterval;
    private CassandraSupport cassandra = new CassandraSupport();
    public static final String CREATE_FT_KEYSPACE = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
    public static final String CREATE_FT_TABLE = "CREATE TABLE IF NOT EXISTS %s.jj_output_step_status (docId varchar, docHash varchar, parentId varchar, origParentId varchar, outputStepName varchar, status varchar, message varchar, antiCollision int, created timestamp, createdNanos int, PRIMARY KEY (docId, created,createdNanos,outputStepName,antiCollision)) WITH CLUSTERING ORDER BY (created DESC, createdNanos DESC);";
    public static final String CREATE_INDEX_STATUS = "CREATE INDEX IF NOT EXISTS jj_ft_idx_step_status ON %s.jj_output_step_status (status);";
    public static final String CREATE_DOC_HASH = "CREATE TABLE IF NOT EXISTS %s.jj_scanner_doc_hash (docId varchar, created timestamp, createdNanos int, antiCollision int, hashAlg varchar, docHash varchar, PRIMARY KEY ((docId),created,createdNanos,antiCollision)) WITH CLUSTERING ORDER BY (created DESC, createdNanos DESC);";
    static final String FIND_STRANDED_STATUS = "SELECT docid FROM %s.jj_output_step_status WHERE status = ? PER PARTITION LIMIT 1";
    static final String FIND_ERRORS = "SELECT docid, created FROM %s.jj_output_step_status WHERE status = 'ERROR'  PER PARTITION LIMIT 1";
    static final String FIND_HIST = "SELECT docid, status, created FROM %s.jj_output_step_status WHERE docid = ?  PER PARTITION LIMIT ?";
    private static final String FIND_LATEST_STATUS_Q = "find_latest_status_for_doc";
    static final String FIND_LATEST_STATUS = "SELECT docid, created, status FROM %s.jj_output_step_status WHERE docId = ? PER PARTITION LIMIT 1";
    static String FTI_CHECK_DOC_HASH_Q = "FTI_CHECK_Q";
    static String FTI_CHECK_DOC_HASH = "SELECT docHash from %s.jj_scanner_doc_hash WHERE docid = ? LIMIT 1";
    static String FTI_DOC_HASH_U = "FTI_DOC_HASH_Q";
    static String FTI_DOC_HASH = "INSERT into %s.jj_scanner_doc_hash (docId, created, createdNanos, antiCollision, hashAlg, docHash)VALUES(?,?,?,?,?,?) USING TTL ?";
    private volatile boolean shutdownHasStarted;
    private boolean persistenceCreated;
    private final Map<String, String> keySpaces = new ConcurrentHashMap<String, String>();

    protected ScannerImpl() {
    }

    @Override
    public void activate() {
        try {
            this.addStepContext();
            this.shutdownHasStarted = false;
            HashSet<String> sentAlready = new HashSet<String>();
            FTIQueryContext ctx = new FTIQueryContext(sentAlready);
            this.processPendingDocs(ctx, List.of(Status.FORCE, Status.RESTART, Status.PROCESSING, Status.BATCHED), true);
            this.processErrors(ctx);
            this.processPendingDocs(ctx, List.of(Status.DIRTY), false);
            this.superActivate();
        }
        finally {
            this.removeStepContext();
        }
    }

    void superActivate() {
        super.activate();
    }

    @Override
    public void deactivate() {
        this.shutdownHasStarted = true;
        super.deactivate();
    }

    @Override
    public void run() {
        this.nanoInterval = this.interval * 1000000L;
        Future<?> scanner = null;
        long last = System.nanoTime() - 1L;
        if (this.isActive()) {
            scanner = this.safeSubmit();
            last = System.nanoTime();
        }
        try {
            while (this.isActive()) {
                try {
                    boolean timeForNextScan = this.longerAgoThanInterval(last);
                    boolean scanning = this.isScanning();
                    long now = System.nanoTime();
                    log.trace("scanning:{} timeForNext:{} now:{}  - (last:{} + nanoInt:{}) = {}", (Object)scanning, (Object)timeForNextScan, (Object)now, (Object)last, (Object)this.nanoInterval, (Object)(now - (last + this.nanoInterval)));
                    if (!scanning && timeForNextScan) {
                        scanner = this.safeSubmit();
                        last = System.nanoTime();
                    } else {
                        log.trace("{}:Scan skipped, still scanning:{}; msSinceLast:{}", (Object)this.getName(), (Object)scanning, (Object)ScannerImpl.msSinceNanoTime(last));
                    }
                    Thread.sleep(25L);
                }
                catch (InterruptedException e) {
                    if (scanner != null) {
                        scanner.cancel(true);
                    }
                    log.error((Object)e);
                }
            }
        }
        catch (Throwable t) {
            log.error("Exited scanner due to throwable!", t);
            throw t;
        }
        finally {
            log.info("Exited {}", (Object)this.getName());
        }
        if (scanner != null) {
            scanner.cancel(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Future<?> safeSubmit() {
        Future<?> scanner = null;
        Instant now = Instant.now();
        long start = System.nanoTime();
        try {
            log.trace("Submitting scan for {} (Scan interval = {} ms)", (Object)this.getName(), (Object)this.getInterval());
            scanner = this.exec.submit(this.getScanOperation());
        }
        catch (Exception e) {
            log.error("Scan operation for {} failed.", (Object)this.getName());
            log.error((Object)e);
            e.printStackTrace();
        }
        finally {
            log.trace("Scan Submitted for {} (Scan interval = {} ms), started at {}, elapsed:{}", (Object)this.getName(), (Object)this.getInterval(), (Object)now, (Object)ScannerImpl.msSinceNanoTime(start));
        }
        return scanner;
    }

    private static long msSinceNanoTime(long start) {
        return System.nanoTime() - start / 1000000L;
    }

    boolean longerAgoThanInterval(long last) {
        return last + this.nanoInterval < System.nanoTime();
    }

    @Override
    public void sendToNext(Document doc) {
        this.superSendToNext(doc);
    }

    void superSendToNext(Document doc) {
        super.sendToNext(doc);
    }

    public boolean docFound(Document doc) {
        ((DocumentImpl)doc).stepStarted(this);
        String scannerName = this.getName();
        log.trace("{} found doc: {}", (Object)scannerName, (Object)doc.getId());
        doc.setStatus(Status.PROCESSING, "{} found doc:{}", new Serializable[]{scannerName, doc.getId()});
        String id = doc.getId();
        Function<String, String> idFunction = this.getIdFunction();
        String result = idFunction.apply(id);
        String idField = doc.getIdField();
        doc.removeAll(idField);
        doc.put(idField, result);
        boolean shouldIndex = doc.isForceReprocess();
        if (this.isRemembering() & !shouldIndex) {
            id = doc.getId();
            CqlSession session = this.getCassandra().getSession();
            Set<String> outputDestinationNames = this.getOutputDestinationNames();
            ArrayList<String> downstreamOutputSteps = new ArrayList<String>(outputDestinationNames);
            for (int i = 0; i < downstreamOutputSteps.size() && !shouldIndex; ++i) {
                Status status = doc.getStatus((String)downstreamOutputSteps.get(i));
                if (status == Status.FORCE || status == Status.RESTART) {
                    shouldIndex = true;
                    break;
                }
                shouldIndex = this.isHashing() ? this.isFreshContent(doc, scannerName, id, session) : !this.seenPreviously(scannerName, id, session);
            }
        } else {
            shouldIndex = true;
            log.trace("Not Remembering");
        }
        shouldIndex = shouldIndex || this.isHeuristicallyDirty(doc);
        log.trace("Memory complete");
        if (shouldIndex) {
            log.trace("Need to index {}", (Object)id);
            if (!this.isRemembering() || !doc.alreadyHasIncompleteStepList()) {
                ((DocumentImpl)doc).initDestinations(this.getOutputDestinationNames(), this.getName());
            }
            this.sendToNext(doc);
        } else {
            log.trace("Did not need to index {}", (Object)id);
        }
        return shouldIndex;
    }

    boolean seenPreviously(String scannerName, String id, CqlSession session) {
        String anyStep = this.getOutputDestinationNames().iterator().next();
        String keySpace = this.keySpace(anyStep);
        String actualQuery = String.format(FIND_LATEST_STATUS, keySpace);
        PreparedStatement seenDocQuery = this.getCassandra().getPreparedQuery("find_latest_status_for_doc_" + this.keySpace(anyStep), actualQuery);
        BoundStatement bs = seenDocQuery.bind(new Object[]{id});
        ResultSet lastStatus = session.execute((Statement)bs);
        if (lastStatus.getAvailableWithoutFetching() > 0) {
            log.trace("{} ignoring document previously seen {}", (Object)scannerName, (Object)id);
            return true;
        }
        return false;
    }

    boolean isFreshContent(Document doc, String scannerName, String id, CqlSession session) {
        String prevHash = this.findPreviousHash(doc, id, session);
        if (doc.getHash().equals(prevHash)) {
            log.trace("{} ignoring document with previously seen content {}", (Object)scannerName, (Object)id);
            return false;
        }
        this.updateHash(doc, session);
        return true;
    }

    private void updateHash(Document doc, CqlSession session) {
        String actualQuery = String.format(FTI_DOC_HASH, this.keySpace(null));
        PreparedStatement updateHash = this.getCassandra().getPreparedQuery(FTI_DOC_HASH_U + "_" + this.keySpace(null), actualQuery);
        BoundStatement bs = updateHash.bind(new Object[]{doc.getId(), Instant.now(), (int)(System.nanoTime() % 1000000L), CassandraSupport.antiCollision.get().nextInt(), doc.getHashAlg(), doc.getHash(), 7776000});
        session.execute((Statement)bs);
    }

    @Nullable
    private String findPreviousHash(Document doc, String id, CqlSession session) {
        log.trace("We are using hashing to detect new versions");
        String actualQuery = String.format(FTI_CHECK_DOC_HASH, this.keySpace(null));
        PreparedStatement preparedQuery = this.getCassandra().getPreparedQuery(FTI_CHECK_DOC_HASH_Q + "_" + this.keySpace(null), actualQuery);
        BoundStatement bind = preparedQuery.bind(new Object[]{id});
        ResultSet statusRs = session.execute((Statement)bind);
        Cassandra.printErrors(statusRs);
        String previousHash = null;
        if (statusRs.getAvailableWithoutFetching() > 0) {
            Row next = (Row)statusRs.all().iterator().next();
            previousHash = next.getString(0);
            log.trace("Found '{}' with hash {}, current hash is {}", (Object)id, (Object)previousHash, (Object)doc.getHash());
        }
        return previousHash;
    }

    protected void setInterval(long interval) {
        this.interval = interval;
    }

    @Override
    public boolean isHeuristicallyDirty(Document doc) {
        return false;
    }

    @Override
    public abstract ScanOp getScanOperation();

    protected void processPendingDocs(FTIQueryContext ftiQueryContext, List<Status> statusesToProcess, boolean force) {
        boolean activeAtStart = this.isActive();
        if (this.isShutdown()) {
            return;
        }
        this.ensurePersistence();
        Set<String> sentAlready = ftiQueryContext.getSentAlready();
        int i = 0;
        CassandraSupport cStar = this.getCassandra();
        CqlSession session = cStar.getSession();
        HashMap<String, Set> needToProcess = new HashMap<String, Set>();
        HashMap<String, LatestStatus> statusCheckCache = new HashMap<String, LatestStatus>();
        for (String string : this.getOutputDestinationNames()) {
            String keySpace = this.keySpace(string);
            block1: for (Status status : statusesToProcess) {
                String actualQuery = String.format(FIND_STRANDED_STATUS, keySpace);
                PreparedStatement pq = cStar.getPreparedQuery("find_stranded_docs_" + keySpace, actualQuery);
                BoundStatement bs = pq.bind(new Object[]{String.valueOf(status)});
                bs = (BoundStatement)bs.setTimeout(Duration.ofSeconds(600L));
                ResultSet rs = session.execute((Statement)bs);
                log.trace("found {} using {}", (Object)rs, (Object)actualQuery);
                for (Row r : rs) {
                    if (this.isShutdown() || !this.isActive() && activeAtStart) continue block1;
                    String id = r.getString(0);
                    LatestStatus latestStatus = this.findLatestSatus(actualQuery, id, string, statusCheckCache);
                    if (status.toString().equals(latestStatus.getStatus())) {
                        log.trace("{} found for reprocessing with status={}", (Object)id, (Object)status);
                        needToProcess.computeIfAbsent(id, docid -> new HashSet()).add(latestStatus);
                        continue;
                    }
                    log.trace("{} not processed for status of {}, latest status is {}", (Object)id, (Object)status, (Object)latestStatus);
                }
            }
        }
        for (Map.Entry entry : needToProcess.entrySet()) {
            this.process(force, sentAlready, entry, FTI_ORIGIN);
            ++i;
        }
        log.info("Found and restarted processing for {} FTI records", (Object)i);
    }

    void process(boolean force, Set<String> sentAlready, Map.Entry<String, Set<LatestStatus>> toProcess, String origination) {
        String docId = toProcess.getKey();
        if (sentAlready != null) {
            sentAlready.add(docId);
        }
        this.fetchById(docId, origination).ifPresentOrElse(d -> {
            d.setForceReprocess(force);
            Set statuses = (Set)toProcess.getValue();
            HashMap<String, DocDestinationStatus> downstream = new HashMap<String, DocDestinationStatus>();
            statuses.forEach((? super T status) -> downstream.put(status.getoutputStepName(), new DocDestinationStatus(Status.PROCESSING, status.getoutputStepName(), "Prior status:" + status.getoutputStepName() + ">" + status.getStatus() + "@" + status.getTimestamp(), new Serializable[0])));
            d.setIncompleteOutputDestinations(downstream);
            this.docFound((Document)d);
        }, () -> log.error("Unable to load previously scanned (stranded) document {}", (Object)docId));
    }

    LatestStatus findLatestSatus(String priorQuery, String docId, String outputStepName, Map<String, LatestStatus> cache) {
        LatestStatus latestStatus;
        if (cache.containsKey(docId + outputStepName)) {
            return cache.get(docId + outputStepName);
        }
        String histQuery = String.format(FIND_HIST, this.keySpace(outputStepName));
        PreparedStatement phq = this.getCassandra().getPreparedQuery("find_error_history_" + this.keySpace(outputStepName), histQuery);
        BoundStatement bhq = phq.bind(new Object[]{docId, 1});
        ResultSet histRS = this.getCassandra().getSession().execute((Statement)bhq);
        Row one = (Row)histRS.one();
        if (one == null) {
            log.error("{} appeared in {} but not in {}", (Object)docId, (Object)priorQuery, (Object)histQuery);
            latestStatus = new LatestStatus("NO PRIOR STATUS FOUND", Instant.now().toString(), outputStepName);
        } else {
            latestStatus = new LatestStatus(one.getString(1), String.valueOf(one.getInstant(2)), outputStepName);
        }
        cache.put(docId + outputStepName, latestStatus);
        return latestStatus;
    }

    void ensurePersistence() {
        if (!this.persistenceCreated) {
            CqlSession session = this.cassandra.getSession();
            for (String name : this.getOutputDestinationNames()) {
                this.executeWithTimoutSecs(session, CREATE_FT_KEYSPACE, name, 30);
                this.executeWithTimoutSecs(session, CREATE_FT_TABLE, name, 30);
                this.executeWithTimoutSecs(session, CREATE_INDEX_STATUS, name, 30);
            }
            this.executeWithTimoutSecs(session, CREATE_FT_KEYSPACE, null, 30);
            this.executeWithTimoutSecs(session, CREATE_DOC_HASH, null, 30);
            this.persistenceCreated = true;
        }
    }

    private void executeWithTimoutSecs(CqlSession session, String cqlTemplate, String destinationName, int seconds) {
        String format = String.format(cqlTemplate, this.keySpace(destinationName));
        SimpleStatement simpleStatement = ((SimpleStatementBuilder)SimpleStatement.builder((String)format).setTimeout(Duration.of(seconds, ChronoUnit.SECONDS))).build();
        session.execute((Statement)simpleStatement);
    }

    void processErrors(FTIQueryContext scanContext) {
        log.info("Checking for Errored docs");
        HashSet<DocumentImpl> deadDocs = new HashSet<DocumentImpl>();
        HashMap<String, Set> forceReprocess = new HashMap<String, Set>();
        block5: for (String string : this.getOutputDestinationNames()) {
            String actualQuery = String.format(FIND_ERRORS, this.keySpace(string));
            PreparedStatement pq = this.getCassandra().getPreparedQuery("find_error_docs_" + this.keySpace(string), actualQuery);
            BoundStatement bs = pq.bind(new Object[0]);
            bs = (BoundStatement)bs.setTimeout(Duration.ofSeconds(600L));
            ResultSet rs = this.getCassandra().getSession().execute((Statement)bs);
            for (Row r : rs) {
                if (!this.isActive()) continue block5;
                String id = r.getString(0);
                if (scanContext.getSentAlready().contains(id)) {
                    log.trace("Skipping error for document already submitted during this FTI processing round");
                    continue;
                }
                log.trace("Found Errored document:{}", (Object)id);
                String findErrorHistory = String.format(FIND_HIST, this.keySpace(string));
                PreparedStatement pq2 = this.cassandra.getPreparedQuery("find_error_history_" + this.keySpace(string), findErrorHistory);
                ResultSet hist = this.cassandra.getSession().execute((Statement)pq2.bind(new Object[]{id, 2 * this.retryErrors - 1}));
                int errorCount = 0;
                boolean firstRow = true;
                boolean errorMostRecent = true;
                boolean alreadyDropped = false;
                Instant mostRecent = null;
                DocumentImpl tempDoc = new DocumentImpl(new byte[0], id, this.getPlan(), Document.Operation.UPDATE, this, FTI_ORIGIN);
                int loop = 0;
                for (Row row : hist) {
                    String status = row.getString(1);
                    if (mostRecent == null) {
                        mostRecent = row.getInstant(2);
                    }
                    log.trace("Observing status of {} for {} on row {}", (Object)status, (Object)id, (Object)(++loop));
                    switch (Status.valueOf(status)) {
                        case ERROR: {
                            ++errorCount;
                            break;
                        }
                        case PROCESSING: {
                            if (!firstRow) break;
                        }
                        case DROPPED: {
                            if (firstRow) {
                                alreadyDropped = true;
                            }
                        }
                        default: {
                            if (!firstRow) break;
                            errorMostRecent = false;
                        }
                    }
                    log.trace("after switch:{}", (Object)status);
                    firstRow = false;
                    if (errorMostRecent) continue;
                    break;
                }
                log.trace("ERROR COUNT {} of {} for {}", (Object)errorCount, (Object)this.retryErrors, (Object)id);
                if (errorMostRecent && errorCount < this.retryErrors) {
                    log.info("Re-feeding errored document {}", (Object)id);
                    LatestStatus latestStatus = new LatestStatus(Status.ERROR.toString(), String.valueOf(mostRecent), string);
                    forceReprocess.computeIfAbsent(id, k -> new HashSet()).add(latestStatus);
                    continue;
                }
                if (!alreadyDropped && errorCount >= this.retryErrors) {
                    log.warn("Marking document dead id={} due to too many error retries ({})", (Object)id, (Object)errorCount);
                    tempDoc.initDestinations(Set.of(string), this.getName());
                    tempDoc.setStatus(Status.DEAD, "Retry limit of {} exceeded", Integer.valueOf(this.retryErrors));
                    tempDoc.stepStarted(this);
                    deadDocs.add(tempDoc);
                    continue;
                }
                log.trace("Ignoring {} because errorMostRecent = {} and errorCount = {}", (Object)id, (Object)errorMostRecent, (Object)errorCount);
            }
        }
        if (forceReprocess.size() > 0 || deadDocs.size() > 0) {
            log.info("Found Errored docs. Reprocess:{} Mark Dead:{}", (Object)forceReprocess.size(), (Object)deadDocs.size());
        } else {
            log.info("No errored documents found.");
        }
        for (DocumentImpl documentImpl : deadDocs) {
            log.trace("REPORTING DEAD STATUS {}", (Object)documentImpl.getId());
            documentImpl.reportDocStatus();
        }
        for (Map.Entry entry : forceReprocess.entrySet()) {
            this.process(true, null, entry, FTI_ORIGIN);
        }
    }

    private boolean isShutdown() {
        return this.shutdownHasStarted;
    }

    @Override
    public long getInterval() {
        return this.interval;
    }

    @Override
    public boolean isActivePriorSteps() {
        return false;
    }

    @Override
    public void addPredecessor(StepImpl obj) {
        throw new UnsupportedOperationException("Scanners cannot have predecessors");
    }

    @Override
    public boolean add(Document document) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean offer(Document document) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document remove() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document poll() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document element() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document peek() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public void put(Document document) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean offer(Document document, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document take() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Document poll(long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public int remainingCapacity() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean remove(Object o) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean addAll(Collection<? extends Document> c) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public void clear() {
    }

    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Iterator<Document> iterator() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public Object[] toArray() {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public <T> T[] toArray(T[] a) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public int drainTo(Collection<? super Document> c) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public int drainTo(Collection<? super Document> c, int maxElements) {
        throw new UnsupportedOperationException("Scanners are a push only source of documents. Queue methods are not supported for this type of step.");
    }

    @Override
    public boolean isEmpty() {
        return true;
    }

    @Override
    protected Logger getLogger() {
        return log;
    }

    public boolean isScanActive() {
        return this.activeScans.get() > 0;
    }

    public void scanStarted() {
        this.activeScans.incrementAndGet();
    }

    public void scanFinished() {
        this.activeScans.decrementAndGet();
    }

    @Override
    public boolean isRemembering() {
        return this.remembering;
    }

    @Override
    public boolean isHashing() {
        return this.hashing;
    }

    public CassandraSupport getCassandra() {
        return this.cassandra;
    }

    public void setCassandra(CassandraSupport cassandra) {
        this.cassandra = cassandra;
    }

    @Override
    public String keySpace(String outputStep) {
        return this.keySpaces.computeIfAbsent(String.valueOf(outputStep), ps -> "jj_" + ScannerImpl.keySpaceHash(ps, this));
    }

    @NotNull
    private static String keySpaceHash(String outputStep, Scanner s) {
        MessageDigest md;
        String baseName = "jj_" + s.getName() + "_" + s.getPlan().getName() + "_" + s.getPlan().getVersion() + (String)(outputStep != null && !"null".equals(outputStep) ? "_" + outputStep : "");
        try {
            md = MessageDigest.getInstance("MD5");
        }
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
        String result = new String(Hex.encodeHex((byte[])md.digest(baseName.getBytes())));
        log.info("Hash for {} keyspace is {}", (Object)baseName, (Object)result);
        return result;
    }

    protected void processDirty() {
        if (this.isRemembering()) {
            log.trace("processing dirty");
            HashSet<String> sentAlready = new HashSet<String>();
            FTIQueryContext ftiQueryContext = new FTIQueryContext(sentAlready);
            this.processPendingDocs(ftiQueryContext, List.of(Status.DIRTY, Status.FORCE, Status.RESTART), false);
            this.processErrors(ftiQueryContext);
        }
    }

    public class ScanOp
    implements Runnable {
        private final Runnable custom;
        private final Scanner scanner;

        public ScanOp(Runnable custom, Scanner scanner) {
            this.custom = custom;
            this.scanner = scanner;
        }

        @Override
        public void run() {
            CassandraSupport cassandra = ScannerImpl.this.getCassandra();
            if (this.scanner.isRemembering() && (cassandra == null || Cassandra.isBooting())) {
                log.error("Cassandra null or still starting for scan operation, Invocation skipped");
                return;
            }
            try {
                if (ScannerImpl.this.isScanActive()) {
                    log.info("Skipping scan, there is already an active scan");
                    return;
                }
                log.info("{} of plan {} Starting scan at {} on {}", (Object)this.scanner.getName(), (Object)ScannerImpl.this.getPlan().getName(), (Object)new Date(), (Object)Thread.currentThread().getName());
                ScannerImpl.this.scanStarted();
                ScannerImpl.this.processDirty();
                this.custom.run();
                log.info("{} of plan {} Finishing scan at {} on {}", (Object)this.scanner.getName(), (Object)ScannerImpl.this.getPlan().getName(), (Object)new Date(), (Object)Thread.currentThread().getName());
            }
            catch (Exception e) {
                if (Thread.interrupted()) {
                    this.scanner.deactivate();
                }
                log.error("Exception while processing files!", (Throwable)e);
            }
            finally {
                ScannerImpl.this.scanFinished();
            }
        }
    }

    public static abstract class Builder
    extends StepImpl.Builder {
        @Override
        public Builder batchSize(int size) {
            super.batchSize(size);
            return this;
        }

        @Override
        public Builder named(String stepName) {
            super.named(stepName);
            return this;
        }

        @Override
        public Builder routingBy(RouterBase.Builder<? extends Router> router) {
            super.routingBy(router);
            return this;
        }

        @Override
        protected abstract ScannerImpl getObj();

        public Builder scanFreqMS(long interval) {
            this.getObj().interval = interval;
            return this;
        }

        public Builder retryErroredDocsUpTo(int retries) {
            this.getObj().retryErrors = retries;
            return this;
        }

        public Builder rememberScannedIds(boolean remember) {
            this.getObj().remembering = remember;
            return this;
        }

        public Builder detectChangesViaHashing(boolean hash) {
            this.getObj().hashing = hash;
            return this;
        }
    }

    static class LatestStatus {
        private final String status;
        private final String timestamp;
        private final String outputStepName;

        LatestStatus(String status, String timestamp, String outputStepName) {
            this.status = status;
            this.timestamp = timestamp;
            this.outputStepName = outputStepName;
        }

        public String toString() {
            return "LatestStatus{status='" + this.getStatus() + "', timestamp='" + this.getTimestamp() + "', outputStepName='" + this.getoutputStepName() + "'}";
        }

        public String getStatus() {
            return this.status;
        }

        public String getTimestamp() {
            return this.timestamp;
        }

        public String getoutputStepName() {
            return this.outputStepName;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            LatestStatus that = (LatestStatus)o;
            return Objects.equals(this.status, that.status) && Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.outputStepName, that.outputStepName);
        }

        public int hashCode() {
            return Objects.hash(this.status, this.timestamp, this.outputStepName);
        }
    }
}

