/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.util;

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLockInterruptionException;
import java.nio.channels.NonWritableChannelException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.Collections;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.ContextView;

public class FluxUtilTest {
    /**
     * Verifies that {@code getSingle()} builds its greeting from the "FirstName" and "LastName"
     * entries written into the Reactor subscriber context.
     */
    @Test
    public void testCallWithContextGetSingle() {
        Mono<String> greeting = getSingle()
            .contextWrite(reactor.util.context.Context.of("FirstName", "Foo", "LastName", "Bar"));

        StepVerifier.create(greeting)
            .assertNext(actual -> Assertions.assertEquals("Hello, Foo Bar", actual))
            .verifyComplete();
    }

    /**
     * Verifies that {@code getCollection()} emits the greeting word by word, using the "FirstName"
     * and "LastName" entries written into the Reactor subscriber context.
     */
    @Test
    public void testCallWithContextGetCollection() {
        Flux<String> words = getCollection()
            .contextWrite(reactor.util.context.Context.of("FirstName", "Foo", "LastName", "Bar"));

        StepVerifier.create(words)
            .assertNext(word -> Assertions.assertEquals("Hello,", word))
            .assertNext(word -> Assertions.assertEquals("Foo", word))
            .assertNext(word -> Assertions.assertEquals("Bar", word))
            .verifyComplete();
    }

    /**
     * Verifies that the extra attributes supplied by {@code getSingleWithContextAttributes()} are
     * visible to the service call alongside the "FirstName" entry from the subscriber context.
     */
    @Test
    public void testCallWithDefaultContextGetSingle() {
        Mono<String> greeting = getSingleWithContextAttributes()
            .contextWrite(reactor.util.context.Context.of("FirstName", "Foo"));

        StepVerifier.create(greeting)
            .assertNext(actual -> Assertions.assertEquals("Hello, Foo additionalContextValue", actual))
            .verifyComplete();
    }

    /** Expects a {@code null} azure-core context to convert to an empty Reactor context. */
    @Test
    public void toReactorContextNull() {
        reactor.util.context.Context converted = FluxUtil.toReactorContext(null);

        Assertions.assertTrue(converted.isEmpty());
    }

    /** Expects {@code Context.NONE} to convert to an empty Reactor context. */
    @Test
    public void toReactorContextContextNone() {
        reactor.util.context.Context converted = FluxUtil.toReactorContext(Context.NONE);

        Assertions.assertTrue(converted.isEmpty());
    }

    /** Expects an entry whose value is {@code null} to be left out of the converted Reactor context. */
    @Test
    public void toReactorContextCleansesNullValues() {
        Context withNullValue = new Context("key", null);

        reactor.util.context.Context converted = FluxUtil.toReactorContext(withNullValue);

        Assertions.assertTrue(converted.isEmpty());
    }

    /**
     * Verifies the key/value mapping produced by {@code FluxUtil.toReactorContext}: first for a
     * single entry, then for a chain where "key1" is added a second time with a new value.
     */
    @Test
    public void toReactorContext() {
        Context azureContext = new Context("key1", "value1");
        reactor.util.context.Context converted = FluxUtil.toReactorContext(azureContext);

        Assertions.assertEquals(1, converted.size());
        Assertions.assertTrue(converted.hasKey("key1"));
        Assertions.assertEquals("value1", converted.get("key1"));

        // "key1" is added again after "key2"; the assertions below expect the later value to win
        // and the converted context to hold exactly two entries.
        azureContext = azureContext.addData("key2", "value2").addData("key1", "value3");
        converted = FluxUtil.toReactorContext(azureContext);

        Assertions.assertEquals(2, converted.size());
        Assertions.assertTrue(converted.hasKey("key1"));
        Assertions.assertEquals("value3", converted.get("key1"));
        Assertions.assertTrue(converted.hasKey("key2"));
        Assertions.assertEquals("value2", converted.get("key2"));
    }

    /** Expects {@code Mono.class} not to be recognised as a {@code Flux<ByteBuffer>} type. */
    @Test
    public void testIsFluxByteBufferInvalidType() {
        boolean recognised = FluxUtil.isFluxByteBuffer(Mono.class);

        Assertions.assertFalse(recognised);
    }

