/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.net.netty.service;

import com.google.auto.service.AutoService;
import io.dingodb.common.Location;
import io.dingodb.common.codec.PrimitiveCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.util.NoBreakFunctions;
import io.dingodb.net.Message;
import io.dingodb.net.netty.NettyNetService;
import io.dingodb.net.netty.NettyNetServiceProvider;
import io.dingodb.net.netty.channel.Channel;
import io.dingodb.net.netty.packet.Type;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

@AutoService(value={io.dingodb.net.service.FileTransferService.class})
public class FileTransferService
implements io.dingodb.net.service.FileTransferService {
    private static final NettyNetService netService = NettyNetServiceProvider.NET_SERVICE_INSTANCE;
    private final int block = 0x400000;

    @Override
    public void mkdir(Location location, Path target) {
    }

    @Override
    public void transfer(Location location, Path source, Path target) {
        if (!Files.exists(source, new LinkOption[0])) {
            throw new IllegalArgumentException(source + " not found.");
        }
        if (Files.isDirectory(source, new LinkOption[0])) {
            File[] files = source.toFile().listFiles();
            if (files == null || files.length == 0) {
                return;
            }
            CountDownLatch countDownLatch = new CountDownLatch(files.length);
            Arrays.stream(files).map(File::toPath).forEach(_p -> Executors.submit("transfer-to-" + location.getUrl(), () -> this.transfer(location, (Path)_p, target.resolve(_p.getFileName()))).thenRun(countDownLatch::countDown));
            try {
                countDownLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return;
        }
        Channel ch = netService.newChannel(location);
        CompletableFuture future = new CompletableFuture();
        ch.directListener(__ -> future.complete(null));
        ch.setCloseListener(__ -> {
            if (!future.isDone()) {
                future.completeExceptionally(new RuntimeException("Unknown!"));
            }
        });
        try (FileChannel fileChannel = FileChannel.open(source, StandardOpenOption.READ);){
            int read;
            ByteBuf buffer;
            ch.send(new Message("FILE_TRANSFER", PrimitiveCodec.encodeString(target.toAbsolutePath().toString())));
            long size = fileChannel.size();
            for (long position = 0L; position < size; position += (long)buffer.writeBytes(fileChannel, position, read)) {
                read = (int)Math.min(size - position, 0x400000L);
                buffer = ch.buffer(Type.USER_DEFINE, read);
                ch.send(buffer);
            }
            ch.send(ch.buffer(Type.USER_DEFINE, 0));
            future.join();
        }
        catch (Exception e) {
            ch.close();
            throw new RuntimeException(e);
        }
    }

    static {
        netService.registerTagMessageListener("FILE_TRANSFER", (msg, ch) -> {
            try {
                ((Channel)ch).directListener(new Receiver(Paths.get(PrimitiveCodec.readString(msg.content()), new String[0]), (Channel)ch));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    static class Receiver
    implements Consumer<ByteBuffer> {
        private final FileChannel fileChannel;
        private final Path path;
        private final Channel channel;

        public Receiver(Path path, Channel channel) throws Exception {
            Files.deleteIfExists(path);
            Files.createDirectories(path.getParent(), new FileAttribute[0]);
            this.path = path;
            this.fileChannel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
            this.channel = channel;
            channel.setCloseListener(NoBreakFunctions.wrap(ch -> this.fileChannel.close()));
        }

        @Override
        public void accept(ByteBuffer buffer) {
            try {
                if (!buffer.hasRemaining()) {
                    this.channel.send(Message.EMPTY);
                    this.channel.close();
                } else {
                    this.fileChannel.write(buffer);
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

