/*
 * Decompiled with CFR 0.152.
 */
package darabonba.core.async;

import com.aliyun.core.utils.FunctionalUtils;
import darabonba.core.async.AsyncResponseHandler;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * {@link AsyncResponseHandler} that streams the response body into a file.
 *
 * <p>The target file is opened when {@link #onStream(Publisher)} is invoked, and every
 * {@link ByteBuffer} emitted by the publisher is appended to it. When the stream fails, the
 * file stream is closed and the (partial) file is deleted. The response object itself is
 * passed through {@link #transform(Object)} unchanged.
 *
 * <p>NOTE(review): {@code FunctionalUtils.invokeSafely} is presumed to rethrow checked
 * exceptions as unchecked ones; confirm against its implementation.
 *
 * @param <ResponseT> type of the response object, returned as-is by {@link #transform(Object)}
 */
public class FileAsyncResponseHandler<ResponseT>
implements AsyncResponseHandler<ResponseT, ResponseT> {
    /** Destination file for the response body. */
    private final Path path;
    /** Stream onto {@link #path}; assigned in {@link #onStream(Publisher)}, {@code null} before that. */
    private FileOutputStream fileOutputStream;

    /**
     * Creates a handler that writes the response body to {@code path}.
     *
     * @param path destination file
     */
    public FileAsyncResponseHandler(Path path) {
        this.path = path;
    }

    /**
     * Opens an output stream on the given path.
     *
     * @param path file to open for writing
     * @return a new stream onto {@code path}
     * @throws IOException if the file cannot be opened for writing
     */
    private FileOutputStream createFileOutputStream(Path path) throws IOException {
        // Path.toFile() instead of new File(path.toUri()): the URI-based constructor rejects
        // URIs carrying an authority component (e.g. Windows UNC paths).
        return new FileOutputStream(path.toFile());
    }

    /**
     * Opens the destination file and subscribes a {@link FileSubscriber} that writes every
     * emitted buffer to it.
     *
     * @param publisher source of the response body chunks
     */
    @Override
    public void onStream(Publisher<ByteBuffer> publisher) {
        this.fileOutputStream = FunctionalUtils.invokeSafely(() -> this.createFileOutputStream(this.path));
        publisher.subscribe(new FileSubscriber(this.fileOutputStream, this.path, this::exceptionOccurred));
    }

    /**
     * Returns the response unchanged.
     *
     * @param response the response to transform
     * @return {@code response} itself
     */
    @Override
    public ResponseT transform(ResponseT response) {
        return response;
    }

    /**
     * Cleanup hook for a failed stream: closes the file stream if it was opened and then
     * deletes the destination file if it exists. The delete is attempted even if the close fails.
     *
     * @param throwable the failure that triggered the cleanup (not inspected)
     */
    private void exceptionOccurred(Throwable throwable) {
        try {
            if (this.fileOutputStream != null) {
                FunctionalUtils.invokeSafely(this.fileOutputStream::close);
            }
        }
        finally {
            FunctionalUtils.invokeSafely(() -> Files.deleteIfExists(this.path));
        }
    }

    /**
     * Subscriber that writes each received {@link ByteBuffer} to a buffered file stream,
     * requesting one element at a time.
     */
    static class FileSubscriber
    implements Subscriber<ByteBuffer> {
        /** Destination path; used only for {@link #toString()}. */
        private final Path path;
        /** Callback invoked when the stream (or a write) fails. */
        private final Consumer<Throwable> onErrorMethod;
        /** Buffered wrapper around the file stream supplied to the constructor. */
        private final OutputStream outputStream;
        /** The accepted subscription; {@code null} until {@link #onSubscribe(Subscription)}. */
        private Subscription subscription;

        /**
         * @param fileOutputStream stream to write to; wrapped in a {@link BufferedOutputStream}
         * @param path destination path, used for diagnostics
         * @param onErrorMethod callback run when the stream fails
         */
        FileSubscriber(FileOutputStream fileOutputStream, Path path, Consumer<Throwable> onErrorMethod) {
            this.outputStream = new BufferedOutputStream(fileOutputStream);
            this.path = path;
            this.onErrorMethod = onErrorMethod;
        }

        /**
         * Accepts the first subscription and requests the first element; any further
         * subscription is cancelled immediately.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (this.subscription != null) {
                s.cancel();
                return;
            }
            this.subscription = s;
            s.request(1L);
        }

        /**
         * Writes the buffer's remaining bytes to the file and requests the next element.
         *
         * @throws NullPointerException if {@code byteBuffer} is {@code null}
         */
        @Override
        public void onNext(ByteBuffer byteBuffer) {
            if (byteBuffer == null) {
                throw new NullPointerException("Element must not be null");
            }
            this.performWrite(byteBuffer);
        }

        /**
         * Writes {@code byteBuffer}'s remaining bytes without changing its position, then
         * requests one more element. If the write fails, the subscription is cancelled, the
         * error callback is run, and the original exception is rethrown.
         */
        private void performWrite(ByteBuffer byteBuffer) {
            final int len = byteBuffer.remaining();
            final byte[] bytes;
            final int off;
            if (byteBuffer.hasArray()) {
                // Accessible backing array: write straight from it, no copy.
                bytes = byteBuffer.array();
                off = byteBuffer.arrayOffset() + byteBuffer.position();
            } else {
                // Direct or read-only buffer: array() would throw, so copy the remaining bytes
                // through a duplicate to leave the caller's position untouched.
                bytes = new byte[len];
                byteBuffer.duplicate().get(bytes);
                off = 0;
            }
            try {
                FunctionalUtils.invokeSafely(() -> this.outputStream.write(bytes, off, len));
            } catch (RuntimeException writeFailure) {
                // Stop the upstream and release/delete the file instead of leaving a
                // half-written file with an open stream behind.
                try {
                    this.subscription.cancel();
                    this.onErrorMethod.accept(writeFailure);
                } catch (RuntimeException cleanupFailure) {
                    writeFailure.addSuppressed(cleanupFailure);
                }
                throw writeFailure;
            }
            this.subscription.request(1L);
        }

        /** Delegates the failure to the error callback supplied at construction. */
        @Override
        public void onError(Throwable t) {
            this.onErrorMethod.accept(t);
        }

        /** Closes the buffered stream once the publisher completes. */
        @Override
        public void onComplete() {
            this.close();
        }

        /** Closes the buffered stream, which also closes the wrapped file stream. */
        private void close() {
            FunctionalUtils.invokeSafely(this.outputStream::close);
        }

        @Override
        public String toString() {
            return this.getClass() + ":" + this.path.toString();
        }
    }
}