    /**
     * Expects the generic return type of {@code mockReturnType()} (looked up reflectively) to be
     * recognised as a {@code Flux<ByteBuffer>} type.
     *
     * @throws Exception if the reflective lookup of {@code mockReturnType} fails
     */
    @Test
    public void testIsFluxByteBufferValidType() throws Exception {
        Type fluxOfByteBuffer = FluxUtilTest.class.getMethod("mockReturnType").getGenericReturnType();

        Assertions.assertTrue(FluxUtil.isFluxByteBuffer(fluxOfByteBuffer));
    }

    /**
     * Verifies that {@code FluxUtil.toMono} emits the value carried by the given response.
     */
    @Test
    public void testToMono() {
        String testValue = "some value";
        Response<String> response = new SimpleResponse<>(
            new HttpRequest(HttpMethod.GET, "http://www.test.com"), 202, new HttpHeaders(), testValue);

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // message report the two values the right way round.
        StepVerifier.create(FluxUtil.toMono(response))
            .assertNext(val -> Assertions.assertEquals(testValue, val))
            .verifyComplete();
    }

    /**
     * Verifies that {@code FluxUtil.monoError} terminates with an error carrying the message of the
     * supplied exception.
     */
    @Test
    public void testMonoError() {
        final String expectedMessage = "It is an error message";
        RuntimeException failure = new RuntimeException(expectedMessage);
        ClientLogger logger = new ClientLogger(FluxUtilTest.class);

        StepVerifier.create(FluxUtil.monoError(logger, failure))
            .verifyErrorMessage(expectedMessage);
    }

    /**
     * Verifies that {@code FluxUtil.fluxError} terminates with an error carrying the message of the
     * supplied exception.
     */
    @Test
    public void testFluxError() {
        final String expectedMessage = "It is an error message";
        RuntimeException failure = new RuntimeException(expectedMessage);
        ClientLogger logger = new ClientLogger(FluxUtilTest.class);

        StepVerifier.create(FluxUtil.fluxError(logger, failure))
            .verifyErrorMessage(expectedMessage);
    }

    /**
     * Verifies that {@code FluxUtil.pagedFluxError} terminates with an error carrying the message
     * of the supplied exception.
     */
    @Test
    public void testPageFluxError() {
        final String expectedMessage = "It is an error message";
        RuntimeException failure = new RuntimeException(expectedMessage);
        ClientLogger logger = new ClientLogger(FluxUtilTest.class);

        StepVerifier.create(FluxUtil.pagedFluxError(logger, failure))
            .verifyErrorMessage(expectedMessage);
    }

