/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.bin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;

public abstract class BinFiles
extends AbstractSessionFactoryProcessor {
    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder().name("Minimum Group Size").description("The minimum size of for the bundle").required(true).defaultValue("0 B").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder().name("Maximum Group Size").description("The maximum size for the bundle. If not specified, there is no maximum.").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder().name("Minimum Number of Entries").description("The minimum number of files to include in a bundle").required(true).defaultValue("1").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder().name("Maximum Number of Entries").description("The maximum number of files to include in a bundle").defaultValue("1000").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder().name("Maximum number of Bins").description("Specifies the maximum number of bins that can be held in memory at any one time").defaultValue("5").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder().name("Max Bin Age").description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours").required(false).addValidator(StandardValidators.createTimePeriodValidator((long)1L, (TimeUnit)TimeUnit.SECONDS, (long)Integer.MAX_VALUE, (TimeUnit)TimeUnit.SECONDS)).build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
    private final BinManager binManager = new BinManager();
    private final Queue<Bin> readyBins = new LinkedBlockingQueue<Bin>();

    @OnStopped
    public final void resetState() {
        Bin bin;
        this.binManager.purge();
        while ((bin = this.readyBins.poll()) != null) {
            bin.getSession().rollback();
        }
    }

    protected abstract FlowFile preprocessFlowFile(ProcessContext var1, ProcessSession var2, FlowFile var3);

    protected abstract String getGroupId(ProcessContext var1, FlowFile var2, ProcessSession var3);

    protected abstract void setUpBinManager(BinManager var1, ProcessContext var2);

    protected abstract BinProcessingResult processBin(Bin var1, ProcessContext var2) throws ProcessException;

    protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
        return new ArrayList<ValidationResult>();
    }

    public final void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        int flowFilesBinned;
        int maxBinCount;
        int totalBinCount = this.binManager.getBinCount() + this.readyBins.size();
        if (totalBinCount < (maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger().intValue())) {
            flowFilesBinned = this.binFlowFiles(context, sessionFactory);
            this.getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
        } else {
            flowFilesBinned = 0;
            this.getLogger().debug("Will not bin any FlowFiles because {} bins already exist;will wait until bins have been emptied before any more are created", new Object[]{totalBinCount});
        }
        if (!this.isScheduled()) {
            return;
        }
        int binsMigrated = this.migrateBins(context);
        int binsProcessed = this.processBins(context);
        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
            context.yield();
        }
    }

    private int migrateBins(ProcessContext context) {
        Bin bin;
        int added = 0;
        for (Bin bin2 : this.binManager.removeReadyBins(true)) {
            this.readyBins.add(bin2);
            ++added;
        }
        if (added == 0 && this.binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger() && (bin = this.binManager.removeOldestBin()) != null) {
            ++added;
            this.readyBins.add(bin);
        }
        return added;
    }

    private int processBins(ProcessContext context) {
        Bin bin;
        ComponentLog logger = this.getLogger();
        int processedBins = 0;
        while ((bin = this.readyBins.poll()) != null) {
            BinProcessingResult binProcessingResult;
            try {
                binProcessingResult = this.processBin(bin, context);
            }
            catch (ProcessException e) {
                logger.error("Failed to process bundle of {} files due to {}", new Object[]{bin.getContents().size(), e});
                ProcessSession binSession = bin.getSession();
                for (FlowFile flowFile : bin.getContents()) {
                    binSession.transfer(flowFile, REL_FAILURE);
                }
                binSession.commit();
                continue;
            }
            catch (Exception e) {
                logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[]{bin.getContents().size(), e});
                bin.getSession().rollback();
                continue;
            }
            if (!binProcessingResult.isCommitted()) {
                ProcessSession binSession = bin.getSession();
                bin.getContents().stream().forEach(ff -> binSession.putAllAttributes(ff, binProcessingResult.getAttributes()));
                binSession.transfer(bin.getContents(), REL_ORIGINAL);
                binSession.commit();
            }
            ++processedBins;
        }
        return processedBins;
    }

    private int binFlowFiles(ProcessContext context, ProcessSessionFactory sessionFactory) {
        ProcessSession session;
        List flowFiles;
        int flowFilesBinned = 0;
        while (this.binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger() && this.isScheduled() && !(flowFiles = (session = sessionFactory.createSession()).get(1000)).isEmpty()) {
            HashMap<String, List> flowFileGroups = new HashMap<String, List>();
            for (FlowFile flowFile : flowFiles) {
                FlowFile flowFile2 = this.preprocessFlowFile(context, session, flowFile);
                try {
                    String groupingIdentifier = this.getGroupId(context, flowFile2, session);
                    flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList()).add(flowFile2);
                }
                catch (Exception e) {
                    this.getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[]{flowFile2}, (Throwable)e);
                    session.transfer(flowFile2, REL_FAILURE);
                }
            }
            for (Map.Entry entry : flowFileGroups.entrySet()) {
                Set<FlowFile> unbinned = this.binManager.offer((String)entry.getKey(), (Collection)entry.getValue(), session, sessionFactory);
                for (FlowFile flowFile : unbinned) {
                    Bin bin = new Bin(sessionFactory.createSession(), 0L, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
                    bin.offer(flowFile, session);
                    this.readyBins.add(bin);
                }
                flowFilesBinned += ((List)entry.getValue()).size();
            }
        }
        return flowFilesBinned;
    }

    @OnScheduled
    public final void onScheduled(ProcessContext context) throws IOException {
        this.binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
        if (context.getProperty(MAX_BIN_AGE).isSet()) {
            this.binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
        } else {
            this.binManager.setMaxBinAge(Integer.MAX_VALUE);
        }
        if (context.getProperty(MAX_SIZE).isSet()) {
            this.binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
        } else {
            this.binManager.setMaximumSize(Long.MAX_VALUE);
        }
        this.binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
        if (context.getProperty(MAX_ENTRIES).isSet()) {
            this.binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger());
        } else {
            this.binManager.setMaximumEntries(Integer.MAX_VALUE);
        }
        this.setUpBinManager(this.binManager, context);
    }

    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
        Collection<ValidationResult> otherProblems;
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>(super.customValidate(context));
        long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
        Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
        if (maxBytes != null && maxBytes.longValue() < minBytes) {
            problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
        }
        Long min = context.getProperty(MIN_ENTRIES).asLong();
        Long max = context.getProperty(MAX_ENTRIES).asLong();
        if (min != null && max != null && min > max) {
            problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
        }
        if ((otherProblems = this.additionalCustomValidation(context)) != null) {
            problems.addAll(otherProblems);
        }
        return problems;
    }
}

