/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.FormatMethod;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.metadata.Split;
import io.trino.operator.BlockedReason;
import io.trino.operator.DriverContext;
import io.trino.operator.OperationTimer;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactory;
import io.trino.operator.OperatorInfo;
import io.trino.operator.OperatorStats;
import io.trino.operator.PageUtils;
import io.trino.operator.ProcessorContext;
import io.trino.operator.SourceOperator;
import io.trino.operator.SourceOperatorFactory;
import io.trino.operator.SplitOperatorInfo;
import io.trino.operator.WorkProcessor;
import io.trino.operator.WorkProcessorOperator;
import io.trino.operator.WorkProcessorOperatorFactory;
import io.trino.operator.WorkProcessorSourceOperator;
import io.trino.operator.WorkProcessorSourceOperatorFactory;
import io.trino.operator.project.MergePages;
import io.trino.spi.Page;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.sql.planner.LocalExecutionPlanner;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;

public class WorkProcessorPipelineSourceOperator
implements SourceOperator {
    private static final Logger log = Logger.get(WorkProcessorPipelineSourceOperator.class);
    private static final Duration ZERO_DURATION = new Duration(0.0, TimeUnit.NANOSECONDS);
    private static final int OPERATOR_ID = Integer.MAX_VALUE;
    private final int stageId;
    private final int pipelineId;
    private final PlanNodeId sourceId;
    private final OperatorContext operatorContext;
    private final WorkProcessor<Page> pages;
    private final OperationTimer timer;
    private final List<WorkProcessorOperatorContext> workProcessorOperatorContexts = new ArrayList<WorkProcessorOperatorContext>();
    private final List<Split> pendingSplits = new ArrayList<Split>();
    private ListenableFuture<Void> blockedFuture;
    private WorkProcessorSourceOperator sourceOperator;
    private SettableFuture<Void> blockedOnSplits = SettableFuture.create();
    private boolean operatorFinishing;

    public static List<OperatorFactory> convertOperators(List<LocalExecutionPlanner.OperatorFactoryWithTypes> operatorFactoriesWithTypes, DataSize minOutputPageSize, int minOutputPageRowCount) {
        OperatorFactory operatorFactory;
        int operatorIndex;
        OperatorFactory operatorFactory2;
        if (operatorFactoriesWithTypes.isEmpty() || !((operatorFactory2 = operatorFactoriesWithTypes.get(0).getOperatorFactory()) instanceof WorkProcessorSourceOperatorFactory)) {
            return WorkProcessorPipelineSourceOperator.toOperatorFactories(operatorFactoriesWithTypes);
        }
        WorkProcessorSourceOperatorFactory sourceOperatorFactory = (WorkProcessorSourceOperatorFactory)((Object)operatorFactory2);
        ImmutableList.Builder workProcessorOperatorFactoriesBuilder = ImmutableList.builder();
        for (operatorIndex = 1; operatorIndex < operatorFactoriesWithTypes.size() && (operatorFactory = operatorFactoriesWithTypes.get(operatorIndex).getOperatorFactory()) instanceof WorkProcessorOperatorFactory; ++operatorIndex) {
            workProcessorOperatorFactoriesBuilder.add((Object)((WorkProcessorOperatorFactory)((Object)operatorFactory)));
        }
        ImmutableList workProcessorOperatorFactories = workProcessorOperatorFactoriesBuilder.build();
        if (workProcessorOperatorFactories.isEmpty()) {
            return WorkProcessorPipelineSourceOperator.toOperatorFactories(operatorFactoriesWithTypes);
        }
        return ImmutableList.builder().add((Object)new WorkProcessorPipelineSourceOperatorFactory(sourceOperatorFactory, (List<WorkProcessorOperatorFactory>)workProcessorOperatorFactories, operatorFactoriesWithTypes.get(operatorIndex - 1).getTypes(), minOutputPageSize, minOutputPageRowCount)).addAll(WorkProcessorPipelineSourceOperator.toOperatorFactories(operatorFactoriesWithTypes.subList(operatorIndex, operatorFactoriesWithTypes.size()))).build();
    }

    public static List<OperatorFactory> toOperatorFactories(List<LocalExecutionPlanner.OperatorFactoryWithTypes> operatorFactoriesWithTypes) {
        return (List)operatorFactoriesWithTypes.stream().map(LocalExecutionPlanner.OperatorFactoryWithTypes::getOperatorFactory).collect(ImmutableList.toImmutableList());
    }

    private WorkProcessorPipelineSourceOperator(DriverContext driverContext, WorkProcessorSourceOperatorFactory sourceOperatorFactory, List<WorkProcessorOperatorFactory> operatorFactories, List<Type> outputTypes, DataSize minOutputPageSize, int minOutputPageRowCount) {
        Objects.requireNonNull(driverContext, "driverContext is null");
        Objects.requireNonNull(sourceOperatorFactory, "sourceOperatorFactory is null");
        Objects.requireNonNull(operatorFactories, "operatorFactories is null");
        this.stageId = driverContext.getTaskId().getStageId().getId();
        this.pipelineId = driverContext.getPipelineContext().getPipelineId();
        this.sourceId = Objects.requireNonNull(sourceOperatorFactory.getSourceId(), "sourceId is null");
        this.operatorContext = driverContext.addOperatorContext(Integer.MAX_VALUE, this.sourceId, WorkProcessorPipelineSourceOperator.class.getSimpleName());
        this.timer = new OperationTimer(this.operatorContext.getDriverContext().isCpuTimerEnabled(), this.operatorContext.getDriverContext().isCpuTimerEnabled() && this.operatorContext.getDriverContext().isPerOperatorCpuTimerEnabled());
        MemoryTrackingContext sourceOperatorMemoryTrackingContext = this.createMemoryTrackingContext(this.operatorContext, 0);
        sourceOperatorMemoryTrackingContext.initializeLocalMemoryContexts(sourceOperatorFactory.getOperatorType());
        WorkProcessor<Split> splits = WorkProcessor.create(new Splits());
        this.sourceOperator = sourceOperatorFactory.create(this.operatorContext.getSession(), sourceOperatorMemoryTrackingContext, this.operatorContext.getDriverContext().getYieldSignal(), splits);
        this.workProcessorOperatorContexts.add(new WorkProcessorOperatorContext(this.sourceOperator, sourceOperatorFactory.getOperatorId(), sourceOperatorFactory.getPlanNodeId(), sourceOperatorFactory.getOperatorType(), sourceOperatorMemoryTrackingContext));
        WorkProcessor<Object> pages = this.sourceOperator.getOutputPages();
        pages = pages.yielding(() -> this.operatorContext.getDriverContext().getYieldSignal().isSet()).withProcessEntryMonitor(() -> this.workProcessorOperatorEntryMonitor(0)).withProcessStateMonitor(state -> this.workProcessorOperatorStateMonitor((WorkProcessor.ProcessState<Page>)state, 0)).map(page -> this.recordProcessedOutput((Page)page, 0));
        for (int i = 0; i < operatorFactories.size(); ++i) {
            int operatorIndex = i + 1;
            WorkProcessorOperatorFactory operatorFactory = operatorFactories.get(i);
            MemoryTrackingContext operatorMemoryTrackingContext = this.createMemoryTrackingContext(this.operatorContext, operatorIndex);
            operatorMemoryTrackingContext.initializeLocalMemoryContexts(operatorFactory.getOperatorType());
            WorkProcessorOperator operator = operatorFactory.create(new ProcessorContext(this.operatorContext.getSession(), operatorMemoryTrackingContext, this.operatorContext), pages);
            this.workProcessorOperatorContexts.add(new WorkProcessorOperatorContext(operator, operatorFactory.getOperatorId(), operatorFactory.getPlanNodeId(), operatorFactory.getOperatorType(), operatorMemoryTrackingContext));
            pages = operator.getOutputPages();
            if (i == operatorFactories.size() - 1) {
                pages = pages.map(Page::getLoadedPage);
                pages = pages.transformProcessor(processor -> MergePages.mergePages(outputTypes, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, this.operatorContext.aggregateUserMemoryContext()));
            }
            pages = pages.yielding(() -> this.operatorContext.getDriverContext().getYieldSignal().isSet()).withProcessEntryMonitor(() -> this.workProcessorOperatorEntryMonitor(operatorIndex)).withProcessStateMonitor(state -> this.workProcessorOperatorStateMonitor((WorkProcessor.ProcessState<Page>)state, operatorIndex));
            pages = pages.map(page -> this.recordProcessedOutput((Page)page, operatorIndex));
        }
        this.pages = pages.finishWhen(() -> this.operatorFinishing);
        this.operatorContext.setNestedOperatorStatsSupplier(this::getNestedOperatorStats);
    }

    private void workProcessorOperatorEntryMonitor(int operatorIndex) {
        if (this.isLastOperator(operatorIndex)) {
            this.timer.resetInterval();
        } else {
            this.timer.recordOperationComplete(this.workProcessorOperatorContexts.get((int)(operatorIndex + 1)).operatorTiming);
        }
    }

    private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState<Page> state, int operatorIndex) {
        WorkProcessorOperatorContext context = this.workProcessorOperatorContexts.get(operatorIndex);
        this.timer.recordOperationComplete(context.operatorTiming);
        context.metrics.set(context.operator.getMetrics());
        if (operatorIndex == 0) {
            WorkProcessorSourceOperator sourceOperator = (WorkProcessorSourceOperator)context.operator;
            long deltaPhysicalInputDataSize = WorkProcessorPipelineSourceOperator.deltaAndSet(context.physicalInputDataSize, sourceOperator.getPhysicalInputDataSize().toBytes());
            long deltaPhysicalInputPositions = WorkProcessorPipelineSourceOperator.deltaAndSet(context.physicalInputPositions, sourceOperator.getPhysicalInputPositions());
            long deltaInternalNetworkInputDataSize = WorkProcessorPipelineSourceOperator.deltaAndSet(context.internalNetworkInputDataSize, sourceOperator.getInternalNetworkInputDataSize().toBytes());
            long deltaInternalNetworkInputPositions = WorkProcessorPipelineSourceOperator.deltaAndSet(context.internalNetworkInputPositions, sourceOperator.getInternalNetworkPositions());
            long deltaInputDataSize = WorkProcessorPipelineSourceOperator.deltaAndSet(context.inputDataSize, sourceOperator.getInputDataSize().toBytes());
            long deltaInputPositions = WorkProcessorPipelineSourceOperator.deltaAndSet(context.inputPositions, sourceOperator.getInputPositions());
            long deltaReadTimeNanos = WorkProcessorPipelineSourceOperator.deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(TimeUnit.NANOSECONDS));
            context.dynamicFilterSplitsProcessed.set(sourceOperator.getDynamicFilterSplitsProcessed());
            context.connectorMetrics.set(sourceOperator.getConnectorMetrics());
            this.operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos);
            this.operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions);
            this.operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions);
        }
        if (state.getType() == WorkProcessor.ProcessState.Type.FINISHED) {
            this.closeOperators(operatorIndex);
        } else if (state.getType() == WorkProcessor.ProcessState.Type.BLOCKED && this.blockedFuture != state.getBlocked()) {
            this.blockedFuture = state.getBlocked();
            long start = System.nanoTime();
            this.blockedFuture.addListener(() -> context.blockedWallNanos.getAndAdd(System.nanoTime() - start), MoreExecutors.directExecutor());
        }
    }

    private static long deltaAndSet(AtomicLong currentValue, long newValue) {
        return newValue - currentValue.getAndSet(newValue);
    }

    private Page recordProcessedOutput(Page page, int operatorIndex) {
        WorkProcessorOperatorContext downstreamOperatorContext;
        WorkProcessorOperatorContext operatorContext = this.workProcessorOperatorContexts.get(operatorIndex);
        operatorContext.outputPositions.getAndAdd(page.getPositionCount());
        if (!this.isLastOperator(operatorIndex)) {
            downstreamOperatorContext = this.workProcessorOperatorContexts.get(operatorIndex + 1);
            downstreamOperatorContext.inputPositions.getAndAdd(page.getPositionCount());
        } else {
            downstreamOperatorContext = null;
        }
        PageUtils.recordMaterializedBytes(page, sizeInBytes -> {
            operatorContext.outputDataSize.getAndAdd(sizeInBytes);
            if (downstreamOperatorContext != null) {
                downstreamOperatorContext.inputDataSize.getAndAdd(sizeInBytes);
            }
        });
        return page;
    }

    private boolean isLastOperator(int operatorIndex) {
        return operatorIndex + 1 == this.workProcessorOperatorContexts.size();
    }

    private MemoryTrackingContext createMemoryTrackingContext(OperatorContext operatorContext, int operatorIndex) {
        return new MemoryTrackingContext((AggregatedMemoryContext)new InternalAggregatedMemoryContext(operatorContext.newAggregateUserMemoryContext(), () -> this.updatePeakMemoryReservations(operatorIndex)), (AggregatedMemoryContext)new InternalAggregatedMemoryContext(operatorContext.newAggregateRevocableMemoryContext(), () -> this.updatePeakMemoryReservations(operatorIndex)));
    }

    private void updatePeakMemoryReservations(int operatorIndex) {
        this.workProcessorOperatorContexts.get(operatorIndex).updatePeakMemoryReservations();
    }

    private List<OperatorStats> getNestedOperatorStats() {
        return (List)this.workProcessorOperatorContexts.stream().map(context -> new OperatorStats(this.stageId, this.pipelineId, context.operatorId, context.planNodeId, context.operatorType, 1L, 0L, new Duration(0.0, TimeUnit.NANOSECONDS), ZERO_DURATION, DataSize.succinctBytes((long)context.physicalInputDataSize.get()), context.physicalInputPositions.get(), new Duration((double)context.readTimeNanos.get(), TimeUnit.NANOSECONDS), DataSize.succinctBytes((long)context.internalNetworkInputDataSize.get()), context.internalNetworkInputPositions.get(), DataSize.succinctBytes((long)(context.physicalInputDataSize.get() + context.internalNetworkInputDataSize.get())), DataSize.succinctBytes((long)context.inputDataSize.get()), context.inputPositions.get(), (double)context.inputPositions.get() * (double)context.inputPositions.get(), context.operatorTiming.getCalls(), new Duration((double)context.operatorTiming.getWallNanos(), TimeUnit.NANOSECONDS), new Duration((double)context.operatorTiming.getCpuNanos(), TimeUnit.NANOSECONDS), DataSize.succinctBytes((long)context.outputDataSize.get()), context.outputPositions.get(), context.dynamicFilterSplitsProcessed.get(), OperatorContext.getOperatorMetrics(context.metrics.get(), context.inputPositions.get(), new Duration((double)context.operatorTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(), new Duration((double)context.operatorTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(), new Duration((double)context.blockedWallNanos.get(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue()), context.connectorMetrics.get(), DataSize.ofBytes((long)0L), new Duration((double)context.blockedWallNanos.get(), TimeUnit.NANOSECONDS), 0L, ZERO_DURATION, ZERO_DURATION, DataSize.succinctBytes((long)context.memoryTrackingContext.getUserMemory()), DataSize.succinctBytes((long)context.memoryTrackingContext.getRevocableMemory()), DataSize.succinctBytes((long)context.peakUserMemoryReservation.get()), DataSize.succinctBytes((long)context.peakRevocableMemoryReservation.get()), DataSize.succinctBytes((long)context.peakTotalMemoryReservation.get()), DataSize.ofBytes((long)0L), this.operatorContext.isWaitingForMemory().isDone() ? Optional.empty() : Optional.of(BlockedReason.WAITING_FOR_MEMORY), this.getOperatorInfo((WorkProcessorOperatorContext)context))).collect(ImmutableList.toImmutableList());
    }

    @Nullable
    private OperatorInfo getOperatorInfo(WorkProcessorOperatorContext context) {
        WorkProcessorOperator operator = context.operator;
        if (operator != null) {
            return operator.getOperatorInfo().orElse(null);
        }
        return context.finalOperatorInfo;
    }

    @Override
    public PlanNodeId getSourceId() {
        return this.sourceId;
    }

    @Override
    public void addSplit(Split split) {
        if (this.sourceOperator == null) {
            return;
        }
        Object splitInfo = split.getInfo();
        if (splitInfo != null) {
            this.operatorContext.setInfoSupplier((Supplier<? extends OperatorInfo>)Suppliers.ofInstance((Object)new SplitOperatorInfo(split.getCatalogHandle(), splitInfo)));
        }
        this.pendingSplits.add(split);
        this.blockedOnSplits.set(null);
    }

    @Override
    public void noMoreSplits() {
        this.blockedOnSplits.set(null);
        this.sourceOperator = null;
    }

    @Override
    public OperatorContext getOperatorContext() {
        return this.operatorContext;
    }

    @Override
    public boolean needsInput() {
        return false;
    }

    @Override
    public void addInput(Page page) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Page getOutput() {
        if (!this.pages.process()) {
            return null;
        }
        if (this.pages.isFinished()) {
            return null;
        }
        return this.pages.getResult();
    }

    @Override
    public ListenableFuture<Void> startMemoryRevoke() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void finishMemoryRevoke() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void finish() {
        this.operatorFinishing = true;
        this.noMoreSplits();
        this.closeOperators(this.workProcessorOperatorContexts.size() - 1);
    }

    @Override
    public boolean isFinished() {
        return this.pages.isFinished();
    }

    @Override
    public ListenableFuture<Void> isBlocked() {
        if (!this.pages.isBlocked()) {
            return NOT_BLOCKED;
        }
        return this.pages.getBlockedFuture();
    }

    @Override
    public void close() {
        this.finish();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeOperators(int lastOperatorIndex) {
        boolean wasInterrupted = Thread.interrupted();
        Throwable inFlightException = null;
        try {
            for (int i = 0; i <= lastOperatorIndex; ++i) {
                WorkProcessorOperatorContext workProcessorOperatorContext = this.workProcessorOperatorContexts.get(i);
                WorkProcessorOperator operator = workProcessorOperatorContext.operator;
                if (operator == null) continue;
                try {
                    operator.close();
                    continue;
                }
                catch (InterruptedException t) {
                    wasInterrupted = true;
                    continue;
                }
                catch (Throwable t) {
                    inFlightException = WorkProcessorPipelineSourceOperator.handleOperatorCloseError(inFlightException, t, "Error closing WorkProcessor operator %s for task %s", workProcessorOperatorContext.operatorId, this.operatorContext.getDriverContext().getTaskId());
                    continue;
                }
                finally {
                    workProcessorOperatorContext.metrics.set(operator.getMetrics());
                    if (operator instanceof WorkProcessorSourceOperator) {
                        WorkProcessorSourceOperator sourceOperator = (WorkProcessorSourceOperator)operator;
                        workProcessorOperatorContext.connectorMetrics.set(sourceOperator.getConnectorMetrics());
                    }
                    workProcessorOperatorContext.memoryTrackingContext.close();
                    workProcessorOperatorContext.finalOperatorInfo = operator.getOperatorInfo().orElse(null);
                    workProcessorOperatorContext.operator = null;
                }
            }
        }
        finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (inFlightException != null) {
            Throwables.throwIfUnchecked(inFlightException);
            throw new RuntimeException(inFlightException);
        }
    }

    @FormatMethod
    private static Throwable handleOperatorCloseError(Throwable inFlightException, Throwable newException, String message, Object ... args) {
        if (newException instanceof Error) {
            if (inFlightException == null) {
                inFlightException = newException;
            } else if (inFlightException != newException) {
                inFlightException.addSuppressed(newException);
            }
        } else {
            log.error(newException, message, args);
        }
        return inFlightException;
    }

    public static class WorkProcessorPipelineSourceOperatorFactory
    implements SourceOperatorFactory {
        private final WorkProcessorSourceOperatorFactory sourceOperatorFactory;
        private final List<WorkProcessorOperatorFactory> operatorFactories;
        private final List<Type> outputTypes;
        private final DataSize minOutputPageSize;
        private final int minOutputPageRowCount;
        private boolean closed;

        private WorkProcessorPipelineSourceOperatorFactory(WorkProcessorSourceOperatorFactory sourceOperatorFactory, List<WorkProcessorOperatorFactory> operatorFactories, List<Type> outputTypes, DataSize minOutputPageSize, int minOutputPageRowCount) {
            this.sourceOperatorFactory = Objects.requireNonNull(sourceOperatorFactory, "sourceOperatorFactory is null");
            this.operatorFactories = Objects.requireNonNull(operatorFactories, "operatorFactories is null");
            this.outputTypes = Objects.requireNonNull(outputTypes, "outputTypes is null");
            this.minOutputPageSize = Objects.requireNonNull(minOutputPageSize, "minOutputPageSize is null");
            this.minOutputPageRowCount = minOutputPageRowCount;
        }

        @Override
        public PlanNodeId getSourceId() {
            return this.sourceOperatorFactory.getSourceId();
        }

        @Override
        public SourceOperator createOperator(DriverContext driverContext) {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"Factory is already closed");
            return new WorkProcessorPipelineSourceOperator(driverContext, this.sourceOperatorFactory, this.operatorFactories, this.outputTypes, this.minOutputPageSize, this.minOutputPageRowCount);
        }

        @Override
        public void noMoreOperators() {
            this.operatorFactories.forEach(WorkProcessorOperatorFactory::close);
            this.closed = true;
        }
    }

    private class Splits
    implements WorkProcessor.Process<Split> {
        private Splits() {
        }

        @Override
        public WorkProcessor.ProcessState<Split> process() {
            boolean noMoreSplits;
            boolean bl = noMoreSplits = WorkProcessorPipelineSourceOperator.this.sourceOperator == null;
            if (WorkProcessorPipelineSourceOperator.this.pendingSplits.isEmpty()) {
                if (noMoreSplits) {
                    return WorkProcessor.ProcessState.finished();
                }
                WorkProcessorPipelineSourceOperator.this.blockedOnSplits = SettableFuture.create();
                WorkProcessorPipelineSourceOperator.this.blockedFuture = WorkProcessorPipelineSourceOperator.this.blockedOnSplits;
                return WorkProcessor.ProcessState.blocked(WorkProcessorPipelineSourceOperator.this.blockedOnSplits);
            }
            return WorkProcessor.ProcessState.ofResult(WorkProcessorPipelineSourceOperator.this.pendingSplits.remove(0));
        }
    }

    private static class WorkProcessorOperatorContext {
        final int operatorId;
        final PlanNodeId planNodeId;
        final String operatorType;
        final MemoryTrackingContext memoryTrackingContext;
        final OperationTimer.OperationTiming operatorTiming = new OperationTimer.OperationTiming();
        final AtomicLong blockedWallNanos = new AtomicLong();
        final AtomicLong physicalInputDataSize = new AtomicLong();
        final AtomicLong physicalInputPositions = new AtomicLong();
        final AtomicLong internalNetworkInputDataSize = new AtomicLong();
        final AtomicLong internalNetworkInputPositions = new AtomicLong();
        final AtomicLong inputDataSize = new AtomicLong();
        final AtomicLong inputPositions = new AtomicLong();
        final AtomicLong readTimeNanos = new AtomicLong();
        final AtomicLong outputDataSize = new AtomicLong();
        final AtomicLong outputPositions = new AtomicLong();
        final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
        final AtomicReference<Metrics> metrics = new AtomicReference<Metrics>(Metrics.EMPTY);
        final AtomicReference<Metrics> connectorMetrics = new AtomicReference<Metrics>(Metrics.EMPTY);
        final AtomicLong peakUserMemoryReservation = new AtomicLong();
        final AtomicLong peakRevocableMemoryReservation = new AtomicLong();
        final AtomicLong peakTotalMemoryReservation = new AtomicLong();
        @Nullable
        volatile WorkProcessorOperator operator;
        @Nullable
        volatile OperatorInfo finalOperatorInfo;

        private WorkProcessorOperatorContext(WorkProcessorOperator operator, int operatorId, PlanNodeId planNodeId, String operatorType, MemoryTrackingContext memoryTrackingContext) {
            this.operator = operator;
            this.operatorId = operatorId;
            this.planNodeId = planNodeId;
            this.operatorType = operatorType;
            this.memoryTrackingContext = memoryTrackingContext;
        }

        void updatePeakMemoryReservations() {
            long userMemory = this.memoryTrackingContext.getUserMemory();
            long revocableMemory = this.memoryTrackingContext.getRevocableMemory();
            long totalMemory = userMemory;
            this.peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
            this.peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max);
            this.peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max);
        }
    }

    private static class InternalAggregatedMemoryContext
    implements AggregatedMemoryContext {
        final AggregatedMemoryContext delegate;
        final Runnable allocationListener;

        InternalAggregatedMemoryContext(AggregatedMemoryContext delegate, Runnable allocationListener) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
            this.allocationListener = Objects.requireNonNull(allocationListener, "allocationListener is null");
        }

        public AggregatedMemoryContext newAggregatedMemoryContext() {
            return new InternalAggregatedMemoryContext(this.delegate.newAggregatedMemoryContext(), this.allocationListener);
        }

        public LocalMemoryContext newLocalMemoryContext(String allocationTag) {
            return new InternalLocalMemoryContext(this.delegate.newLocalMemoryContext(allocationTag), this.allocationListener);
        }

        public long getBytes() {
            return this.delegate.getBytes();
        }

        public void close() {
            this.delegate.close();
        }
    }

    private static class InternalLocalMemoryContext
    implements LocalMemoryContext {
        final LocalMemoryContext delegate;
        final Runnable allocationListener;

        InternalLocalMemoryContext(LocalMemoryContext delegate, Runnable allocationListener) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
            this.allocationListener = Objects.requireNonNull(allocationListener, "allocationListener is null");
        }

        public long getBytes() {
            return this.delegate.getBytes();
        }

        public ListenableFuture<Void> setBytes(long bytes) {
            ListenableFuture blocked = this.delegate.setBytes(bytes);
            this.allocationListener.run();
            return blocked;
        }

        public boolean trySetBytes(long bytes) {
            if (this.delegate.trySetBytes(bytes)) {
                this.allocationListener.run();
                return true;
            }
            return false;
        }

        public void close() {
            this.delegate.close();
            this.allocationListener.run();
        }
    }
}

