/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.protobuf.ByteString;
import org.apache.beam.repackaged.beam_runners_direct_java.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.DataStreams;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

public class StateRequestHandlers {
    /**
     * Wraps a {@link MultimapSideInputHandlerFactory} in a {@link StateRequestHandler}.
     *
     * @param processBundleDescriptor descriptor handed to the adapter, which reads the multimap side
     *     input specs from it
     * @param multimapSideInputHandlerFactory factory the adapter uses to create side input handlers
     * @return a new adapter instance backed by the two arguments
     */
    public static StateRequestHandler forMultimapSideInputHandlerFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
        StateRequestHandler adapter = new StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(processBundleDescriptor, multimapSideInputHandlerFactory);
        return adapter;
    }

    /**
     * Adapts a {@link MultimapSideInputHandlerFactory} to the {@link StateRequestHandler} interface.
     *
     * <p>Only {@code GET} requests whose state key is of type {@code MULTIMAP_SIDE_INPUT} are
     * served. Handlers obtained from the factory are cached per side input spec.
     */
    static class StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter
    implements StateRequestHandler {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        private final MultimapSideInputHandlerFactory multimapSideInputHandlerFactory;
        // One handler per side input spec, created on first use through createHandler.
        private final ConcurrentHashMap<ProcessBundleDescriptors.MultimapSideInputSpec, MultimapSideInputHandler> cache;

        StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.multimapSideInputHandlerFactory = multimapSideInputHandlerFactory;
            this.cache = new ConcurrentHashMap<>();
        }

        /**
         * Handles a single state request.
         *
         * <p>Any {@link Exception} raised while validating or serving the request is not thrown to
         * the caller; it is reported through an exceptionally completed stage instead.
         *
         * @param request the request; its state key must be of type {@code MULTIMAP_SIDE_INPUT} and
         *     its request case must be {@code GET}
         * @return a stage holding the response builder, or completed exceptionally on failure
         */
        @Override
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) throws Exception {
            try {
                Preconditions.checkState(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT.equals((Object)request.getStateKey().getTypeCase()), "Unsupported %s type %s, expected %s", (Object)BeamFnApi.StateRequest.class.getSimpleName(), (Object)request.getStateKey().getTypeCase(), (Object)BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT);
                BeamFnApi.StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();
                ProcessBundleDescriptors.MultimapSideInputSpec sideInputReferenceSpec = this.processBundleDescriptor.getMultimapSideInputSpecs().get(stateKey.getPtransformId()).get(stateKey.getSideInputId());
                MultimapSideInputHandler handler = this.cache.computeIfAbsent(sideInputReferenceSpec, this::createHandler);
                switch (request.getRequestCase()) {
                    case GET: {
                        // Pass the spec along so it does not have to be looked up a second time.
                        return this.handleGetRequest(request, sideInputReferenceSpec, handler);
                    }
                }
                throw new Exception(String.format("Unsupported request type %s for side input.", request.getRequestCase()));
            }
            catch (Exception e) {
                CompletableFuture<BeamFnApi.StateResponse.Builder> f = new CompletableFuture<BeamFnApi.StateResponse.Builder>();
                f.completeExceptionally(e);
                return f;
            }
        }

        /**
         * Serves a {@code GET} request by decoding the key and window from the state key, asking the
         * handler for the matching values and encoding them into the response.
         *
         * @param request the request being answered; its id is copied into the response
         * @param sideInputReferenceSpec spec supplying the key, window and value coders
         * @param handler handler queried for the values
         * @return an already completed stage holding the response builder
         * @throws Exception if a continuation token is present, or if decoding or encoding fails
         */
        private <K, V, W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleGetRequest(BeamFnApi.StateRequest request, ProcessBundleDescriptors.MultimapSideInputSpec sideInputReferenceSpec, MultimapSideInputHandler<K, V, W> handler) throws Exception {
            Preconditions.checkState(request.getGet().getContinuationToken().isEmpty(), "Continuation tokens are unsupported.");
            BeamFnApi.StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();
            // The spec is used without type arguments here, so the decoded values have to be cast
            // to the handler's key and window types.
            @SuppressWarnings("unchecked")
            K key = (K)sideInputReferenceSpec.keyCoder().decode(stateKey.getKey().newInput());
            @SuppressWarnings("unchecked")
            W window = (W)sideInputReferenceSpec.windowCoder().decode(stateKey.getWindow().newInput());
            Iterable<V> values = handler.get(key, window);
            ArrayList<ByteString> encodedValues = new ArrayList<ByteString>();
            // try-with-resources closes the stream even when encoding a value fails.
            try (DataStreams.ElementDelimitedOutputStream outputStream = DataStreams.outbound(encodedValues::add)) {
                for (V value : values) {
                    sideInputReferenceSpec.valueCoder().encode(value, (OutputStream)outputStream);
                    outputStream.delimitElement();
                }
            }
            BeamFnApi.StateResponse.Builder response = BeamFnApi.StateResponse.newBuilder();
            response.setId(request.getId());
            response.setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(encodedValues)).build());
            return CompletableFuture.completedFuture(response);
        }

        /**
         * Creates the handler for a side input spec by delegating to the wrapped factory; used as the
         * mapping function when populating the cache.
         */
        private <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> createHandler(ProcessBundleDescriptors.MultimapSideInputSpec cacheKey) {
            return this.multimapSideInputHandlerFactory.forSideInput(cacheKey.transformId(), cacheKey.sideInputId(), cacheKey.keyCoder(), cacheKey.valueCoder(), cacheKey.windowCoder());
        }
    }

    /**
     * Factory producing {@link BagUserStateHandler}s for a PTransform's user state.
     *
     * <p>Annotated {@link ThreadSafe}.
     */
    @ThreadSafe
    public static interface BagUserStateHandlerFactory {
        /**
         * Returns a handler for the given user state.
         *
         * @param pTransformId id of the PTransform the user state belongs to
         * @param userStateId id of the user state
         * @param keyCoder coder for keys
         * @param valueCoder coder for values
         * @param windowCoder coder for windows
         */
        public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder);

        /**
         * Returns a factory whose {@code forUserState} always throws
         * {@link UnsupportedOperationException}.
         */
        public static BagUserStateHandlerFactory unsupported() {
            return new BagUserStateHandlerFactory(){

                @Override
                public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
                    // The message names user state; the earlier wording referred to side inputs.
                    throw new UnsupportedOperationException(String.format("The %s does not support handling user state for PTransform %s with user state id %s.", BagUserStateHandler.class.getSimpleName(), pTransformId, userStateId));
                }
            };
        }
    }

    /**
     * Handler for bag user state, parameterized by key, value and window types.
     *
     * <p>Annotated {@link ThreadSafe}.
     *
     * <p>NOTE(review): no implementation is visible in this file; the per-method descriptions below
     * are inferred from the method names and signatures and should be confirmed against an
     * implementation.
     */
    @ThreadSafe
    public static interface BagUserStateHandler<K, V, W extends BoundedWindow> {
        /** Presumably returns the values stored for the given key and window. */
        public Iterable<V> get(K key, W window);

        /** Presumably appends the given values to the state for the given key and window. */
        public void append(K key, W window, Iterator<V> values);

        /** Presumably clears the state for the given key and window. */
        public void clear(K key, W window);
    }

    /**
     * Factory producing {@link MultimapSideInputHandler}s for a PTransform's side inputs.
     *
     * <p>Annotated {@link ThreadSafe}.
     */
    @ThreadSafe
    public static interface MultimapSideInputHandlerFactory {
        /**
         * Returns a handler for the given side input.
         *
         * @param pTransformId id of the PTransform that consumes the side input
         * @param sideInputId id of the side input
         * @param keyCoder coder for keys
         * @param valueCoder coder for values
         * @param windowCoder coder for windows
         */
        public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(String pTransformId, String sideInputId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder);

        /**
         * Returns a factory whose {@code forSideInput} always throws
         * {@link UnsupportedOperationException}.
         */
        public static MultimapSideInputHandlerFactory unsupported() {
            return new MultimapSideInputHandlerFactory(){

                @Override
                public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(String pTransformId, String sideInputId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
                    throw new UnsupportedOperationException(String.format("The %s does not support handling side inputs for PTransform %s with side input id %s.", MultimapSideInputHandler.class.getSimpleName(), pTransformId, sideInputId));
                }
            };
        }
    }

    /**
     * Handler that supplies the values of a multimap side input.
     *
     * <p>Annotated {@link ThreadSafe}.
     */
    @ThreadSafe
    public static interface MultimapSideInputHandler<K, V, W extends BoundedWindow> {
        /**
         * Returns the values for the given key and window.
         *
         * @param key the decoded key from the state request
         * @param window the decoded window from the state request
         */
        public Iterable<V> get(K key, W window);
    }
}

