/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.translation;

import com.google.auto.value.AutoValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.SideInputReference;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.translation.AutoValue_BatchSideInputHandlerFactory_SideInputKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;

public class BatchSideInputHandlerFactory
implements StateRequestHandlers.SideInputHandlerFactory {
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputToCollection;
    private final SideInputGetter sideInputGetter;

    public static BatchSideInputHandlerFactory forStage(ExecutableStage stage, SideInputGetter sideInputGetter) {
        ImmutableMap.Builder sideInputBuilder = ImmutableMap.builder();
        for (SideInputReference sideInput : stage.getSideInputs()) {
            sideInputBuilder.put((Object)RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(sideInput.transform().getId()).setLocalName(sideInput.localName()).build(), (Object)sideInput.collection());
        }
        return new BatchSideInputHandlerFactory((Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode>)sideInputBuilder.build(), sideInputGetter);
    }

    private BatchSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PipelineNode.PCollectionNode> sideInputToCollection, SideInputGetter sideInputGetter) {
        this.sideInputToCollection = sideInputToCollection;
        this.sideInputGetter = sideInputGetter;
    }

    @Override
    public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String transformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
        PipelineNode.PCollectionNode collectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
        Preconditions.checkArgument((collectionNode != null ? 1 : 0) != 0, (String)"No side input for %s/%s", (Object)transformId, (Object)sideInputId);
        if (PTransformTranslation.ITERABLE_SIDE_INPUT.equals(accessPattern.getUrn())) {
            Coder<T> outputCoder = elementCoder;
            return this.forIterableSideInput(this.sideInputGetter.getSideInput(collectionNode.getId()), outputCoder, windowCoder);
        }
        if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
            KvCoder kvCoder = (KvCoder)elementCoder;
            return this.forMultimapSideInput(this.sideInputGetter.getSideInput(collectionNode.getId()), kvCoder.getKeyCoder(), kvCoder.getValueCoder(), windowCoder);
        }
        throw new IllegalArgumentException(String.format("Unknown side input access pattern: %s", accessPattern));
    }

    private <T, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<T, W> forIterableSideInput(List<WindowedValue<T>> broadcastVariable, final Coder<T> elementCoder, final Coder<W> windowCoder) {
        ImmutableMultimap.Builder windowToValuesBuilder = ImmutableMultimap.builder();
        for (WindowedValue<T> windowedValue : broadcastVariable) {
            Iterator iterator = windowedValue.getWindows().iterator();
            while (iterator.hasNext()) {
                BoundedWindow boundedWindow;
                BoundedWindow window = boundedWindow = (BoundedWindow)iterator.next();
                windowToValuesBuilder.put(windowCoder.structuralValue((Object)window), windowedValue.getValue());
            }
        }
        final ImmutableMultimap windowToValues = windowToValuesBuilder.build();
        return new StateRequestHandlers.SideInputHandler<T, W>(){

            @Override
            public Iterable<T> get(byte[] key, W window) {
                return windowToValues.get(windowCoder.structuralValue(window));
            }

            @Override
            public Coder<T> resultCoder() {
                return elementCoder;
            }
        };
    }

    private <K, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forMultimapSideInput(List<WindowedValue<KV<K, V>>> broadcastVariable, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
        ImmutableMultimap.Builder multimap = ImmutableMultimap.builder();
        for (WindowedValue<KV<K, V>> windowedValue : broadcastVariable) {
            Object key = ((KV)windowedValue.getValue()).getKey();
            Object value = ((KV)windowedValue.getValue()).getValue();
            Iterator iterator = windowedValue.getWindows().iterator();
            while (iterator.hasNext()) {
                BoundedWindow boundedWindow;
                BoundedWindow window = boundedWindow = (BoundedWindow)iterator.next();
                multimap.put((Object)SideInputKey.of(keyCoder.structuralValue(key), windowCoder.structuralValue((Object)window)), value);
            }
        }
        return new MultimapSideInputHandler((Multimap)multimap.build(), keyCoder, valueCoder, windowCoder);
    }

    @AutoValue
    static abstract class SideInputKey {
        SideInputKey() {
        }

        static SideInputKey of(Object key, Object window) {
            return new AutoValue_BatchSideInputHandlerFactory_SideInputKey(key, window);
        }

        @Nullable
        abstract Object key();

        abstract Object window();
    }

    private static class MultimapSideInputHandler<K, V, W extends BoundedWindow>
    implements StateRequestHandlers.SideInputHandler<V, W> {
        private final Multimap<SideInputKey, V> collection;
        private final Coder<K> keyCoder;
        private final Coder<V> valueCoder;
        private final Coder<W> windowCoder;

        private MultimapSideInputHandler(Multimap<SideInputKey, V> collection, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            this.collection = collection;
            this.keyCoder = keyCoder;
            this.valueCoder = valueCoder;
            this.windowCoder = windowCoder;
        }

        @Override
        public Iterable<V> get(byte[] keyBytes, W window) {
            Object key;
            try {
                key = this.keyCoder.decode((InputStream)new ByteArrayInputStream(keyBytes));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return this.collection.get((Object)SideInputKey.of(this.keyCoder.structuralValue(key), this.windowCoder.structuralValue(window)));
        }

        @Override
        public Coder<V> resultCoder() {
            return this.valueCoder;
        }
    }

    public static interface SideInputGetter {
        public <T> List<T> getSideInput(String var1);
    }
}

