/*
 * Decompiled with CFR 0.152.
 */
package karate.com.linecorp.armeria.common.stream;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import karate.com.linecorp.armeria.common.CommonPools;
import karate.com.linecorp.armeria.common.HttpData;
import karate.com.linecorp.armeria.common.RequestContext;
import karate.com.linecorp.armeria.common.annotation.Nullable;
import karate.com.linecorp.armeria.common.annotation.UnstableApi;
import karate.com.linecorp.armeria.common.stream.AsyncMapStreamMessage;
import karate.com.linecorp.armeria.common.stream.ByteStreamMessage;
import karate.com.linecorp.armeria.common.stream.ByteStreamMessageOutputStream;
import karate.com.linecorp.armeria.common.stream.ConcatArrayStreamMessage;
import karate.com.linecorp.armeria.common.stream.ConcatPublisherStreamMessage;
import karate.com.linecorp.armeria.common.stream.DefaultStreamMessage;
import karate.com.linecorp.armeria.common.stream.DefaultStreamMessageDuplicator;
import karate.com.linecorp.armeria.common.stream.DeferredStreamMessage;
import karate.com.linecorp.armeria.common.stream.FlatMapStreamMessage;
import karate.com.linecorp.armeria.common.stream.FuseableStreamMessage;
import karate.com.linecorp.armeria.common.stream.InputStreamStreamMessageBuilder;
import karate.com.linecorp.armeria.common.stream.NoopSubscriber;
import karate.com.linecorp.armeria.common.stream.PathStreamMessageBuilder;
import karate.com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import karate.com.linecorp.armeria.common.stream.StreamDecoder;
import karate.com.linecorp.armeria.common.stream.StreamMessageCollector;
import karate.com.linecorp.armeria.common.stream.StreamMessageDuplicator;
import karate.com.linecorp.armeria.common.stream.StreamMessageInputStream;
import karate.com.linecorp.armeria.common.stream.StreamMessageUtil;
import karate.com.linecorp.armeria.common.stream.StreamMessages;
import karate.com.linecorp.armeria.common.stream.StreamTimeoutMode;
import karate.com.linecorp.armeria.common.stream.StreamWriter;
import karate.com.linecorp.armeria.common.stream.SubscribeOnStreamMessage;
import karate.com.linecorp.armeria.common.stream.SubscriptionOption;
import karate.com.linecorp.armeria.common.stream.TimeoutStreamMessage;
import karate.com.linecorp.armeria.common.util.BlockingTaskExecutor;
import karate.com.linecorp.armeria.common.util.Exceptions;
import karate.com.linecorp.armeria.internal.common.stream.AbortedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.DecodedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.EmptyFixedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil;
import karate.com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.RecoverableStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.RegularFixedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.SurroundingPublisher;
import karate.com.linecorp.armeria.internal.common.stream.ThreeElementFixedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.TwoElementFixedStreamMessage;
import karate.com.linecorp.armeria.internal.shaded.guava.base.Preconditions;
import karate.com.linecorp.armeria.internal.shaded.guava.collect.ImmutableList;
import karate.com.linecorp.armeria.internal.shaded.guava.collect.Iterables;
import karate.com.linecorp.armeria.internal.shaded.guava.collect.ObjectArrays;
import karate.com.linecorp.armeria.server.ServiceRequestContext;
import karate.io.netty.buffer.ByteBufAllocator;
import karate.io.netty.channel.EventLoop;
import karate.io.netty.util.concurrent.EventExecutor;
import karate.org.reactivestreams.Publisher;
import karate.org.reactivestreams.Subscriber;

