/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.messaging.file;

import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.messaging.service.MessagingBrokerQueueListener;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileBasedMessagingBroker {
    private static final Logger logger = LoggerFactory.getLogger(FileBasedMessagingBroker.class);
    private static final Set<String> createdBrokers = new HashSet<String>();
    private final File file;
    private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
    private MessagingBrokerQueueListener listener;

    private FileBasedMessagingBroker(String name, File exchangeFile) throws IOException {
        this.file = new File(exchangeFile.getCanonicalPath());
        WatchService fileWatcher = FileSystems.getDefault().newWatchService();
        Path dir = Paths.get(this.file.getParentFile().getAbsolutePath(), new String[0]);
        dir.register(fileWatcher, StandardWatchEventKinds.ENTRY_MODIFY);
        new Thread(() -> {
            WatchKey watchKey = null;
            while (true) {
                try {
                    while (true) {
                        if ((watchKey = fileWatcher.poll(10L, TimeUnit.MINUTES)) == null) {
                            continue;
                        }
                        if (!watchKey.pollEvents().isEmpty()) {
                            this.receivedRawEvent();
                        }
                        watchKey.reset();
                    }
                }
                catch (InterruptedException e) {
                    logger.debug("File watching timed out, restarting watcher.");
                    continue;
                }
                break;
            }
        }, "FileBasedBroker: " + name).start();
    }

    public static FileBasedMessagingBroker connect(String name, File exchangeFile) throws IOException {
        if (createdBrokers.contains(exchangeFile.getAbsolutePath())) {
            throw new IOException(String.format("The file based messaging broker for '%s' was already created!", exchangeFile.getAbsolutePath()));
        }
        FileBasedMessagingBroker broker = new FileBasedMessagingBroker(name, exchangeFile);
        createdBrokers.add(exchangeFile.getAbsolutePath());
        return broker;
    }

    public void emitMessage(String topic, String message) {
        try (RandomAccessFile raFile = new RandomAccessFile(this.file, "rw");
             FileChannel channel = raFile.getChannel();
             FileLock lock = this.obtainFileLock(60000L, channel);){
            MessageLine line = new MessageLine(topic, message);
            channel.write(ByteBuffer.wrap(line.toString().getBytes(StandardCharsets.UTF_8)), channel.size());
        }
        catch (Exception e) {
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{topic, e});
        }
    }

    public void registerListener(MessagingBrokerQueueListener listener) {
        if (this.listener != null) {
            throw new IllegalStateException("Only one listener is expected to be registered");
        }
        this.listener = listener;
    }

    public void subscribeTopic(String topic) {
        this.subscriptions.add(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void receivedRawEvent() {
        ArrayList<MessageLine> toPublishLines = new ArrayList<MessageLine>();
        try (RandomAccessFile raFile = new RandomAccessFile(this.file, "rw");
             FileChannel channel = raFile.getChannel();
             FileLock lock = this.obtainFileLock(60000L, channel);){
            MessageLine line;
            ArrayList<MessageLine> leftLines = new ArrayList<MessageLine>();
            while ((line = this.readNextLine(raFile)) != null) {
                if (this.subscriptions.contains(line.getTopic())) {
                    toPublishLines.add(line);
                    continue;
                }
                leftLines.add(line);
            }
            if (!toPublishLines.isEmpty()) {
                raFile.setLength(0L);
                for (MessageLine l : leftLines) {
                    raFile.write(l.toString().getBytes(StandardCharsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            logger.error("Could not read messages from '{}'", (Object)this.file, (Object)e);
        }
        finally {
            if (this.listener != null) {
                toPublishLines.forEach(message -> {
                    try {
                        this.listener.receivedMessage(message.getData(), message.getTopic(), UUID.randomUUID().toString());
                    }
                    catch (Throwable e) {
                        logger.error("The received message with topic '{}' could not be handled", (Object)message.getTopic(), (Object)e);
                    }
                });
            }
        }
    }

    private MessageLine readNextLine(RandomAccessFile file) throws IOException {
        String line = file.readLine();
        if (line != null) {
            return new MessageLine(new String(line.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8));
        }
        return null;
    }

    private FileLock obtainFileLock(long timeout, FileChannel channel) throws IOException, InterruptedException, TimeoutException {
        Long quitTime = System.currentTimeMillis() + timeout;
        do {
            try {
                FileLock lock = channel.tryLock();
                if (lock != null) {
                    return lock;
                }
                Thread.sleep(1000L);
            }
            catch (OverlappingFileLockException e) {
                Thread.sleep(1000L);
            }
        } while (System.currentTimeMillis() < quitTime);
        throw new TimeoutException();
    }

    private static class MessageLine {
        private final String topic;
        private final String data;

        public MessageLine(String line) throws IOException {
            String message = line.trim();
            int separator = message.indexOf(32);
            if (separator == -1) {
                throw new IOException("Could not find separator between topic and data in message");
            }
            this.topic = message.substring(0, separator);
            this.data = message.substring(separator).trim();
        }

        private MessageLine(String topic, String data) {
            this.topic = topic;
            this.data = data;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getData() {
            return this.data;
        }

        public String toString() {
            return this.topic + ' ' + this.data + System.lineSeparator();
        }
    }
}

