/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.file;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.file.FileRecord;
import org.apache.pulsar.io.file.utils.GZipFiles;
import org.apache.pulsar.io.file.utils.ZipFiles;

public class FileConsumerThread
extends Thread {
    private final PushSource<byte[]> source;
    private final BlockingQueue<File> workQueue;
    private final BlockingQueue<File> inProcess;
    private final BlockingQueue<File> recentlyProcessed;

    public FileConsumerThread(PushSource<byte[]> source, BlockingQueue<File> workQueue, BlockingQueue<File> inProcess, BlockingQueue<File> recentlyProcessed) {
        this.source = source;
        this.workQueue = workQueue;
        this.inProcess = inProcess;
        this.recentlyProcessed = recentlyProcessed;
    }

    @Override
    public void run() {
        try {
            while (true) {
                File file = this.workQueue.take();
                boolean added = false;
                while (!(added = this.inProcess.add(file))) {
                }
                this.consumeFile(file);
            }
        }
        catch (InterruptedException interruptedException) {
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consumeFile(File file) {
        AtomicInteger idx = new AtomicInteger(1);
        try (Stream<String> lines = this.getLines(file);){
            lines.forEachOrdered(line -> this.process(file, idx.getAndIncrement(), (String)line));
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            boolean removed = false;
            while (!(removed = this.inProcess.remove(file))) {
            }
            boolean added = false;
            while (!(added = this.recentlyProcessed.add(file))) {
            }
        }
    }

    private Stream<String> getLines(File file) throws IOException {
        if (file == null) {
            return null;
        }
        if (GZipFiles.isGzip(file)) {
            return GZipFiles.lines(Paths.get(file.getAbsolutePath(), new String[0]));
        }
        if (ZipFiles.isZip(file)) {
            return ZipFiles.lines(Paths.get(file.getAbsolutePath(), new String[0]));
        }
        return Files.lines(Paths.get(file.getAbsolutePath(), new String[0]), Charset.defaultCharset());
    }

    private void process(File srcFile, int lineNumber, String line) {
        this.source.consume((Record)new FileRecord(srcFile, lineNumber, line.getBytes()));
    }
}