public interface StreamMessage<T>
extends Publisher<T> {
    public static <T> StreamMessage<T> of() {
        return new EmptyFixedStreamMessage();
    }

    public static <T> StreamMessage<T> of(T obj) {
        Objects.requireNonNull(obj, "obj");
        return new OneElementFixedStreamMessage<T>(obj);
    }

    public static <T> StreamMessage<T> of(T obj1, T obj2) {
        Objects.requireNonNull(obj1, "obj1");
        Objects.requireNonNull(obj2, "obj2");
        return new TwoElementFixedStreamMessage<T>(obj1, obj2);
    }

    public static <T> StreamMessage<T> of(T obj1, T obj2, T obj3) {
        Objects.requireNonNull(obj1, "obj1");
        Objects.requireNonNull(obj2, "obj2");
        Objects.requireNonNull(obj3, "obj3");
        return new ThreeElementFixedStreamMessage<T>(obj1, obj2, obj3);
    }

    @SafeVarargs
    public static <T> StreamMessage<T> of(T ... objs) {
        Objects.requireNonNull(objs, "objs");
        switch (objs.length) {
            case 0: {
                return StreamMessage.of();
            }
            case 1: {
                return StreamMessage.of(objs[0]);
            }
            case 2: {
                return StreamMessage.of(objs[0], objs[1]);
            }
            case 3: {
                return StreamMessage.of(objs[0], objs[1], objs[2]);
            }
        }
        for (int i = 0; i < objs.length; ++i) {
            if (objs[i] != null) continue;
            throw new NullPointerException("objs[" + i + "] is null");
        }
        return new RegularFixedStreamMessage<T>(objs);
    }

    public static <T> StreamMessage<T> of(Publisher<? extends T> publisher) {
        Objects.requireNonNull(publisher, "publisher");
        if (publisher instanceof StreamMessage) {
            StreamMessage cast = (StreamMessage)publisher;
            return cast;
        }
        return new PublisherBasedStreamMessage<T>(publisher);
    }

    @UnstableApi
    public static <T> StreamMessage<T> of(CompletableFuture<? extends Publisher<? extends T>> future) {
        Objects.requireNonNull(future, "stage");
        return StreamMessageUtil.createStreamMessageFrom(future);
    }

    @UnstableApi
    public static <T> StreamMessage<T> of(CompletionStage<? extends Publisher<? extends T>> stage) {
        Objects.requireNonNull(stage, "stage");
        DeferredStreamMessage deferred = new DeferredStreamMessage();
        deferred.delegateOnCompletion(stage);
        return deferred;
    }

    public static <T> StreamMessage<T> of(CompletionStage<? extends StreamMessage<? extends T>> stage, EventExecutor subscriberExecutor) {
        Objects.requireNonNull(stage, "stage");
        Objects.requireNonNull(subscriberExecutor, "subscriberExecutor");
        DeferredStreamMessage deferred = new DeferredStreamMessage(subscriberExecutor);
        deferred.delegateOnCompletion(stage);
        return deferred;
    }

    public static ByteStreamMessage of(File file) {
        Objects.requireNonNull(file, "file");
        return StreamMessage.of(file.toPath());
    }

    public static ByteStreamMessage of(Path path) {
        Objects.requireNonNull(path, "path");
        return StreamMessage.builder(path).build();
    }

    @UnstableApi
    public static PathStreamMessageBuilder builder(Path path) {
        Objects.requireNonNull(path, "path");
        return new PathStreamMessageBuilder(path);
    }

    @Deprecated
    public static ByteStreamMessage of(Path path, int bufferSize) {
        return ((PathStreamMessageBuilder)StreamMessage.builder(path).bufferSize(bufferSize)).build();
    }

    @Deprecated
    public static ByteStreamMessage of(Path path, ByteBufAllocator alloc, int bufferSize) {
        Objects.requireNonNull(path, "path");
        Objects.requireNonNull(alloc, "alloc");
        Preconditions.checkArgument(bufferSize > 0, "bufferSize: %s (expected: > 0)", bufferSize);
        return ((PathStreamMessageBuilder)StreamMessage.builder(path).alloc(alloc).bufferSize(bufferSize)).build();
    }

    @Deprecated
    public static ByteStreamMessage of(Path path, @Nullable ExecutorService executor, ByteBufAllocator alloc, int bufferSize) {
        Objects.requireNonNull(path, "path");
        Objects.requireNonNull(alloc, "alloc");
        PathStreamMessageBuilder builder = StreamMessage.builder(path);
        if (executor != null) {
            builder.executor(executor);
        }
        return ((PathStreamMessageBuilder)builder.alloc(alloc).bufferSize(bufferSize)).build();
    }

    @UnstableApi
    public static ByteStreamMessage of(InputStream inputStream) {
        Objects.requireNonNull(inputStream, "inputStream");
        return StreamMessage.builder(inputStream).build();
    }

    @UnstableApi
    public static InputStreamStreamMessageBuilder builder(InputStream inputStream) {
        Objects.requireNonNull(inputStream, "inputStream");
        return new InputStreamStreamMessageBuilder(inputStream);
    }

    public static ByteStreamMessage fromOutputStream(Consumer<? super OutputStream> outputStreamConsumer) {
        Object ctx = RequestContext.currentOrNull();
        BlockingTaskExecutor blockingTaskExecutor = ctx instanceof ServiceRequestContext ? ((ServiceRequestContext)ctx).blockingTaskExecutor() : CommonPools.blockingTaskExecutor();
        return StreamMessage.fromOutputStream(outputStreamConsumer, blockingTaskExecutor);
    }

    public static ByteStreamMessage fromOutputStream(Consumer<? super OutputStream> outputStreamConsumer, Executor blockingTaskExecutor) {
        return new ByteStreamMessageOutputStream(outputStreamConsumer, blockingTaskExecutor);
    }

    @SafeVarargs
    public static <T> StreamMessage<T> concat(Publisher<? extends T> ... publishers) {
        Objects.requireNonNull(publishers, "publishers");
        return StreamMessage.concat(ImmutableList.copyOf(publishers));
    }

    public static <T> StreamMessage<T> concat(Iterable<? extends Publisher<? extends T>> publishers) {
        Objects.requireNonNull(publishers, "publishers");
        if (Iterables.isEmpty(publishers)) {
            return StreamMessage.of();
        }
        List streamMessages = ImmutableList.copyOf(publishers).stream().map(StreamMessage::of).collect(ImmutableList.toImmutableList());
        return new ConcatArrayStreamMessage(streamMessages);
    }

    public static <T> StreamMessage<T> concat(Publisher<? extends Publisher<? extends T>> publishers) {
        Objects.requireNonNull(publishers, "publishers");
        return new ConcatPublisherStreamMessage(StreamMessage.of(publishers));
    }

    public static <T> StreamMessage<T> aborted(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        return new AbortedStreamMessage(cause);
    }

    @UnstableApi
    public static <T> StreamWriter<T> streaming() {
        return new DefaultStreamMessage();
    }

    public boolean isOpen();

    public boolean isEmpty();

    public long demand();

    default public boolean isComplete() {
        return this.whenComplete().isDone();
    }

    public CompletableFuture<Void> whenComplete();

    default public CompletableFuture<Void> subscribe() {
        return this.subscribe(this.defaultSubscriberExecutor());
    }

    @UnstableApi
    default public CompletableFuture<Void> subscribe(EventExecutor executor) {
        Objects.requireNonNull(executor, "executor");
        this.subscribe(NoopSubscriber.get(), executor);
        return this.whenComplete();
    }

    @Override
    default public void subscribe(Subscriber<? super T> subscriber) {
        this.subscribe(subscriber, this.defaultSubscriberExecutor());
    }

    default public void subscribe(Subscriber<? super T> subscriber, SubscriptionOption ... options) {
        this.subscribe(subscriber, this.defaultSubscriberExecutor(), options);
    }

    default public void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
        this.subscribe(subscriber, executor, InternalStreamMessageUtil.EMPTY_OPTIONS);
    }

    public void subscribe(Subscriber<? super T> var1, EventExecutor var2, SubscriptionOption ... var3);

    default public StreamMessageDuplicator<T> toDuplicator() {
        return this.toDuplicator(this.defaultSubscriberExecutor());
    }

    default public StreamMessageDuplicator<T> toDuplicator(EventExecutor executor) {
        Objects.requireNonNull(executor, "executor");
        return new DefaultStreamMessageDuplicator<Object>(this, unused -> 0, executor, 0L);
    }

    default public EventExecutor defaultSubscriberExecutor() {
        EventLoop eventExecutor = RequestContext.mapCurrent(RequestContext::eventLoop, CommonPools.workerGroup()::next);
        if (!1.$assertionsDisabled && eventExecutor == null) {
            throw new AssertionError();
        }
        return eventExecutor;
    }

    public void abort();

    public void abort(Throwable var1);

    @UnstableApi
    default public <U> StreamMessage<U> decode(StreamDecoder<T, U> decoder) {
        Objects.requireNonNull(decoder, "decoder");
        return this.decode(decoder, ByteBufAllocator.DEFAULT);
    }

    @UnstableApi
    default public <U> StreamMessage<U> decode(StreamDecoder<T, U> decoder, ByteBufAllocator alloc) {
        return new DecodedStreamMessage<T, U>(this, decoder, alloc);
    }

    default public CompletableFuture<List<T>> collect() {
        return this.collect(InternalStreamMessageUtil.EMPTY_OPTIONS);
    }

    default public CompletableFuture<List<T>> collect(SubscriptionOption ... options) {
        return this.collect(this.defaultSubscriberExecutor(), options);
    }

    default public CompletableFuture<List<T>> collect(EventExecutor executor, SubscriptionOption ... options) {
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");
        StreamMessageCollector collector = new StreamMessageCollector(options);
        if (!InternalStreamMessageUtil.containsNotifyCancellation(options)) {
            options = ObjectArrays.concat(options, SubscriptionOption.NOTIFY_CANCELLATION);
        }
        this.subscribe(collector, executor, options);
        return collector.collect();
    }

    default public StreamMessage<T> filter(Predicate<? super T> predicate) {
        Objects.requireNonNull(predicate, "predicate");
        return FuseableStreamMessage.of(this, predicate);
    }

    default public <U> StreamMessage<U> map(Function<? super T, ? extends U> function) {
        Objects.requireNonNull(function, "function");
        if (function == Function.identity()) {
            StreamMessage cast = this;
            return cast;
        }
        return FuseableStreamMessage.of(this, function);
    }

    default public <U> StreamMessage<U> mapAsync(Function<? super T, ? extends CompletableFuture<? extends U>> function) {
        Objects.requireNonNull(function, "function");
        return this.mapParallel(function, 1);
    }

    @UnstableApi
    default public <U> StreamMessage<U> mapParallel(Function<? super T, ? extends CompletableFuture<? extends U>> function) {
        Objects.requireNonNull(function, "function");
        return this.mapParallel(function, Integer.MAX_VALUE);
    }

    @UnstableApi
    default public <U> StreamMessage<U> mapParallel(Function<? super T, ? extends CompletableFuture<? extends U>> function, int maxConcurrency) {
        Objects.requireNonNull(function, "function");
        Preconditions.checkArgument(maxConcurrency > 0, "maxConcurrency: %s (expected > 0)", maxConcurrency);
        return new AsyncMapStreamMessage(this, function, maxConcurrency);
    }

    default public <U> StreamMessage<U> flatMap(Function<? super T, ? extends StreamMessage<? extends U>> function) {
        Objects.requireNonNull(function, "function");
        return this.flatMap(function, Integer.MAX_VALUE);
    }

    default public <U> StreamMessage<U> flatMap(Function<? super T, ? extends StreamMessage<? extends U>> function, int maxConcurrency) {
        Preconditions.checkArgument(maxConcurrency > 0, "maxConcurrency: %s (expected: > 0)", maxConcurrency);
        Objects.requireNonNull(function, "function");
        return new FlatMapStreamMessage(this, function, maxConcurrency);
    }

    default public StreamMessage<T> mapError(Function<? super Throwable, ? extends Throwable> function) {
        Objects.requireNonNull(function, "function");
        return FuseableStreamMessage.error(this, function);
    }

    default public StreamMessage<T> peek(Consumer<? super T> action) {
        Objects.requireNonNull(action, "action");
        Function<Object, Object> function = obj -> {
            action.accept(obj);
            return obj;
        };
        return this.map(function);
    }

    default public <U extends T> StreamMessage<T> peek(Consumer<? super U> action, Class<? extends U> type) {
        Objects.requireNonNull(action, "action");
        Objects.requireNonNull(type, "type");
        Function<Object, Object> function = obj -> {
            if (type.isInstance(obj)) {
                action.accept((Object)obj);
            }
            return obj;
        };
        return this.map(function);
    }

    default public StreamMessage<T> peekError(Consumer<? super Throwable> action) {
        Objects.requireNonNull(action, "action");
        Function<Throwable, Throwable> function = obj -> {
            action.accept((Throwable)obj);
            return obj;
        };
        return this.mapError(function);
    }

    default public StreamMessage<T> recoverAndResume(Function<? super Throwable, ? extends StreamMessage<T>> function) {
        Objects.requireNonNull(function, "function");
        return new RecoverableStreamMessage(this, function, true);
    }

    @UnstableApi
    default public <E extends Throwable> StreamMessage<T> recoverAndResume(Class<E> causeClass, Function<? super E, ? extends StreamMessage<T>> function) {
        Objects.requireNonNull(causeClass, "causeClass");
        Objects.requireNonNull(function, "function");
        return this.recoverAndResume(cause -> {
            if (!causeClass.isInstance(cause)) {
                return (StreamMessage)Exceptions.throwUnsafely(cause);
            }
            try {
                StreamMessage recoveredStreamMessage = (StreamMessage)function.apply((Object)cause);
                Objects.requireNonNull(recoveredStreamMessage, "recoveredStreamMessage");
                return recoveredStreamMessage;
            }
            catch (Throwable t) {
                return (StreamMessage)Exceptions.throwUnsafely(cause);
            }
        });
    }

    default public CompletableFuture<Void> writeTo(Function<? super T, ? extends HttpData> mapper, Path destination, OpenOption ... options) {
        Objects.requireNonNull(mapper, "mapper");
        Objects.requireNonNull(destination, "destination");
        Objects.requireNonNull(options, "options");
        return StreamMessages.writeTo(this.map(mapper), destination, options);
    }

    default public InputStream toInputStream(Function<? super T, ? extends HttpData> httpDataConverter) {
        return this.toInputStream(httpDataConverter, this.defaultSubscriberExecutor());
    }

    default public InputStream toInputStream(Function<? super T, ? extends HttpData> httpDataConverter, EventExecutor executor) {
        Objects.requireNonNull(httpDataConverter, "httpDataConverter");
        Objects.requireNonNull(executor, "executor");
        return new StreamMessageInputStream<T>(this, httpDataConverter, executor);
    }

    @UnstableApi
    default public StreamMessage<T> endWith(Function<@Nullable Throwable, ? extends @Nullable T> finalizer) {
        return new SurroundingPublisher<T>(null, this, finalizer);
    }

    default public StreamMessage<T> subscribeOn(EventExecutor eventExecutor) {
        Objects.requireNonNull(eventExecutor, "eventExecutor");
        return new SubscribeOnStreamMessage(this, eventExecutor);
    }

    @UnstableApi
    default public StreamMessage<T> timeout(Duration timeoutDuration) {
        return this.timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT);
    }

    @UnstableApi
    default public StreamMessage<T> timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
        Objects.requireNonNull(timeoutDuration, "timeoutDuration");
        Objects.requireNonNull(timeoutMode, "timeoutMode");
        return new TimeoutStreamMessage(this, timeoutDuration, timeoutMode);
    }

    static {
        if (1.$assertionsDisabled) {
            // empty if block
        }
    }
}

