/*
 * Decompiled with CFR 0.152.
 */
package org.jesterj.ingest.processors;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.utils.ConcurrentBiMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.jesterj.ingest.model.Document;
import org.jesterj.ingest.model.DocumentProcessor;
import org.jesterj.ingest.model.Status;
import org.jesterj.ingest.model.impl.NamedBuilder;
import org.jesterj.ingest.processors.DocumentLoggingContext;

/**
 * Base class for document processors that accumulate documents and hand them
 * to {@link #batchOperation(ConcurrentBiMap)} in groups rather than one at a time.
 *
 * <p>Each incoming document is converted with {@link #convertDoc(Document)} and
 * stored in the current batch. A batch is handed off for sending in two cases:
 * <ul>
 *   <li>a document arrives while the current batch already holds at least
 *       {@code batchSize} entries (the arriving document starts the next batch), or</li>
 *   <li>the partial-batch timer fires, {@code sendPartialBatchAfterMs} milliseconds
 *       after it was last (re)scheduled.</li>
 * </ul>
 *
 * <p>Locking: {@code batchLock} guards the {@code batch} reference and its
 * contents while documents are being added or the batch is swapped out,
 * {@code sendLock} serializes sends, and {@code scheduleLock} guards the
 * {@code scheduledSend} handle. No code in this class acquires another of these
 * locks while holding {@code scheduleLock}.
 *
 * @param <T> the converted form of a document that is placed in the batch
 */
