/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.list;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.list.EntityListing;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;

@TriggerSerially
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity>
extends AbstractProcessor
implements VerifiableProcessor {
    /** Minimum timestamp handed to {@code performListing} when no lower bound should be applied (verification, no-tracking listing, first time-window run). */
    private static final Long IGNORE_MIN_TIMESTAMP_VALUE = 0L;
    /** Optional legacy cache service; only consulted when migrating pre-state-management data into the State Manager. */
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. The stored value in the cache service will be migrated into the state when this processor is started at the first time. The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    // Allowable values for TARGET_SYSTEM_TIMESTAMP_PRECISION. The listing methods map them to a TimeUnit
    // and look up the corresponding lag in LISTING_LAG_MILLIS.
    public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect", "Automatically detect time unit deterministically based on candidate entries timestamp. Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp. E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
    public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds", "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds", "For a target system that does not have millis precision, but has in seconds.");
    public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
    /** Required property selecting the timestamp precision of the target system; defaults to auto-detect. */
    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder().name("target-system-timestamp-precision").displayName("Target System Timestamp Precision").description("Specify timestamp precision at the target system. Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.").required(true).allowableValues(new AllowableValue[]{PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES}).defaultValue(PRECISION_AUTO_DETECT.getValue()).build();
    /** The single relationship exposed by {@link #getRelationships()}. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    // Listing strategies. onTrigger dispatches on the configured value of LISTING_STRATEGY.
    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", "This strategy tracks the latest timestamp of listed entity to determine new/updated entities. Since it only tracks few timestamps, it can manage listing state efficiently. However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy. For example, such situation can happen in a file system if a file with old timestamp is copied or moved into the target directory without its last modified timestamp being updated. Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities. This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'. Works even when multiple subdirectories are being written at the same time while listing is running. However additional DistributedMapCache controller service is required and more JVM heap memory is used. See the description of 'Entity Tracking Time Window' property for further details on how it works.");
    public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", "This strategy lists an entity without any tracking. The same entity will be listed each time on executing this processor. It is recommended to change the default run schedule value. Any property that related to the persisting state will be disregarded.");
    public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window", "This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'. One cycle will list files with modification time falling within the time window. Works even when multiple subdirectories are being written at the same time while listing is running. IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files are accurate.");
    // NOTE(review): BY_TIME_WINDOW is not among the allowable values below although onTrigger handles it;
    // presumably subclasses that support it expose their own descriptor -- confirm before changing.
    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder().name("listing-strategy").displayName("Listing Strategy").description("Specify how to determine new/updated entities. See each strategy descriptions for detail.").required(true).allowableValues(new AllowableValue[]{BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING}).defaultValue(BY_TIMESTAMPS.getValue()).build();
    /** Optional record writer; the listing methods check {@code isSet()} to choose between one record-based FlowFile and one FlowFile per entity. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.").required(false).identifiesControllerService(RecordSetWriterFactory.class).build();
    /** Latest entry timestamp seen by the previous listing; {@code null} until restored from stored state or set by a listing cycle. */
    private volatile Long lastListedLatestEntryTimestampMillis = null;
    /** Latest entry timestamp that was actually processed; restored from the {@code processed.timestamp} state key. */
    private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
    /** {@link System#nanoTime()} reading associated with the previous run; compared against the listing lag in timestamp tracking. */
    private volatile Long lastRunTimeNanos = 0L;
    /** Set by {@link #onPrimaryNodeChange(PrimaryNodeState)}; makes the next listing reload its state from the State Manager. */
    private volatile boolean justElectedPrimaryNode = false;
    /** Raised by {@code onPropertyModified}; consumed by {@code updateState}, which clears the stored state and lowers the flag. */
    private volatile boolean resetState = false;
    /** Raised together with {@link #resetState}; its consumer is outside this part of the file. */
    private volatile boolean resetEntityTrackingState = false;
    /** Identifiers of the entities processed at the latest timestamp; persisted under {@code id.<n>} state keys. */
    private volatile List<String> latestIdentifiersProcessed = new ArrayList<String>();
    // NOTE(review): presumably backs the "Tracking Entities" strategy; its usage is not visible in this part of the file.
    private volatile ListedEntityTracker<T> listedEntityTracker;
    /** Listing lag in milliseconds per timestamp precision; assigned outside this part of the file. */
    public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
    // Keys used in the state map written by createStateMap.
    static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
    static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
    static final String IDENTIFIER_PREFIX = "id";

    /**
     * Locates the legacy file-based state of this component.
     *
     * @return a {@link File} named after the component identifier under {@code conf/state/}
     */
    public File getPersistenceFile() {
        final String legacyStatePath = "conf/state/" + this.getIdentifier();
        return new File(legacyStatePath);
    }

    /**
     * Resets the in-memory listing state and flags the stored state for clearing when a
     * property that affects the listing is modified after the configuration has been restored.
     *
     * @param descriptor the property that changed
     * @param oldValue   the previous value (unused)
     * @param newValue   the new value (unused)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        // Nothing to do while the configuration is still being restored or for unrelated properties.
        if (!this.isConfigurationRestored() || !this.isListingResetNecessary(descriptor)) {
            return;
        }
        this.resetTimeStates();
        this.resetState = true;
        this.resetEntityTrackingState = true;
    }

    /**
     * @return a new mutable set containing only {@link #REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return new HashSet<Relationship>(Collections.singleton(REL_SUCCESS));
    }

    /**
     * Runs the built-in validation and then the subclass hook.
     * Entity-tracking properties are only validated when the "Tracking Entities" strategy is selected.
     *
     * @param context the validation context
     * @return every validation result collected by the built-in checks and the subclass hook
     */
    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
        final List<ValidationResult> problems = new ArrayList<ValidationResult>();
        final String selectedStrategy = context.getProperty(LISTING_STRATEGY).getValue();
        final boolean tracksEntities = BY_ENTITIES.equals((Object)selectedStrategy);
        if (tracksEntities) {
            ListedEntityTracker.validateProperties(context, problems, this.getStateScope((PropertyContext)context));
        }
        this.customValidate(context, problems);
        return problems;
    }

    /**
     * Extension point for subclass-specific validation. It is invoked by
     * {@link #customValidate(ValidationContext)} after the built-in checks and receives the same
     * result collection. The default implementation adds nothing.
     *
     * @param validationContext the validation context
     * @param validationResults collection to which implementations add their findings
     */
    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> validationResults) {
    }

    /**
     * Verifies the configuration by performing a listing in verification mode and reporting how
     * many objects were found and how many of them match the configured filter.
     *
     * @param context    the process context holding the configuration to verify
     * @param logger     logger that receives the verification outcome
     * @param attributes FlowFile attributes supplied for verification (unused here)
     * @return a single "Perform Listing" result, successful or failed
     */
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog logger, Map<String, String> attributes) {
        final List<ConfigVerificationResult> outcomes = new ArrayList<ConfigVerificationResult>();
        final String containerName = this.getListingContainerName(context);
        try {
            // A null total means the unfiltered count is not available.
            final Integer totalCount = this.countUnfilteredListing(context);
            final int matchCount = this.performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.CONFIGURATION_VERIFICATION).size();

            final String summary;
            if (totalCount != null) {
                if (totalCount == 0) {
                    summary = "Found no objects.";
                } else {
                    final boolean singleObject = totalCount == 1;
                    final String totalText = totalCount + (singleObject ? " object" : " objects");
                    final String pronoun = singleObject ? "that" : "those";
                    final String matchText = matchCount + (matchCount == 1 ? " matches" : " match");
                    summary = String.format("Found %s.  Of %s, %s the filter.", totalText, pronoun, matchText);
                }
            } else if (matchCount == 0) {
                summary = "Found no objects matching the filter.";
            } else {
                final String matchText = matchCount + (matchCount == 1 ? " object" : " objects");
                summary = String.format("Found %s matching the filter.", matchText);
            }

            final String explanation = String.format("Successfully listed contents of %s.  %s", containerName, summary);
            outcomes.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Perform Listing")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation(explanation)
                    .build());
            logger.info("Successfully verified configuration");
        }
        catch (IOException e) {
            logger.warn("Failed to verify configuration. Could not list contents of {}", new Object[]{containerName, e});
            final String explanation = String.format("Failed to list contents of %s: %s", containerName, e.getMessage());
            outcomes.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Perform Listing")
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation(explanation)
                    .build());
        }
        return outcomes;
    }

    /**
     * Records whether this node has just been elected primary, so that the next listing
     * reloads its state from the State Manager.
     *
     * @param newState the new primary-node state of this node
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        final boolean electedPrimary = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
        this.justElectedPrimaryNode = electedPrimary;
    }

    /**
     * Prepares the listing state when the processor is scheduled: migrates legacy state if the
     * State Manager holds nothing yet, resets in-memory timestamps if the stored state was
     * cleared, and clears the stored state if a reset was requested.
     *
     * @param context the process context
     * @throws IOException if the state cannot be read, migrated or cleared
     */
    @OnScheduled
    public final void updateState(ProcessContext context) throws IOException {
        final String listingPath = this.getPath(context);
        final DistributedMapCacheClient legacyCache = (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        final StateMap storedState = context.getStateManager().getState(this.getStateScope((PropertyContext)context));

        // Version -1 is treated as "nothing stored yet", which triggers the one-time migration.
        final boolean nothingStoredYet = storedState.getVersion() == -1L;
        if (nothingStoredYet) {
            try {
                this.migrateState(listingPath, legacyCache, context.getStateManager(), this.getStateScope((PropertyContext)context));
            }
            catch (IOException migrationFailure) {
                throw new IOException("Failed to properly migrate state to State Manager", migrationFailure);
            }
        }

        // Evaluated against the snapshot read above: an in-memory timestamp without a stored
        // counterpart means the state was cleared behind this component's back.
        final boolean storedStateWasCleared = this.lastListedLatestEntryTimestampMillis != null
                && storedState.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null;
        if (storedStateWasCleared) {
            this.getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
            this.resetTimeStates();
        }

        if (!this.resetState) {
            return;
        }
        context.getStateManager().clear(this.getStateScope((PropertyContext)context));
        this.resetState = false;
    }

    /**
     * Migrates listing state from the legacy locations (distributed cache and local persistence
     * file) into the State Manager, then removes the legacy copies.
     *
     * <p>The newer of the two legacy timestamps wins. Identifiers are only carried over from the
     * local file, and only when its timestamp is the one that is kept.</p>
     *
     * @param path         the listing path whose legacy state is looked up
     * @param client       the legacy cache client, or {@code null} if none is configured
     * @param stateManager the State Manager that receives the migrated state
     * @param scope        the scope to store the migrated state in
     * @throws IOException if legacy state cannot be read or parsed, or the new state cannot be stored
     */
    private void migrateState(String path, DistributedMapCacheClient client, StateManager stateManager, Scope scope) throws IOException {
        Long minTimestamp = null;

        if (client != null) {
            final String cacheKey = this.getKey(path);
            final StringSerDe serde = new StringSerDe();
            final String serializedState = (String)client.get((Object)cacheKey, (Serializer)serde, (Deserializer)serde);
            if (serializedState != null && !serializedState.isEmpty()) {
                final EntityListing listing = this.deserialize(serializedState);
                minTimestamp = listing.getLatestTimestamp().getTime();
            }

            // Remove the entry under the same key it was read with; removing the bare path
            // would leave the legacy entry behind.
            try {
                client.remove((Object)cacheKey, (Serializer)new StringSerDe());
            }
            catch (IOException ioe) {
                // Best effort only: the value has already been read, so a failed removal is not fatal.
                this.getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new State Management service, so the Distributed Cache Service is no longer needed.", (Throwable)ioe);
            }
        }

        final File persistenceFile = this.getPersistenceFile();
        if (persistenceFile.exists()) {
            final Properties props = new Properties();
            try (FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }

            final String locallyPersistedValue = props.getProperty(path);
            if (locallyPersistedValue != null) {
                final EntityListing listing = this.deserialize(locallyPersistedValue);
                final long localTimestamp = listing.getLatestTimestamp().getTime();
                // Prefer the local file only when it is newer than what the cache provided.
                if (minTimestamp == null || localTimestamp > minTimestamp) {
                    minTimestamp = localTimestamp;
                    this.latestIdentifiersProcessed.clear();
                    this.latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
                }
            }

            if (persistenceFile.exists() && !persistenceFile.delete()) {
                this.getLogger().warn("Migrated state but failed to delete local persistence file");
            }
        }

        if (minTimestamp != null) {
            final Map<String, String> updatedState = this.createStateMap(minTimestamp, minTimestamp, this.latestIdentifiersProcessed);
            stateManager.setState(updatedState, scope);
        }
    }

    /**
     * Builds the state map that is stored in the State Manager: the two timestamps under their
     * fixed keys plus one {@code id.<index>} entry per identifier, in list order.
     *
     * @param latestListedEntryTimestampThisCycleMillis value stored under {@code listing.timestamp}
     * @param lastProcessedLatestEntryTimestampMillis   value stored under {@code processed.timestamp}
     * @param processedIdentifiesWithLatestTimestamp    identifiers to store
     * @return the populated state map
     * @throws IOException declared for callers; not thrown by this implementation
     */
    private Map<String, String> createStateMap(long latestListedEntryTimestampThisCycleMillis, long lastProcessedLatestEntryTimestampMillis, List<String> processedIdentifiesWithLatestTimestamp) throws IOException {
        final int identifierCount = processedIdentifiesWithLatestTimestamp.size();
        final Map<String, String> state = new HashMap<String, String>(identifierCount + 2);
        state.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(latestListedEntryTimestampThisCycleMillis));
        state.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(lastProcessedLatestEntryTimestampMillis));

        int position = 0;
        for (final String identifier : processedIdentifiesWithLatestTimestamp) {
            state.put(IDENTIFIER_PREFIX + "." + position, identifier);
            position++;
        }
        return state;
    }

    /**
     * Stores the given timestamps and identifiers as this component's state through the session.
     *
     * @param latestListedEntryTimestampThisCycleMillis latest listed timestamp to store
     * @param lastProcessedLatestEntryTimestampMillis   latest processed timestamp to store
     * @param processedIdentifiesWithLatestTimestamp    identifiers to store
     * @param session                                   the session through which the state is set
     * @param scope                                     the state scope to write to
     * @throws IOException if the state cannot be built or stored
     */
    private void persist(long latestListedEntryTimestampThisCycleMillis, long lastProcessedLatestEntryTimestampMillis, List<String> processedIdentifiesWithLatestTimestamp, ProcessSession session, Scope scope) throws IOException {
        session.setState(
                this.createStateMap(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, processedIdentifiesWithLatestTimestamp),
                scope);
    }

    /**
     * Builds the legacy cache key for a directory:
     * {@code <component identifier>.lastListingTime.<directory>}.
     *
     * @param directory the directory the key refers to
     * @return the cache key
     */
    protected String getKey(String directory) {
        final StringBuilder key = new StringBuilder();
        key.append(this.getIdentifier()).append(".lastListingTime.").append(directory);
        return key.toString();
    }

    /**
     * Parses a JSON-serialized {@link EntityListing}.
     *
     * @param serializedState the JSON text
     * @return the parsed listing
     * @throws JsonParseException   if the text is not valid JSON
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link EntityListing}
     * @throws IOException          on any other read failure
     */
    private EntityListing deserialize(String serializedState) throws JsonParseException, JsonMappingException, IOException {
        final EntityListing listing = (EntityListing)new ObjectMapper().readValue(serializedState, EntityListing.class);
        return listing;
    }

    /**
     * Dispatches to the listing routine that matches the configured listing strategy.
     *
     * @param context the process context
     * @param session the process session
     * @throws ProcessException if the configured strategy is not one of the known values
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String strategy = context.getProperty(LISTING_STRATEGY).getValue();
        if (BY_TIMESTAMPS.equals((Object)strategy)) {
            this.listByTrackingTimestamps(context, session);
            return;
        }
        if (BY_ENTITIES.equals((Object)strategy)) {
            this.listByTrackingEntities(context, session);
            return;
        }
        if (NO_TRACKING.equals((Object)strategy)) {
            this.listByNoTracking(context, session);
            return;
        }
        if (BY_TIME_WINDOW.equals((Object)strategy)) {
            this.listByTimeWindow(context, session);
            return;
        }
        throw new ProcessException("Unknown listing strategy: " + strategy);
    }

    /**
     * Supplies the "current time" used as the basis of the time-window upper bound.
     * Being protected, it can be overridden.
     *
     * @return the current wall-clock time in milliseconds
     */
    protected long getCurrentTime() {
        final long nowMillis = System.currentTimeMillis();
        return nowMillis;
    }

    /**
     * Lists every entity without tracking: stored state is cleared first, then one FlowFile per
     * listed entity is created and routed to {@link #REL_SUCCESS}, in ascending timestamp order.
     *
     * <p>The context is yielded, and nothing is emitted, when clearing the state fails, when the
     * listing fails, or when the listing is {@code null} or empty.</p>
     *
     * @param context the process context
     * @param session the session used to create and transfer FlowFiles
     */
    public void listByNoTracking(ProcessContext context, ProcessSession session) {
        try {
            context.getStateManager().clear(this.getStateScope((PropertyContext)context));
        }
        catch (IOException re) {
            this.getLogger().error("Failed to remove previous state from the State Manager.", new Object[]{re.getMessage()}, (Throwable)re);
            context.yield();
            return;
        }

        final List<T> entityList;
        try {
            entityList = this.performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
        }
        catch (IOException pe) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, (Throwable)pe);
            context.yield();
            return;
        }

        if (entityList == null || entityList.isEmpty()) {
            context.yield();
            return;
        }

        // Group by timestamp; the TreeMap yields the groups oldest first.
        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<Long, List<T>>();
        for (final T entity : entityList) {
            orderedEntries.computeIfAbsent(entity.getTimestamp(), timestamp -> new ArrayList<T>()).add(entity);
        }

        for (final List<T> entitiesForTimestamp : orderedEntries.values()) {
            for (final T entity : entitiesForTimestamp) {
                final Map<String, String> attributes = this.createAttributes(entity, context);
                FlowFile flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_SUCCESS);
            }
        }
    }

    /**
     * Lists the entities whose timestamp falls inside a sliding time window.
     *
     * <p>The window starts (inclusive) at the upper bound stored by the previous cycle, or at
     * {@code IGNORE_MIN_TIMESTAMP_VALUE} when there is none. It ends (exclusive) at the current
     * time minus the listing lag of the target system's timestamp precision. After the entities
     * are emitted, the upper bound is stored as the next lower bound.</p>
     *
     * <p>The context is yielded, and nothing is emitted, when the stored state cannot be read,
     * when the listing fails, when no entity falls inside the window, or when the record writer
     * fails. A {@code null} listing is treated as an empty one.</p>
     *
     * @param context the process context
     * @param session the session used to emit FlowFiles and store state
     * @throws ProcessException declared for callers
     */
    public void listByTimeWindow(ProcessContext context, ProcessSession session) throws ProcessException {
        // Reload the lower bound from stored state on first use or after a primary-node election.
        if (this.lastListedLatestEntryTimestampMillis == null || this.justElectedPrimaryNode) {
            try {
                final StateMap stateMap = context.getStateManager().getState(this.getStateScope((PropertyContext)context));
                Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)).map(Long::parseLong).ifPresent(lastTimestamp -> {
                    this.lastListedLatestEntryTimestampMillis = lastTimestamp;
                });
                this.justElectedPrimaryNode = false;
            }
            catch (IOException ioe) {
                this.getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.", (Throwable)ioe);
                context.yield();
                return;
            }
        }

        final long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(IGNORE_MIN_TIMESTAMP_VALUE);
        final long currentTime = this.getCurrentTime();
        final long upperBoundExclusiveTimestamp;
        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<Long, List<T>>();
        try {
            final List<T> listed = this.performListing(context, lowerBoundInclusiveTimestamp, ListingMode.EXECUTION);
            // The other listing strategies tolerate a null listing; do the same here instead of failing.
            final List<T> entityList = listed == null ? Collections.<T>emptyList() : listed;

            // Detect the finest timestamp granularity present in the listing.
            boolean targetSystemHasMilliseconds = false;
            boolean targetSystemHasSeconds = false;
            for (final T listedEntity : entityList) {
                final long entityTimestampMillis = listedEntity.getTimestamp();
                if (!targetSystemHasMilliseconds) {
                    targetSystemHasMilliseconds = entityTimestampMillis % 1000L > 0L;
                }
                if (!targetSystemHasSeconds) {
                    targetSystemHasSeconds = entityTimestampMillis % 60000L > 0L;
                }
            }

            String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
            if (StringUtils.isBlank((String)specifiedPrecision)) {
                specifiedPrecision = this.getDefaultTimePrecision();
            }

            final TimeUnit targetSystemTimePrecision;
            if (PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)) {
                targetSystemTimePrecision = targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS
                        : (targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES);
            } else if (PRECISION_MILLIS.getValue().equals(specifiedPrecision)) {
                targetSystemTimePrecision = TimeUnit.MILLISECONDS;
            } else if (PRECISION_SECONDS.getValue().equals(specifiedPrecision)) {
                targetSystemTimePrecision = TimeUnit.SECONDS;
            } else {
                targetSystemTimePrecision = TimeUnit.MINUTES;
            }

            final Long listingLagMillis = LISTING_LAG_MILLIS.get((Object)targetSystemTimePrecision);
            upperBoundExclusiveTimestamp = currentTime - listingLagMillis;

            if (this.getLogger().isTraceEnabled()) {
                this.getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
                this.getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
            }

            // Keep only entities inside [lowerBound, upperBound), grouped by timestamp.
            for (final T listedEntity : entityList) {
                final long entityTimestampMillis = listedEntity.getTimestamp();
                if (entityTimestampMillis >= lowerBoundInclusiveTimestamp && entityTimestampMillis < upperBoundExclusiveTimestamp) {
                    orderedEntries.computeIfAbsent(entityTimestampMillis, timestamp -> new ArrayList<T>()).add(listedEntity);
                }
            }

            if (this.getLogger().isTraceEnabled()) {
                this.getLogger().trace("orderedEntries: " + orderedEntries.values().stream().flatMap(Collection::stream).map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
            }
        }
        catch (IOException e) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, (Throwable)e);
            context.yield();
            return;
        }

        if (orderedEntries.isEmpty()) {
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            return;
        }

        final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
        if (writerSet) {
            try {
                this.createRecordsForEntities(context, session, orderedEntries);
            }
            catch (IOException | SchemaNotFoundException e) {
                this.getLogger().error("Failed to write listing to FlowFile", e);
                context.yield();
                return;
            }
        } else {
            this.createFlowFilesForEntities(context, session, orderedEntries);
        }

        try {
            if (this.getLogger().isTraceEnabled()) {
                // Logged at trace level to match the surrounding guard.
                this.getLogger().trace("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
            }
            // The upper bound of this window becomes the lower bound of the next one.
            this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
            this.persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, this.latestIdentifiersProcessed, session, this.getStateScope((PropertyContext)context));
        }
        catch (IOException ioe) {
            this.getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", (Throwable)ioe);
        }
    }

    public void listByTrackingTimestamps(ProcessContext context, ProcessSession session) throws ProcessException {
        List<T> entityList;
        Long minTimestampToListMillis = this.lastListedLatestEntryTimestampMillis;
        if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || this.justElectedPrimaryNode) {
            try {
                boolean noUpdateRequired = false;
                StateMap stateMap = session.getState(this.getStateScope((PropertyContext)context));
                this.latestIdentifiersProcessed.clear();
                for (Map.Entry state : stateMap.toMap().entrySet()) {
                    String k = (String)state.getKey();
                    String v = (String)state.getValue();
                    if (v == null || v.isEmpty()) continue;
                    if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        minTimestampToListMillis = Long.parseLong(v);
                        if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
                            noUpdateRequired = true;
                            continue;
                        }
                        this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                        continue;
                    }
                    if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
                        continue;
                    }
                    if (!k.startsWith(IDENTIFIER_PREFIX)) continue;
                    this.latestIdentifiersProcessed.add(v);
                }
                this.justElectedPrimaryNode = false;
                if (noUpdateRequired) {
                    context.yield();
                    return;
                }
            }
            catch (IOException ioe) {
                this.getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                context.yield();
                return;
            }
        }
        long currentRunTimeNanos = System.nanoTime();
        long currentRunTimeMillis = System.currentTimeMillis();
        try {
            entityList = this.performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, (Throwable)e);
            context.yield();
            return;
        }
        if (entityList == null || entityList.isEmpty()) {
            context.yield();
            return;
        }
        Long latestListedEntryTimestampThisCycleMillis = null;
        TreeMap<Long, List<T>> orderedEntries = new TreeMap<Long, List<T>>();
        boolean targetSystemHasMilliseconds = false;
        boolean targetSystemHasSeconds = false;
        for (ListableEntity entity2 : entityList) {
            boolean newEntry;
            long entityTimestampMillis = entity2.getTimestamp();
            if (!targetSystemHasMilliseconds) {
                boolean bl = targetSystemHasMilliseconds = entityTimestampMillis % 1000L > 0L;
            }
            if (!targetSystemHasSeconds) {
                targetSystemHasSeconds = entityTimestampMillis % 60000L > 0L;
            }
            if (!(newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= this.lastProcessedLatestEntryTimestampMillis)) continue;
            ArrayList<ListableEntity> entitiesForTimestamp = (ArrayList<ListableEntity>)orderedEntries.get(entity2.getTimestamp());
            if (entitiesForTimestamp == null) {
                entitiesForTimestamp = new ArrayList<ListableEntity>();
                orderedEntries.put(entity2.getTimestamp(), entitiesForTimestamp);
            }
            entitiesForTimestamp.add(entity2);
        }
        int entitiesListed = 0;
        if (orderedEntries.size() > 0) {
            boolean writerSet;
            latestListedEntryTimestampThisCycleMillis = (Long)orderedEntries.lastKey();
            String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
            if (StringUtils.isBlank((String)specifiedPrecision)) {
                specifiedPrecision = this.getDefaultTimePrecision();
            }
            TimeUnit targetSystemTimePrecision = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) ? (targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : (targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES)) : (PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS : (PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES));
            Long listingLagMillis = LISTING_LAG_MILLIS.get((Object)targetSystemTimePrecision);
            if (latestListedEntryTimestampThisCycleMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
                long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
                if (currentRunTimeNanos - this.lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(this.lastProcessedLatestEntryTimestampMillis) && ((List)orderedEntries.get(latestListedEntryTimestampThisCycleMillis)).stream().allMatch(entity -> this.latestIdentifiersProcessed.contains(entity.getIdentifier()))) {
                    context.yield();
                    return;
                }
            } else {
                long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
                long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
                if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
                    orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
                }
            }
            if (writerSet = context.getProperty(RECORD_WRITER).isSet()) {
                try {
                    entitiesListed = this.createRecordsForEntities(context, session, orderedEntries);
                }
                catch (IOException | SchemaNotFoundException e) {
                    this.getLogger().error("Failed to write listing to FlowFile", e);
                    context.yield();
                    return;
                }
            } else {
                entitiesListed = this.createFlowFilesForEntities(context, session, orderedEntries);
            }
        }
        if (latestListedEntryTimestampThisCycleMillis != null) {
            boolean processedNewFiles;
            boolean bl = processedNewFiles = entitiesListed > 0;
            if (processedNewFiles) {
                if (!((Long)orderedEntries.lastKey()).equals(this.lastProcessedLatestEntryTimestampMillis)) {
                    this.latestIdentifiersProcessed.clear();
                }
                this.latestIdentifiersProcessed.addAll(((List)orderedEntries.lastEntry().getValue()).stream().map(ListableEntity::getIdentifier).collect(Collectors.toList()));
                this.lastProcessedLatestEntryTimestampMillis = (Long)orderedEntries.lastKey();
            }
            if (!latestListedEntryTimestampThisCycleMillis.equals(this.lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                try {
                    this.lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
                    this.persist(latestListedEntryTimestampThisCycleMillis, this.lastProcessedLatestEntryTimestampMillis, this.latestIdentifiersProcessed, session, this.getStateScope((PropertyContext)context));
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", (Throwable)ioe);
                }
            }
            if (processedNewFiles) {
                this.getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
                session.commitAsync();
            }
            this.lastRunTimeNanos = currentRunTimeNanos;
        } else {
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            if (this.lastListedLatestEntryTimestampMillis == null) {
                this.lastListedLatestEntryTimestampMillis = 0L;
            }
        }
    }

    /**
     * Writes all not-yet-processed entities as records into a single FlowFile and transfers it to
     * {@code REL_SUCCESS}.
     *
     * <p>Entities stored under the key equal to {@code lastProcessedLatestEntryTimestampMillis} are skipped
     * when their identifier is already contained in {@code latestIdentifiersProcessed}. When no record is
     * written, the FlowFile is removed from the session and {@code 0} is returned.</p>
     *
     * <p>If creating the writer or writing a record fails, the FlowFile created here is removed from the
     * session before the exception is rethrown, so the session is not left holding a FlowFile that has
     * neither been transferred nor removed.</p>
     *
     * @param context        used to resolve the {@code RECORD_WRITER} controller service
     * @param session        session in which the FlowFile is created, written and transferred
     * @param orderedEntries entities to write, grouped by timestamp; iterated in the map's own order
     * @return the number of records written
     * @throws IOException             if writing to the FlowFile content fails
     * @throws SchemaNotFoundException if the record writer cannot be created for the record schema
     */
    private int createRecordsForEntities(ProcessContext context, ProcessSession session, Map<Long, List<T>> orderedEntries) throws IOException, SchemaNotFoundException {
        WriteResult writeResult;
        RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        int entitiesListed = 0;
        FlowFile flowFile = session.create();
        try (OutputStream out = session.write(flowFile);
             RecordSetWriter recordSetWriter = writerFactory.createWriter(this.getLogger(), this.getRecordSchema(), out, Collections.emptyMap())) {
            recordSetWriter.beginRecordSet();
            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
                List<T> entities = timestampEntities.getValue();
                // Only the bucket matching the last processed timestamp can contain already-emitted entities.
                if (timestampEntities.getKey().equals(this.lastProcessedLatestEntryTimestampMillis)) {
                    entities = entities.stream().filter(entity -> !this.latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
                }
                for (T entity : entities) {
                    ++entitiesListed;
                    recordSetWriter.write(entity.toRecord());
                }
            }
            writeResult = recordSetWriter.finishRecordSet();
        } catch (IOException | SchemaNotFoundException | RuntimeException e) {
            // Both resources are already closed here; drop the half-written FlowFile so it is not orphaned.
            try {
                session.remove(flowFile);
            } catch (RuntimeException cleanupFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        if (entitiesListed == 0) {
            // Nothing new was written; do not emit an empty listing.
            session.remove(flowFile);
            return 0;
        }
        HashMap<String, String> attributes = new HashMap<String, String>(writeResult.getAttributes());
        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
        flowFile = session.putAllAttributes(flowFile, attributes);
        session.transfer(flowFile, REL_SUCCESS);
        return entitiesListed;
    }

    /**
     * Emits one attribute-only FlowFile per not-yet-processed entity and transfers each to
     * {@code REL_SUCCESS}.
     *
     * <p>Within the bucket whose key equals {@code lastProcessedLatestEntryTimestampMillis}, entities whose
     * identifier is already in {@code latestIdentifiersProcessed} are skipped.</p>
     *
     * @param context        passed through to {@code createAttributes}
     * @param session        session in which FlowFiles are created and transferred
     * @param orderedEntries entities to emit, grouped by timestamp; iterated in the map's own order
     * @return the number of FlowFiles created
     */
    private int createFlowFilesForEntities(ProcessContext context, ProcessSession session, Map<Long, List<T>> orderedEntries) {
        int created = 0;
        for (final Map.Entry<Long, List<T>> bucket : orderedEntries.entrySet()) {
            // Only this bucket may hold entities that were already emitted in an earlier run.
            final boolean isLastProcessedTimestamp = bucket.getKey().equals(this.lastProcessedLatestEntryTimestampMillis);
            for (final T candidate : bucket.getValue()) {
                if (isLastProcessedTimestamp && this.latestIdentifiersProcessed.contains(candidate.getIdentifier())) {
                    continue;
                }
                created++;
                final Map<String, String> entityAttributes = this.createAttributes(candidate, context);
                final FlowFile listed = session.putAllAttributes(session.create(), entityAttributes);
                session.transfer(listed, REL_SUCCESS);
            }
        }
        return created;
    }

    /**
     * Returns the precision value used when the {@code TARGET_SYSTEM_TIMESTAMP_PRECISION} property is blank.
     *
     * <p>This implementation returns the default value declared on the property descriptor. The method is
     * {@code protected}, so subclasses may supply a different default.</p>
     *
     * @return the default timestamp precision value
     */
    protected String getDefaultTimePrecision() {
        final String descriptorDefault = TARGET_SYSTEM_TIMESTAMP_PRECISION.getDefaultValue();
        return descriptorDefault;
    }

    /**
     * Returns the timestamp-based listing bookkeeping to its initial values: no latest listed timestamp,
     * a last processed timestamp of zero, a last run time of zero and an empty set of processed identifiers.
     */
    private void resetTimeStates() {
        // null (rather than 0) marks "no listing seen yet".
        lastListedLatestEntryTimestampMillis = null;
        lastProcessedLatestEntryTimestampMillis = 0L;
        lastRunTimeNanos = 0L;
        latestIdentifiersProcessed.clear();
    }

    /**
     * Builds the attribute map for a listed entity. Within this class the result is applied to a newly
     * created FlowFile, and the method is also handed to the entity tracker as its attribute function.
     *
     * @param var1 the entity to describe
     * @param var2 the current process context
     * @return attributes for the entity
     */
    protected abstract Map<String, String> createAttributes(T var1, ProcessContext var2);

    /**
     * NOTE(review): no caller is visible in this part of the file; presumably returns the path or location
     * being listed for the given context. Confirm against implementations.
     *
     * @param var1 the current process context
     * @return a path string
     */
    protected abstract String getPath(ProcessContext var1);

    /**
     * Performs the actual listing against the target system.
     *
     * @param var1 the current process context
     * @param var2 minimum timestamp to list, as supplied by the caller (presumably epoch milliseconds and
     *             possibly {@code null} - TODO confirm)
     * @param var3 the mode in which the listing is performed
     * @return the listed entities
     * @throws IOException if the listing fails; the entity-tracking caller in this class logs the error and
     *                     treats the listing as empty
     */
    protected abstract List<T> performListing(ProcessContext var1, Long var2, ListingMode var3) throws IOException;

    /**
     * NOTE(review): no caller is visible in this part of the file; presumably reports whether a change to
     * the given property requires the listing state to be reset. Confirm against the caller.
     *
     * @param var1 the property descriptor in question
     * @return a boolean decision for that property
     */
    protected abstract boolean isListingResetNecessary(PropertyDescriptor var1);

    /**
     * Returns the state scope that this class passes along when persisting listing state and when tracking
     * entities.
     *
     * @param var1 the current property context
     * @return the scope to use for state
     */
    protected abstract Scope getStateScope(PropertyContext var1);

    /**
     * Returns the record schema used by this class when creating the record writer and the listed entity
     * tracker.
     *
     * @return the record schema for listed entities
     */
    protected abstract RecordSchema getRecordSchema();

    /**
     * NOTE(review): no caller is visible in this part of the file; presumably counts the entities in the
     * listing before any filtering is applied. Confirm against the caller.
     *
     * @param var1 the current process context
     * @return a count of entities
     * @throws IOException if the count cannot be obtained
     */
    protected abstract Integer countUnfilteredListing(ProcessContext var1) throws IOException;

    /**
     * NOTE(review): no caller is visible in this part of the file; presumably a human-readable name of the
     * container (directory, bucket, ...) being listed. Confirm against the caller.
     *
     * @param var1 the current process context
     * @return a container name
     */
    protected abstract String getListingContainerName(ProcessContext var1);

    /**
     * Prepares the listed entity tracker when the processor is scheduled.
     *
     * <p>An existing tracker has its listed entities cleared when a reset was requested or when the
     * configured listing strategy is no longer entity tracking. The reset flag is then lowered. Finally a
     * tracker is created if entity tracking is selected and none exists, or the tracker reference is
     * dropped if another strategy is selected.</p>
     *
     * @param context the process context used to read the {@code LISTING_STRATEGY} property
     * @throws RuntimeException if clearing previously listed entities fails with an {@link IOException}
     */
    @OnScheduled
    public void initListedEntityTracker(ProcessContext context) {
        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
        final boolean trackingEntities = BY_ENTITIES.getValue().equals(configuredStrategy);

        if (this.listedEntityTracker != null && (this.resetEntityTrackingState || !trackingEntities)) {
            try {
                this.listedEntityTracker.clearListedEntities();
            } catch (IOException e) {
                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
            }
        }
        this.resetEntityTrackingState = false;

        if (!trackingEntities) {
            // Another strategy is active; the tracker is not needed.
            this.listedEntityTracker = null;
            return;
        }
        if (this.listedEntityTracker == null) {
            this.listedEntityTracker = this.createListedEntityTracker();
        }
    }

    /**
     * Creates the tracker used by the entity-tracking listing strategy, built from this processor's
     * identifier, logger and record schema.
     *
     * <p>The method is {@code protected}, so subclasses may supply a different tracker.</p>
     *
     * @return a new tracker instance
     */
    protected ListedEntityTracker<T> createListedEntityTracker() {
        // Diamond instead of a raw type so the result is a checked ListedEntityTracker<T>.
        return new ListedEntityTracker<>(this.getIdentifier(), this.getLogger(), this.getRecordSchema());
    }

    /**
     * Runs one listing cycle through the listed entity tracker.
     *
     * <p>The tracker is given a listing function that delegates to {@code performListing} in
     * {@code ListingMode.EXECUTION} and an attribute function that delegates to {@code createAttributes}.
     * If the listing throws an {@link IOException}, the error is logged and an empty listing is handed to
     * the tracker. After the call the "just elected primary node" flag is lowered.</p>
     *
     * @param context the current process context
     * @param session the current process session
     * @throws ProcessException declared for callers; not thrown directly by this method
     */
    private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
        final Scope stateScope = this.getStateScope(context);
        this.listedEntityTracker.trackEntities(
                context,
                session,
                this.justElectedPrimaryNode,
                stateScope,
                minTimestampToList -> {
                    try {
                        return this.performListing(context, minTimestampToList, ListingMode.EXECUTION);
                    } catch (IOException e) {
                        this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, (Throwable)e);
                        return Collections.emptyList();
                    }
                },
                entity -> this.createAttributes(entity, context));
        this.justElectedPrimaryNode = false;
    }

    // Populates LISTING_LAG_MILLIS: for each timestamp precision, the listing lag in milliseconds
    // that the time-based listing looks up for the detected or configured precision.
    static {
        HashMap<TimeUnit, Long> lagMillisByPrecision = new HashMap<TimeUnit, Long>();
        lagMillisByPrecision.put(TimeUnit.MILLISECONDS, 100L);
        lagMillisByPrecision.put(TimeUnit.SECONDS, TimeUnit.SECONDS.toMillis(1L));
        lagMillisByPrecision.put(TimeUnit.MINUTES, TimeUnit.MINUTES.toMillis(1L));
        LISTING_LAG_MILLIS = Collections.unmodifiableMap(lagMillisByPrecision);
    }

    /**
     * Serializer and deserializer that converts between {@link String} values and their UTF-8 bytes.
     */
    private static class StringSerDe implements Serializer<String>, Deserializer<String> {
        private StringSerDe() {
        }

        /**
         * Decodes the given bytes as UTF-8.
         *
         * @param value the bytes to decode; may be {@code null}
         * @return the decoded string, or {@code null} when {@code value} is {@code null}
         */
        public String deserialize(byte[] value) throws DeserializationException, IOException {
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }

        /**
         * Writes the UTF-8 encoding of the given string to the stream.
         *
         * @param value the string to encode
         * @param out   the stream that receives the encoded bytes
         * @throws IOException if writing to {@code out} fails
         */
        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            final byte[] utf8Bytes = value.getBytes(StandardCharsets.UTF_8);
            out.write(utf8Bytes);
        }
    }

    protected static enum ListingMode {
        EXECUTION,
        CONFIGURATION_VERIFICATION;

    }
}