    /**
     * Verifies that {@code FluxUtil.writeFile} writes the stream at the start of an existing file:
     * a file containing "hello there" is expected to read "testo there" after "test" is written.
     *
     * @throws Exception if the temporary file cannot be written, opened, read or deleted
     */
    @Test
    public void testWriteFile() throws Exception {
        String toReplace = "test";
        String original = "hello there";
        String target = "testo there";

        Flux<ByteBuffer> body = Flux.just(ByteBuffer.wrap(toReplace.getBytes(StandardCharsets.UTF_8)));
        File file = this.createFileIfNotExist();
        try {
            try (FileOutputStream stream = new FileOutputStream(file)) {
                stream.write(original.getBytes(StandardCharsets.UTF_8));
            }

            try (AsynchronousFileChannel channel
                = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.WRITE)) {
                FluxUtil.writeFile(body, channel).block();

                byte[] written = Files.readAllBytes(file.toPath());
                // expected first, actual second, so failure output is labelled correctly
                Assertions.assertArrayEquals(target.getBytes(StandardCharsets.UTF_8), written);
            }
        } finally {
            // Remove the scratch file whether or not the assertions passed; runs after the channel
            // has been closed by the inner try-with-resources.
            Files.deleteIfExists(file.toPath());
        }
    }

    /**
     * Verifies that {@code FluxUtil.writeFile} surfaces the expected error for each data/channel
     * pair instead of completing normally.
     *
     * @param data the stream handed to {@code writeFile}
     * @param channel the channel to write to; closed by the {@code Flux.using} cleanup callback
     * @param expectedException the error type the write is expected to terminate with
     */
    @ParameterizedTest
    @MethodSource("writeFileDoesNotSwallowErrorSupplier")
    public void writeFileDoesNotSwallowError(Flux<ByteBuffer> data, AsynchronousFileChannel channel,
        Class<? extends Throwable> expectedException) {
        Flux<Void> writeFile = Flux.using(
            () -> channel,
            openChannel -> FluxUtil.writeFile(data, openChannel),
            openChannel -> {
                try {
                    openChannel.close();
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });

        // Bounded verification so a swallowed error shows up as a timeout rather than a hang.
        StepVerifier.create(writeFile)
            .expectError(expectedException)
            .verify(Duration.ofSeconds(30));
    }

    /**
     * Supplies the (data, channel, expected error) triples for {@code writeFileDoesNotSwallowError}.
     *
     * <ol>
     *   <li>A channel opened with {@code READ} only, expecting {@code NonWritableChannelException}.</li>
     *   <li>A stream that emits ten 16-byte buffers and then fails with {@code IOException}.</li>
     *   <li>A publisher that pushes 16 buffers and completes on every {@code request} call without
     *       looking at the requested amount, paired with a mocked channel that completes each write
     *       100 ms later; expects {@code IllegalStateException}.</li>
     *   <li>A mocked channel whose completion handler is failed with
     *       {@code FileLockInterruptionException} on every write.</li>
     * </ol>
     *
     * @return the argument sets, in the order listed above
     * @throws IOException if a temporary file cannot be created or opened
     */
    private static Stream<Arguments> writeFileDoesNotSwallowErrorSupplier() throws IOException {
        // Scenario 1: the channel cannot be written to.
        Path nonWritableFile = Files.createTempFile("nonWritableFile" + UUID.randomUUID(), ".txt");
        nonWritableFile.toFile().deleteOnExit();
        AsynchronousFileChannel nonWritableChannel
            = AsynchronousFileChannel.open(nonWritableFile, StandardOpenOption.READ);

        // Scenario 2: the upstream fails part-way through.
        Flux<ByteBuffer> exceptionThrowingFlux = Flux.generate(() -> 0, (count, sink) -> {
            if (count == 10) {
                sink.error(new IOException());
                return count;
            }
            sink.next(ByteBuffer.allocate(16));
            return count + 1;
        });
        Path exceptionThrowingFile
            = Files.createTempFile("exceptionThrowingFile" + UUID.randomUUID(), ".txt");
        exceptionThrowingFile.toFile().deleteOnExit();
        AsynchronousFileChannel exceptionThrowingChannel
            = AsynchronousFileChannel.open(exceptionThrowingFile, StandardOpenOption.WRITE);

        // Scenario 3: the upstream ignores the requested amount.
        byte[] data = new byte[4096];
        new SecureRandom().nextBytes(data);
        Flux<ByteBuffer> ignoresRequestFlux = new Flux<ByteBuffer>() {
            @Override
            public void subscribe(CoreSubscriber<? super ByteBuffer> actual) {
                actual.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        // Deliberately disregards n: always 16 buffers, then completion.
                        IntStream.range(0, 16).forEach(ignored -> actual.onNext(ByteBuffer.wrap(data)));
                        actual.onComplete();
                    }

                    @Override
                    public void cancel() {
                    }
                });
            }
        };
        AsynchronousFileChannel ignoresRequestChannel = Mockito.mock(AsynchronousFileChannel.class);
        Timer timer = new Timer(true);
        Mockito.doAnswer(invocation -> {
            ByteBuffer stream = invocation.getArgument(0);
            CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
            // Report the whole buffer as written, but only after a 100 ms delay.
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    int remaining = stream.remaining();
                    stream.position(stream.limit());
                    completionHandler.completed(remaining, stream);
                }
            }, 100L);
            return null;
        }).when(ignoresRequestChannel).write(ArgumentMatchers.any(), ArgumentMatchers.anyLong(),
            ArgumentMatchers.any(), ArgumentMatchers.any());

        // Scenario 4: the channel reports a failure through the completion handler.
        AsynchronousFileChannel completionHandlerPropagatesError
            = Mockito.mock(AsynchronousFileChannel.class);
        Mockito.doAnswer(invocation -> {
            CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
            ByteBuffer attachment = invocation.getArgument(0);
            completionHandler.failed(new FileLockInterruptionException(), attachment);
            return null;
        }).when(completionHandlerPropagatesError).write(ArgumentMatchers.any(), ArgumentMatchers.anyLong(),
            ArgumentMatchers.any(), ArgumentMatchers.any());

        return Stream.of(
            Arguments.of(Flux.just(ByteBuffer.allocate(0)), nonWritableChannel,
                NonWritableChannelException.class),
            Arguments.of(exceptionThrowingFlux, exceptionThrowingChannel, IOException.class),
            Arguments.of(ignoresRequestFlux, ignoresRequestChannel, IllegalStateException.class),
            Arguments.of(Flux.just(ByteBuffer.allocate(0)), completionHandlerPropagatesError,
                FileLockInterruptionException.class));
    }

    /**
     * Writes 4 MiB of random data through a retriable download stream whose source may fail, and
     * checks that the file ends up holding exactly the original bytes.
     *
     * @throws IOException if the temporary file cannot be created or read back
     */
    @Test
    public void writingRetriableStreamThatFails() throws IOException {
        byte[] data = new byte[4 * 1024 * 1024];
        new SecureRandom().nextBytes(data);

        AtomicInteger errorCount = new AtomicInteger();
        Flux<ByteBuffer> retriableStream = FluxUtil.createRetriableDownloadFlux(
            () -> generateStream(data, 0L, errorCount),
            (throwable, position) -> generateStream(data, position, errorCount),
            5);

        Path file = Files.createTempFile("writingRetriableStreamThatFails" + UUID.randomUUID(), ".txt");
        file.toFile().deleteOnExit();

        Flux<Void> writeFile = Flux.using(
            () -> AsynchronousFileChannel.open(file, StandardOpenOption.WRITE),
            channel -> FluxUtil.writeFile(retriableStream, channel),
            channel -> {
                try {
                    channel.close();
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });

        StepVerifier.create(writeFile)
            .expectComplete()
            .verify(Duration.ofSeconds(30));

        byte[] writtenData = Files.readAllBytes(file);
        Assertions.assertArrayEquals(data, writtenData);
    }

    /**
     * Builds a stream that emits {@code data} from {@code offset} onwards in slices of at most 4096
     * bytes and may fail at random with an {@code IOException}.
     *
     * <p>Before each slice there is a 5% chance of failing, but only while fewer than five errors
     * have been counted in {@code errorCount}; the counter is incremented each time the random
     * check hits, whether or not an error is actually emitted.</p>
     *
     * @param data the bytes to emit; slices wrap this array rather than copying it
     * @param offset the index in {@code data} at which emission starts
     * @param errorCount shared counter limiting how many errors are injected across streams
     * @return a stream of slices that either completes after the last byte or errors part-way
     */
    private Flux<ByteBuffer> generateStream(byte[] data, long offset, AtomicInteger errorCount) {
        // Single-element array so the lambda can advance the read position.
        long[] pos = new long[] {offset};
        return Flux.push(emitter -> {
            while (pos[0] != (long) data.length) {
                double random = Math.random();
                if (random < 0.05 && errorCount.getAndIncrement() < 5) {
                    emitter.error(new IOException());
                    return;
                }
                int readCount = (int) Math.min(4096L, (long) data.length - pos[0]);
                emitter.next(ByteBuffer.wrap(data, (int) pos[0], readCount));
                pos[0] = pos[0] + (long) readCount;
            }
            emitter.complete();
        });
    }

    /**
     * Writes 10 MiB of random data to a temporary file and verifies that
     * {@code FluxUtil.readFile} streams back exactly those bytes.
     *
     * @throws IOException if the temporary file cannot be written, opened or deleted
     */
    @Test
    public void readFile() throws IOException {
        byte[] expectedFileBytes = new byte[10 * 1024 * 1024];
        new SecureRandom().nextBytes(expectedFileBytes);

        File file = this.createFileIfNotExist();
        try {
            try (FileOutputStream stream = new FileOutputStream(file)) {
                stream.write(expectedFileBytes);
            }

            try (AsynchronousFileChannel channel
                = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
                StepVerifier
                    .create(FluxUtil.collectBytesInByteBufferStream(FluxUtil.readFile(channel),
                        expectedFileBytes.length))
                    .assertNext(bytes -> Assertions.assertArrayEquals(expectedFileBytes, bytes))
                    .verifyComplete();
            }
        } finally {
            // Delete in finally so a failed assertion does not leave the 10 MiB file behind.
            Files.deleteIfExists(file.toPath());
        }
    }

    /**
     * Verifies that {@code FluxUtil.toFluxByteBuffer} turns an input stream into a stream of buffers
     * whose concatenated content equals the stream's bytes.
     *
     * @param inputStream the stream to convert; may be {@code null}
     * @param chunkSize the chunk size to pass, or {@code null} to use the single-argument overload
     * @param expected the bytes the converted stream must contain
     */
    @ParameterizedTest
    @MethodSource("toFluxByteBufferSupplier")
    public void toFluxByteBuffer(InputStream inputStream, Integer chunkSize, byte[] expected) {
        Flux<ByteBuffer> conversionFlux = (chunkSize == null)
            ? FluxUtil.toFluxByteBuffer(inputStream)
            : FluxUtil.toFluxByteBuffer(inputStream, chunkSize);

        if (inputStream == null || expected.length == 0) {
            // Nothing to read: the conversion is expected to complete without emitting.
            StepVerifier.create(conversionFlux).verifyComplete();
            return;
        }

        // The collected Mono emits a single byte[]; assert on it directly so the content check
        // always runs (a countdown-based check never fires for a single emission).
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(conversionFlux))
            .assertNext(actual -> Assertions.assertArrayEquals(expected, actual))
            .verifyComplete();
    }

    /**
     * Supplies (input stream, chunk size, expected bytes) triples for {@code toFluxByteBuffer}:
     * a {@code null} stream, an empty stream, and 4096-byte and 8193-byte random payloads read with
     * no explicit chunk size or with chunk sizes of 8192, 2048 and 5432.
     *
     * @return the argument sets
     */
    private static Stream<Arguments> toFluxByteBufferSupplier() {
        SecureRandom random = new SecureRandom();

        byte[] emptyBuffer = new byte[0];
        byte[] singleRead = new byte[4096];
        random.nextBytes(singleRead);
        byte[] multipleReads = new byte[8193];
        random.nextBytes(multipleReads);

        return Stream.of(
            Arguments.arguments(null, null, emptyBuffer),
            Arguments.arguments(new ByteArrayInputStream(emptyBuffer), null, emptyBuffer),
            Arguments.arguments(new ByteArrayInputStream(singleRead), null, singleRead),
            Arguments.arguments(new ByteArrayInputStream(multipleReads), null, multipleReads),
            Arguments.arguments(new ByteArrayInputStream(singleRead), 8192, singleRead),
            Arguments.arguments(new ByteArrayInputStream(singleRead), 2048, singleRead),
            Arguments.arguments(new ByteArrayInputStream(multipleReads), 5432, multipleReads));
    }

    /**
     * Subscribes twice to the same converted stream: the first subscription is expected to yield
     * the full 4096 bytes, the second an empty byte array.
     */
    @Test
    public void toFluxByteBufferMultipleSubscriptions() {
        byte[] singleRead = new byte[4096];
        new SecureRandom().nextBytes(singleRead);
        Flux<ByteBuffer> conversionFlux = FluxUtil.toFluxByteBuffer(new ByteArrayInputStream(singleRead));

        // First subscription: the whole payload.
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(conversionFlux))
            .assertNext(actual -> Assertions.assertArrayEquals(singleRead, actual))
            .verifyComplete();

        // Second subscription over the same underlying stream: nothing left to emit.
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(conversionFlux))
            .assertNext(actual -> Assertions.assertArrayEquals(new byte[0], actual))
            .verifyComplete();
    }

    /**
     * Expects {@code FluxUtil.toFluxByteBuffer} to signal {@code IllegalArgumentException} for a
     * chunk size of zero and for a negative chunk size.
     */
    @Test
    public void illegalToFluxByteBufferChunkSize() {
        for (int illegalChunkSize : new int[] {0, -1}) {
            StepVerifier.create(FluxUtil.toFluxByteBuffer(null, illegalChunkSize))
                .verifyError(IllegalArgumentException.class);
        }
    }

    /**
     * Expects an {@code IOException} thrown by the input stream's {@code read} to be delivered as
     * the error signal of the converted stream.
     *
     * @throws IOException declared only because the stubbed {@code read} call names it
     */
    @Test
    public void toFluxByteBufferSinkException() throws IOException {
        InputStream failingStream = Mockito.mock(InputStream.class);
        Mockito.when(failingStream.read((byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
            ArgumentMatchers.anyInt())).thenThrow(new IOException("error"));

        StepVerifier.create(FluxUtil.toFluxByteBuffer(failingStream))
            .verifyError(IOException.class);
    }

    /**
     * Exists so {@code testIsFluxByteBufferValidType} can look up a method whose generic return
     * type is {@code Flux<ByteBuffer>}; it must stay public for {@code Class.getMethod} to find it.
     *
     * @return a stream holding a single empty buffer
     */
    public Flux<ByteBuffer> mockReturnType() {
        return Flux.just(ByteBuffer.wrap(new byte[0]));
    }

    /**
     * Wraps {@code serviceCallSingle} with {@code FluxUtil.withContext}.
     *
     * @return the greeting produced by {@code serviceCallSingle}
     */
    private Mono<String> getSingle() {
        return FluxUtil.withContext(context -> serviceCallSingle(context));
    }

    /**
     * Wraps {@code serviceCallCollection} with {@code FluxUtil.fluxContext}.
     *
     * @return the greeting words produced by {@code serviceCallCollection}
     */
    private Flux<String> getCollection() {
        return FluxUtil.fluxContext(context -> serviceCallCollection(context));
    }

    /**
     * Wraps {@code serviceCallWithContextMetadata} with {@code FluxUtil.withContext}, supplying
     * "additionalContextKey" = "additionalContextValue" as an extra context attribute.
     *
     * @return the greeting produced by {@code serviceCallWithContextMetadata}
     */
    private Mono<String> getSingleWithContextAttributes() {
        return FluxUtil.withContext(
            context -> serviceCallWithContextMetadata(context),
            Collections.singletonMap("additionalContextKey", "additionalContextValue"));
    }

    /**
     * Builds "Hello, &lt;FirstName&gt; &lt;LastName&gt;" from the context, substituting "Stranger"
     * for a missing first name and an empty string for a missing last name.
     *
     * @param context the context to read "FirstName" and "LastName" from
     * @return a {@code Mono} emitting the greeting
     */
    private Mono<String> serviceCallSingle(Context context) {
        String msg = "Hello, " + context.getData("FirstName").orElse("Stranger") + " "
            + context.getData("LastName").orElse("");
        return Mono.just(msg);
    }

    /**
     * Builds "Hello, &lt;FirstName&gt; &lt;LastName&gt;" from the context (defaults: "Stranger"
     * and an empty string) and emits it split on single spaces.
     *
     * @param context the context to read "FirstName" and "LastName" from
     * @return a {@code Flux} emitting each space-separated part of the greeting
     */
    private Flux<String> serviceCallCollection(Context context) {
        String msg = "Hello, " + context.getData("FirstName").orElse("Stranger") + " "
            + context.getData("LastName").orElse("");
        // Passing the String[] straight through keeps the element type String.
        return Flux.just(msg.split(" "));
    }

    /**
     * Builds "Hello, &lt;FirstName&gt; &lt;additionalContextKey&gt;" from the context, substituting
     * "Stranger" for a missing first name and "Not found" for a missing additional value.
     *
     * @param context the context to read "FirstName" and "additionalContextKey" from
     * @return a {@code Mono} emitting the greeting
     */
    private Mono<String> serviceCallWithContextMetadata(Context context) {
        String msg = "Hello, " + context.getData("FirstName").orElse("Stranger") + " "
            + context.getData("additionalContextKey").orElse("Not found");
        return Mono.just(msg);
    }

    /**
     * Creates a new, uniquely named empty file inside the relative {@code target} directory,
     * creating that directory first if it is missing.
     *
     * @return the newly created file
     * @throws RuntimeException if the {@code target} directory does not exist and cannot be created
     * @throws UncheckedIOException if the file itself cannot be created
     */
    private File createFileIfNotExist() {
        String fileName = UUID.randomUUID().toString();
        File directory = new File("target");

        // Create "target" itself: a single-segment relative path has no parent, so
        // getParentFile() would be null here. mkdirs() also returns false when another thread or
        // process created the directory first, hence the final isDirectory() re-check.
        if (!directory.isDirectory() && !directory.mkdirs() && !directory.isDirectory()) {
            throw new RuntimeException("Unable to create directories: " + directory.getAbsolutePath());
        }

        try {
            return Files.createTempFile(directory.toPath(), fileName, "").toFile();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}

