/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.trino.client.spooling.DataAttribute;
import io.trino.client.spooling.DataAttributes;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.DriverContext;
import io.trino.operator.OperationTimer;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactory;
import io.trino.operator.OperatorInfo;
import io.trino.operator.OperatorSpoolingController;
import io.trino.operator.PipelineSpoolingController;
import io.trino.operator.SpoolingController;
import io.trino.server.protocol.spooling.QueryDataEncoder;
import io.trino.server.protocol.spooling.SpooledMetadataBlock;
import io.trino.server.protocol.spooling.SpoolingSessionProperties;
import io.trino.spi.Mergeable;
import io.trino.spi.Page;
import io.trino.spi.spool.SpooledSegmentHandle;
import io.trino.spi.spool.SpoolingContext;
import io.trino.spi.spool.SpoolingManager;
import io.trino.sql.planner.plan.PlanNodeId;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

public class OutputSpoolingOperatorFactory
implements OperatorFactory {
    // Operator id passed to DriverContext.addOperatorContext for every operator created by this factory.
    private final int operatorId;
    // Plan node id passed to DriverContext.addOperatorContext alongside the operator id.
    private final PlanNodeId planNodeId;
    // Handed to each created operator and to factories produced by duplicate().
    private final SpoolingManager spoolingManager;
    // Retained so duplicate() can give the same supplier to the new factory.
    private final Supplier<QueryDataEncoder> queryDataEncoderSupplier;
    // Encoder obtained from the supplier in the constructor; every operator created here delegates to it.
    private final QueryDataEncoder queryDataEncoder;
    // Starts at 1 (the factory's own reference), is incremented for each created operator and
    // decremented by closeEncoderIfNoMoreReferences(), which closes the encoder when it reaches 0.
    private final AtomicInteger encoderReferencesCount = new AtomicInteger(1);
    // Set to true by noMoreOperators(); createOperator() fails its state check once it is set.
    private boolean closed;

    /**
     * Creates a factory whose operators all share one encoder obtained from
     * {@code queryDataEncoderSupplier}.
     *
     * @param operatorId id used when registering operator contexts
     * @param planNodeId plan node the operators belong to; must not be null
     * @param queryDataEncoderSupplier source of the shared encoder; must not be null
     * @param spoolingManager manager handed to every created operator; must not be null
     * @throws NullPointerException if any reference argument is null
     */
    public OutputSpoolingOperatorFactory(int operatorId, PlanNodeId planNodeId, Supplier<QueryDataEncoder> queryDataEncoderSupplier, SpoolingManager spoolingManager) {
        this.operatorId = operatorId;
        this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
        this.queryDataEncoderSupplier = Objects.requireNonNull(queryDataEncoderSupplier, "queryDataEncoderSupplier is null");
        // Validate every argument before invoking the supplier, so that a rejected
        // argument cannot leave behind an encoder that nobody will ever close.
        this.spoolingManager = Objects.requireNonNull(spoolingManager, "spoolingManager is null");
        this.queryDataEncoder = queryDataEncoderSupplier.get();
    }

    /**
     * Registers a new operator context with the driver and builds an operator around it.
     * The operator receives a thin wrapper over the factory's shared encoder: encoding calls
     * are forwarded, while the first {@code close()} on the wrapper gives back the reference
     * taken here (later calls to {@code close()} are no-ops).
     *
     * @throws IllegalStateException if {@link #noMoreOperators()} was already called
     */
    @Override
    public Operator createOperator(DriverContext driverContext) {
        Preconditions.checkState(!closed, "Factory is already closed");
        OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, OutputSpoolingOperator.class.getSimpleName());
        // One reference per operator; released through the wrapper's close().
        encoderReferencesCount.incrementAndGet();
        QueryDataEncoder sharedEncoderView = new QueryDataEncoder() {
            private final AtomicBoolean released = new AtomicBoolean();

            @Override
            public String encoding() {
                return queryDataEncoder.encoding();
            }

            @Override
            public DataAttributes encodeTo(OutputStream output, List<Page> pages) throws IOException {
                return queryDataEncoder.encodeTo(output, pages);
            }

            @Override
            public void close() {
                // Only the first close gives the reference back.
                if (released.compareAndSet(false, true)) {
                    closeEncoderIfNoMoreReferences();
                }
            }
        };
        return new OutputSpoolingOperator(context, sharedEncoderView, spoolingManager);
    }

    /**
     * Marks the factory as closed, so no further operators can be created, and drops the
     * factory's own reference to the shared encoder.
     */
    @Override
    public void noMoreOperators() {
        closed = true;
        closeEncoderIfNoMoreReferences();
    }

    /**
     * Releases one reference to the shared encoder and closes the encoder when that was
     * the last outstanding reference.
     */
    private void closeEncoderIfNoMoreReferences() {
        int remaining = encoderReferencesCount.decrementAndGet();
        if (remaining != 0) {
            return;
        }
        queryDataEncoder.close();
    }

    /**
     * Builds an independent factory from the same constructor arguments. The copy calls the
     * encoder supplier again in its constructor and keeps its own reference count.
     */
    @Override
    public OperatorFactory duplicate() {
        OutputSpoolingOperatorFactory copy = new OutputSpoolingOperatorFactory(operatorId, planNodeId, queryDataEncoderSupplier, spoolingManager);
        return copy;
    }

    static class OutputSpoolingOperator
    implements Operator {
        // Decides, per input page, whether to spool, buffer or inline (see addInput); also the source of metrics.
        private final SpoolingController controller;
        // Session time zone, used to render a segment's expiration time in spool().
        private final ZoneId clientZoneId;
        // Current position in the operator state machine; starts out accepting input.
        private State state = State.NEEDS_INPUT;
        private final OperatorContext operatorContext;
        // Parent of the two local memory contexts below; closed in close().
        private final AggregatedMemoryContext aggregatedMemoryContext;
        // Tracks the size of the pending output page (see updateMemoryReservation).
        private final LocalMemoryContext localMemoryContext;
        private final QueryDataEncoder queryDataEncoder;
        private final SpoolingManager spoolingManager;
        // Pages waiting to be written out as a single spooled segment.
        private final PageBuffer buffer;
        // Accumulates the time spent in spool() and inline().
        private final OperationTimer.OperationTiming spoolingTiming = new OperationTimer.OperationTiming();
        // Running totals of the SEGMENT_SIZE attribute reported by the encoder, per output mode.
        private final AtomicLong spooledEncodedBytes = new AtomicLong();
        private final AtomicLong inlinedEncodedBytes = new AtomicLong();
        // Page to hand out from the next getOutput() call, or null when nothing is pending.
        private Page outputPage;

        public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder queryDataEncoder, SpoolingManager spoolingManager) {
            this.operatorContext = Objects.requireNonNull(operatorContext, "operatorContext is null");
            this.clientZoneId = operatorContext.getSession().getTimeZoneKey().getZoneId();
            this.controller = new PipelineSpoolingController(operatorContext.getDriverContext().getPipelineContext(), new OperatorSpoolingController(SpoolingSessionProperties.getInitialSegmentSize(operatorContext.getSession()).toBytes(), SpoolingSessionProperties.getMaxSegmentSize(operatorContext.getSession()).toBytes()));
            this.aggregatedMemoryContext = operatorContext.newAggregateUserMemoryContext();
            this.queryDataEncoder = Objects.requireNonNull(queryDataEncoder, "queryDataEncoder is null");
            this.spoolingManager = Objects.requireNonNull(spoolingManager, "spoolingManager is null");
            this.buffer = PageBuffer.create(this.aggregatedMemoryContext.newLocalMemoryContext(OutputSpoolingOperator.class.getSimpleName() + ".buffer"));
            this.localMemoryContext = this.aggregatedMemoryContext.newLocalMemoryContext(OutputSpoolingOperator.class.getSimpleName());
            operatorContext.setInfoSupplier(new OutputSpoolingInfoSupplier(this.spoolingTiming, this.controller, this.inlinedEncodedBytes, this.spooledEncodedBytes));
        }

        /** Returns the operator context supplied at construction. */
        @Override
        public OperatorContext getOperatorContext() {
            return operatorContext;
        }

        /** Returns {@code true} only while the operator is in the {@code NEEDS_INPUT} state. */
        @Override
        public boolean needsInput() {
            return State.NEEDS_INPUT == state;
        }

        /**
         * Accepts one input page and handles it according to the mode chosen by the controller:
         * <ul>
         *   <li>{@code SPOOL} - the page is buffered and the whole buffer is spooled as one segment;</li>
         *   <li>{@code BUFFER} - the page is buffered and nothing is emitted yet;</li>
         *   <li>{@code INLINE} - the page is encoded directly into the output page.</li>
         * </ul>
         * When a page results, the operator moves to {@code HAS_OUTPUT}.
         *
         * @param page input page; must not be null
         * @throws IllegalStateException if the operator is not currently accepting input
         * @throws NullPointerException if {@code page} is null
         */
        @Override
        public void addInput(Page page) {
            Preconditions.checkState(needsInput(), "Operator is already finishing");
            Objects.requireNonNull(page, "page is null");

            // The result of every branch must land in outputPage; in particular the metadata
            // page produced by spooling must be emitted, otherwise the segment is lost downstream.
            outputPage = switch (controller.nextMode(page)) {
                case SPOOL -> {
                    buffer.add(page);
                    yield outputBuffer(false);
                }
                case BUFFER -> {
                    buffer.add(page);
                    yield null;
                }
                case INLINE -> inline(page);
            };

            if (outputPage != null) {
                state = State.HAS_OUTPUT;
            }
            updateMemoryReservation();
        }

        /**
         * Hands out the pending output page, if the operator is in one of the two output states.
         * After a regular output the operator accepts input again; after the last output it is finished.
         *
         * @return the pending page, or {@code null} when the operator is not in an output state
         */
        @Override
        public Page getOutput() {
            boolean lastOutput = state == State.HAS_LAST_OUTPUT;
            if (!lastOutput && state != State.HAS_OUTPUT) {
                return null;
            }
            Page result = outputPage;
            outputPage = null;
            state = lastOutput ? State.FINISHED : State.NEEDS_INPUT;
            updateMemoryReservation();
            return result;
        }

        /**
         * Flushes whatever is still buffered. Has an effect only while the operator is in
         * {@code NEEDS_INPUT}: the buffer is spooled as the final segment and the operator
         * moves to {@code HAS_LAST_OUTPUT}, or straight to {@code FINISHED} when the buffer was empty.
         */
        @Override
        public void finish() {
            if (state != State.NEEDS_INPUT) {
                return;
            }
            outputPage = outputBuffer(true);
            if (outputPage == null) {
                state = State.FINISHED;
            } else {
                state = State.HAS_LAST_OUTPUT;
            }
        }

        /** Returns {@code true} once the operator has reached the {@code FINISHED} state. */
        @Override
        public boolean isFinished() {
            return State.FINISHED == state;
        }

        /**
         * Drains the page buffer and spools its contents as one segment.
         * The emptiness check happens before the buffer's monitor is taken; the drain and the
         * spooling itself run while holding it.
         *
         * @param finished forwarded to {@code spool}; {@code true} when this is the final flush
         * @return the metadata page describing the spooled segment, or {@code null} if the buffer was empty
         */
        private Page outputBuffer(boolean finished) {
            if (!buffer.isEmpty()) {
                synchronized (buffer) {
                    List<Page> drained = buffer.removeAll();
                    return spool(drained, finished);
                }
            }
            return null;
        }

        /**
         * Encodes {@code pages} into a newly created spooled segment and returns the metadata
         * page that points at it.
         * <p>
         * The segment's output stream is managed by try-with-resources, so it is closed on every
         * path, and any {@link IOException} - from opening the stream, encoding, or closing -
         * is rethrown as {@link UncheckedIOException}. The elapsed time is always added to
         * {@code spoolingTiming}.
         *
         * @param pages pages to write into the segment
         * @param finished when {@code true}, the segment's rows and size are additionally
         *                 reported to the controller under {@code SPOOL} mode
         * @return serialized metadata block carrying the segment location and its attributes
         * @throws UncheckedIOException if writing or closing the segment fails
         */
        private Page spool(List<Page> pages, boolean finished) {
            long rows = reduce(pages, Page::getPositionCount);
            long size = reduce(pages, Page::getSizeInBytes);
            SpooledSegmentHandle segmentHandle = spoolingManager.create(new SpoolingContext(
                    queryDataEncoder.encoding(),
                    operatorContext.getDriverContext().getSession().getQueryId(),
                    rows,
                    size));
            // Expiration is reported to the client as a local date-time in the session's zone.
            String expiresAt = ZonedDateTime.ofInstant(segmentHandle.expirationTime(), clientZoneId).toLocalDateTime().toString();

            OperationTimer overallTimer = new OperationTimer(false);
            try (OutputStream output = spoolingManager.createOutputStream(segmentHandle)) {
                DataAttributes attributes = queryDataEncoder.encodeTo(output, pages)
                        .toBuilder()
                        .set(DataAttribute.ROWS_COUNT, rows)
                        .set(DataAttribute.EXPIRES_AT, expiresAt)
                        .build();
                spooledEncodedBytes.addAndGet(attributes.get(DataAttribute.SEGMENT_SIZE, Integer.class));
                if (finished) {
                    controller.execute(SpoolingController.Mode.SPOOL, rows, size);
                }
                return SpooledMetadataBlock.forSpooledLocation(spoolingManager.location(segmentHandle), attributes).serialize();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            finally {
                overallTimer.end(spoolingTiming);
            }
        }

        /**
         * Encodes a single page into an in-memory buffer and returns a metadata page that
         * carries the encoded bytes inline.
         * <p>
         * The buffer is managed by try-with-resources and any {@link IOException} raised while
         * encoding or closing is rethrown as {@link UncheckedIOException}. The elapsed time is
         * always added to {@code spoolingTiming}.
         *
         * @param page page to encode
         * @return serialized metadata block holding the encoded data and its attributes
         * @throws UncheckedIOException if encoding fails
         */
        private Page inline(Page page) {
            OperationTimer overallTimer = new OperationTimer(false);
            try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
                DataAttributes attributes = queryDataEncoder.encodeTo(output, List.of(page))
                        .toBuilder()
                        .set(DataAttribute.ROWS_COUNT, page.getPositionCount())
                        .build();
                inlinedEncodedBytes.addAndGet(attributes.get(DataAttribute.SEGMENT_SIZE, Integer.class));
                return SpooledMetadataBlock.forInlineData(attributes, output.toByteArray()).serialize();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            finally {
                overallTimer.end(spoolingTiming);
            }
        }

        /**
         * Sets the operator's local memory context to the size of the pending output page,
         * or to zero when no page is pending.
         */
        private void updateMemoryReservation() {
            long pendingBytes = outputPage == null ? 0L : outputPage.getSizeInBytes();
            localMemoryContext.setBytes(pendingBytes);
        }

        /**
         * Sums a per-page {@code long} metric over a list of pages.
         *
         * @param page pages to aggregate
         * @param reduce extracts the metric from a single page
         * @return the sum of the metric over all pages; {@code 0} for an empty list
         */
        static long reduce(List<Page> page, ToLongFunction<Page> reduce) {
            return page.stream()
                    .mapToLong(reduce)
                    .reduce(0L, Long::sum);
        }

        /**
         * Releases the operator's memory contexts and its reference to the encoder.
         * <p>
         * The encoder is closed in a {@code finally} block so that its reference is given back
         * even when closing the memory context throws; otherwise the encoder handed to this
         * operator would never be closed. If both steps throw, the encoder's exception is the
         * one that propagates.
         *
         * @throws Exception if closing the memory context or the encoder fails
         */
        @Override
        public void close() throws Exception {
            try {
                aggregatedMemoryContext.close();
            }
            finally {
                queryDataEncoder.close();
            }
        }

        /** Lifecycle states of the operator. */
        enum State {
            /** Accepting input pages; nothing is pending for output. */
            NEEDS_INPUT,
            /** An output page is pending; input resumes once it has been taken. */
            HAS_OUTPUT,
            /** The final output page is pending; the operator finishes once it has been taken. */
            HAS_LAST_OUTPUT,
            /** No further input is accepted and no further output is produced. */
            FINISHED
        }
    }

    /**
     * Operator statistics for output spooling: time spent encoding, plus page, position,
     * raw-byte and encoded-byte counters kept separately for inlined and spooled output.
     */
    public record OutputSpoolingInfo(Duration spoolingWallTime, Duration spoolingCpuTime, long inlinedPages, long inlinedPositions, long inlinedRawBytes, long inlinedEncodedBytes, long spooledPages, long spooledPositions, long spooledRawBytes, long spooledEncodedBytes) implements Mergeable<OutputSpoolingInfo>,
    OperatorInfo
    {
        public OutputSpoolingInfo {
            Objects.requireNonNull(spoolingWallTime, "spoolingWallTime is null");
            Objects.requireNonNull(spoolingCpuTime, "spoolingCpuTime is null");
        }

        /**
         * Combines this info with {@code other} by adding every counter. The two durations are
         * added at millisecond granularity.
         */
        public OutputSpoolingInfo mergeWith(OutputSpoolingInfo other) {
            long wallMillis = spoolingWallTime.toMillis() + other.spoolingWallTime().toMillis();
            long cpuMillis = spoolingCpuTime.toMillis() + other.spoolingCpuTime().toMillis();
            return new OutputSpoolingInfo(
                    Duration.succinctDuration((double) wallMillis, TimeUnit.MILLISECONDS),
                    Duration.succinctDuration((double) cpuMillis, TimeUnit.MILLISECONDS),
                    inlinedPages + other.inlinedPages(),
                    inlinedPositions + other.inlinedPositions(),
                    inlinedRawBytes + other.inlinedRawBytes(),
                    inlinedEncodedBytes + other.inlinedEncodedBytes(),
                    spooledPages + other.spooledPages(),
                    spooledPositions + other.spooledPositions(),
                    spooledRawBytes + other.spooledRawBytes(),
                    spooledEncodedBytes + other.spooledEncodedBytes());
        }

        /**
         * Ratio of total encoded bytes to total raw bytes across both output modes.
         * With a zero raw-byte total this is a floating-point division by zero, so the
         * result is {@code NaN} or infinite rather than an exception.
         */
        @JsonProperty
        public double getEncodedToRawBytesRatio() {
            double encodedBytes = spooledEncodedBytes + inlinedEncodedBytes;
            double rawBytes = spooledRawBytes + inlinedRawBytes;
            return encodedBytes / rawBytes;
        }

        @Override
        public boolean isFinal() {
            return true;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("spoolingWallTime", spoolingWallTime)
                    .add("spoolingCpuTime", spoolingCpuTime)
                    .add("inlinedPages", inlinedPages)
                    .add("inlinedPositions", inlinedPositions)
                    .add("inlinedRawBytes", inlinedRawBytes)
                    .add("inlinedEncodedBytes", inlinedEncodedBytes)
                    .add("spooledPages", spooledPages)
                    .add("spooledPositions", spooledPositions)
                    .add("spooledRawBytes", spooledRawBytes)
                    .add("spooledEncodedBytes", spooledEncodedBytes)
                    .add("encodedToRawBytesRatio", getEncodedToRawBytesRatio())
                    .toString();
        }
    }

    /**
     * Builds an {@link OutputSpoolingInfo} snapshot on demand from the operator's timing,
     * the controller's per-mode metrics and the two encoded-byte counters.
     */
    private record OutputSpoolingInfoSupplier(OperationTimer.OperationTiming spoolingTiming, SpoolingController controller, AtomicLong inlinedEncodedBytes, AtomicLong spooledEncodedBytes) implements Supplier<OutputSpoolingInfo>
    {
        private OutputSpoolingInfoSupplier {
            Objects.requireNonNull(spoolingTiming, "spoolingTiming is null");
            Objects.requireNonNull(controller, "controller is null");
            Objects.requireNonNull(inlinedEncodedBytes, "inlinedEncodedBytes is null");
            Objects.requireNonNull(spooledEncodedBytes, "spooledEncodedBytes is null");
        }

        @Override
        public OutputSpoolingInfo get() {
            SpoolingController.MetricSnapshot inlineMetrics = controller.getMetrics(SpoolingController.Mode.INLINE);
            SpoolingController.MetricSnapshot spoolMetrics = controller.getMetrics(SpoolingController.Mode.SPOOL);
            // Timing is recorded in nanoseconds and converted to a succinct duration here.
            Duration wallTime = Duration.succinctDuration((double) spoolingTiming.getWallNanos(), TimeUnit.NANOSECONDS);
            Duration cpuTime = Duration.succinctDuration((double) spoolingTiming.getCpuNanos(), TimeUnit.NANOSECONDS);
            return new OutputSpoolingInfo(
                    wallTime,
                    cpuTime,
                    inlineMetrics.pages(),
                    inlineMetrics.positions(),
                    inlineMetrics.size(),
                    inlinedEncodedBytes.get(),
                    spoolMetrics.pages(),
                    spoolMetrics.positions(),
                    spoolMetrics.size(),
                    spooledEncodedBytes.get());
        }
    }

    /**
     * Accumulates pages until they are drained as a batch, and mirrors the retained size of
     * the buffered pages in a memory context.
     */
    private static class PageBuffer {
        private final List<Page> buffer = new ArrayList<>();
        private final LocalMemoryContext memoryContext;

        private PageBuffer(LocalMemoryContext memoryContext) {
            this.memoryContext = Objects.requireNonNull(memoryContext, "memoryContext is null");
        }

        /**
         * Creates an empty buffer that reports its memory usage to {@code memoryContext}.
         *
         * @throws NullPointerException if {@code memoryContext} is null
         */
        public static PageBuffer create(LocalMemoryContext memoryContext) {
            return new PageBuffer(memoryContext);
        }

        /** Appends a page and grows the reservation by the page's retained size. */
        public void add(Page page) {
            buffer.add(page);
            memoryContext.setBytes(memoryContext.getBytes() + page.getRetainedSizeInBytes());
        }

        /** Returns {@code true} when no pages are buffered. */
        public boolean isEmpty() {
            return buffer.isEmpty();
        }

        /**
         * Removes and returns all buffered pages as an immutable snapshot and resets the
         * memory reservation to zero. The copy, the clear and the reset happen while holding
         * the monitor of the backing list.
         *
         * @return the pages that were buffered, in insertion order; empty if there were none
         */
        public List<Page> removeAll() {
            synchronized (buffer) {
                List<Page> pages = ImmutableList.copyOf(buffer);
                buffer.clear();
                memoryContext.setBytes(0L);
                return pages;
            }
        }
    }
}