abstract class BatchProcessor<T>
implements DocumentProcessor {
    private static final Logger log = LogManager.getLogger();

    /** Single-threaded scheduler for partial batches; created lazily on the first document. */
    private volatile ScheduledExecutorService sender;
    private int batchSize = 100;
    private int sendPartialBatchAfterMs = 5000;

    /** Handle of the pending partial-batch send; only touched while holding {@link #scheduleLock}. */
    private ScheduledFuture<?> scheduledSend;
    private final Object batchLock = new Object();
    private final Object sendLock = new Object();
    private final Object scheduleLock = new Object();

    /** The batch currently being filled; only touched while holding {@link #batchLock}. */
    private ConcurrentBiMap<Document, T> batch;

    /**
     * Creates the processor with an empty batch. The assignment is made under
     * {@code batchLock}, the same lock every later access to the field uses.
     */
    BatchProcessor() {
        synchronized (this.batchLock) {
            this.batch = new ConcurrentBiMap<>();
        }
    }

    /**
     * Adds the document to the current batch, first handing the existing batch off
     * for sending if it has already reached the configured batch size.
     *
     * <p>The document's status is set to {@link Status#BATCHED} and reported while
     * the batch lock is held. Any send of a previously filled batch happens after
     * the batch lock has been released.
     *
     * @param document the document to queue
     * @return always an empty array; this method passes no documents on to the caller
     */
    @Override
    public Document[] processDocument(Document document) {
        this.ensureSenderStarted();
        T doc = this.convertDoc(document);
        ConcurrentBiMap<Document, T> oldBatch = null;
        synchronized (this.batchLock) {
            if (this.batch.size() >= this.batchSize) {
                oldBatch = this.takeBatch();
            }
            this.batch.put(document, doc);
            document.setStatus(Status.BATCHED, "{} queued in position {} for sending to solr. Will be sent within {} milliseconds.", new Serializable[]{document.getId(), Integer.valueOf(this.batch.size()), Integer.valueOf(this.sendPartialBatchAfterMs)});
            document.reportDocStatus();
        }
        if (oldBatch != null) {
            this.sendBatch(oldBatch);
        }
        log.trace("Batch Processor ({}) processed {}", this.getName(), document.getId());
        return new Document[0];
    }

    /**
     * Creates the scheduler and arms the first partial-batch timer, exactly once.
     * The unsynchronized null check is backed by a second check under the monitor
     * of {@code this}; {@code sender} is volatile.
     */
    private void ensureSenderStarted() {
        if (this.sender != null) {
            return;
        }
        synchronized (this) {
            if (this.sender == null) {
                this.sender = Executors.newScheduledThreadPool(1, BatchProcessor::newContextThread);
                this.schedulePartialBatch();
            }
        }
    }

    /**
     * Thread factory for the scheduler. The log4j {@link ThreadContext} map is read
     * on the thread that asks for the new thread and is copied into the new thread's
     * context before the supplied work starts.
     */
    private static Thread newContextThread(Runnable r) {
        final Map<String, String> context = ThreadContext.getContext();
        return new Thread(() -> {
            ThreadContext.putAll(context);
            r.run();
        });
    }

    /**
     * Swaps in a fresh, empty batch and returns the one that was being filled.
     *
     * @return the previous batch, possibly empty
     */
    private ConcurrentBiMap<Document, T> takeBatch() {
        synchronized (this.batchLock) {
            ConcurrentBiMap<Document, T> oldBatch = this.batch;
            this.batch = new ConcurrentBiMap<>();
            // Guarded so the batch is not stringified when trace logging is off.
            if (log.isTraceEnabled()) {
                log.trace("took batch {} with size {}", oldBatch.toString(), oldBatch.size());
            }
            return oldBatch;
        }
    }

    /**
     * Sends a batch via {@link #batchOperation(ConcurrentBiMap)}; sends are
     * serialized on {@code sendLock}. An empty batch is not sent.
     *
     * <p>If the send throws, {@link #exceptionIndicatesDocumentIssue(Exception)}
     * decides between {@link #individualFallbackOperation(ConcurrentBiMap, Exception)}
     * and {@link #perDocumentFailure(ConcurrentBiMap, Exception)}. In every case,
     * including the empty-batch case, the partial-batch timer is rescheduled and
     * the supplied batch is cleared before returning.
     *
     * @param oldBatch the batch to send; cleared by this method
     */
    private void sendBatch(ConcurrentBiMap<Document, T> oldBatch) {
        synchronized (this.sendLock) {
            try {
                if (oldBatch.size() == 0) {
                    return;
                }
                this.batchOperation(oldBatch);
            }
            catch (InterruptedException e) {
                log.info("Send aborted due to system shutdown");
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                log.info("Batch Send failed", e);
                if (this.exceptionIndicatesDocumentIssue(e)) {
                    this.individualFallbackOperation(oldBatch, e);
                } else {
                    this.perDocumentFailure(oldBatch, e);
                }
            }
            finally {
                this.schedulePartialBatch();
                oldBatch.clear();
            }
        }
    }

    /**
     * Cancels any pending partial-batch send (without interrupting it) and schedules
     * a new one {@code sendPartialBatchAfterMs} milliseconds from now.
     *
     * <p>This method is reached both from the one-time initialization path and from
     * {@link #sendBatch(ConcurrentBiMap)}, which hold different locks, so the
     * cancel-and-replace of {@code scheduledSend} is done under its own lock.
     */
    private void schedulePartialBatch() {
        log.trace("Scheduling partial batch");
        synchronized (this.scheduleLock) {
            if (this.scheduledSend != null) {
                this.scheduledSend.cancel(false);
            }
            this.scheduledSend = this.sender.schedule(() -> {
                log.trace("Scheduled Send Activated");
                this.sendBatch(this.takeBatch());
            }, (long)this.sendPartialBatchAfterMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Invokes {@link #perDocFailLogging(Exception, Document)} for every document
     * in the batch.
     *
     * @param oldBatch the batch whose send failed
     * @param e the exception thrown by the send
     */
    protected void perDocumentFailure(ConcurrentBiMap<Document, ?> oldBatch, Exception e) {
        for (Document doc : oldBatch.keySet()) {
            this.perDocFailLogging(e, doc);
        }
    }

    /**
     * Creates a {@link DocumentLoggingContext} for the given document.
     *
     * @param doc the document to wrap
     * @return a new logging context for {@code doc}
     */
    DocumentLoggingContext createDocContext(Document doc) {
        return new DocumentLoggingContext(doc);
    }

    /**
     * Called once per document by {@link #perDocumentFailure(ConcurrentBiMap, Exception)}.
     *
     * @param e the exception thrown by the batch send
     * @param doc a document that was in the failed batch
     */
    protected abstract void perDocFailLogging(Exception e, Document doc);

    /**
     * Called when a batch send threw and
     * {@link #exceptionIndicatesDocumentIssue(Exception)} returned {@code true}.
     * NOTE(review): presumably retries the documents one at a time; confirm
     * against the implementations.
     *
     * @param oldBatch the batch whose send failed
     * @param e the exception thrown by the batch send
     */
    protected abstract void individualFallbackOperation(ConcurrentBiMap<Document, T> oldBatch, Exception e);

    /**
     * Sends the whole batch. Called with {@code sendLock} held and never with an
     * empty batch.
     *
     * @param batch the non-empty batch to send
     * @throws Exception if the send fails; an {@link InterruptedException} is
     *         treated as a shutdown and triggers no per-document failure handling
     */
    protected abstract void batchOperation(ConcurrentBiMap<Document, T> batch) throws Exception;

    /**
     * Decides how a failed batch send is handled.
     *
     * @param e the exception thrown by {@link #batchOperation(ConcurrentBiMap)}
     * @return {@code true} to route the batch to
     *         {@link #individualFallbackOperation(ConcurrentBiMap, Exception)},
     *         {@code false} to route it to
     *         {@link #perDocumentFailure(ConcurrentBiMap, Exception)}
     */
    protected abstract boolean exceptionIndicatesDocumentIssue(Exception e);

    /**
     * Converts a document into the form stored in the batch. Called from
     * {@link #processDocument(Document)} before the batch lock is taken.
     *
     * @param document the document to convert
     * @return the converted value stored alongside the document
     */
    protected abstract T convertDoc(Document document);

    /**
     * Builder base for batch processors; sets the batching parameters on the
     * object under construction.
     *
     * @param <T> the converted document type of the processor being built
     */
    public static abstract class Builder<T>
    extends NamedBuilder<BatchProcessor<T>> {
        /**
         * Sets the batch size (default 100).
         *
         * @param batchSize number of queued documents at which the batch is handed
         *        off when the next document arrives
         * @return this builder
         */
        public Builder<T> sendingBatchesOf(int batchSize) {
            ((BatchProcessor<?>)this.getObj()).batchSize = batchSize;
            return this;
        }

        /**
         * Sets the delay used when scheduling a partial-batch send (default 5000).
         *
         * @param ms delay in milliseconds
         * @return this builder
         */
        public Builder<T> sendingPartialBatchesAfterMs(int ms) {
            ((BatchProcessor<?>)this.getObj()).sendPartialBatchAfterMs = ms;
            return this;
        }
    }
}

