/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.pravega.auth.AuthenticationException;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.RawClient;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.impl.ConnectionClosedException;
import io.pravega.client.stream.impl.ModelHelper;
import io.pravega.client.tables.impl.IteratorState;
import io.pravega.client.tables.impl.KeyVersion;
import io.pravega.client.tables.impl.KeyVersionImpl;
import io.pravega.client.tables.impl.TableEntry;
import io.pravega.client.tables.impl.TableEntryImpl;
import io.pravega.client.tables.impl.TableKey;
import io.pravega.client.tables.impl.TableKeyImpl;
import io.pravega.client.tables.impl.TableSegment;
import io.pravega.common.Exceptions;
import io.pravega.common.cluster.Host;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.server.WireCommandFailedException;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.Request;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.LoggerFactory;

public class SegmentHelper
implements AutoCloseable {
    /** Class-wide logger; request-scoped overloads take a request id as their first argument. */
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(SegmentHelper.class));

    /**
     * For each request type, the reply classes that {@code handleReply} accepts without throwing.
     *
     * The explicit type witness on {@code builder()} is required: without it the builder is inferred as
     * {@code Builder<Object, Object>} and the result is not assignable to the declared map type.
     */
    private static final Map<Class<? extends Request>, Set<Class<? extends Reply>>> EXPECTED_SUCCESS_REPLIES =
            ImmutableMap.<Class<? extends Request>, Set<Class<? extends Reply>>>builder()
                    .put(WireCommands.CreateSegment.class, ImmutableSet.of(WireCommands.SegmentCreated.class, WireCommands.SegmentAlreadyExists.class))
                    .put(WireCommands.CreateTableSegment.class, ImmutableSet.of(WireCommands.SegmentCreated.class, WireCommands.SegmentAlreadyExists.class))
                    .put(WireCommands.DeleteSegment.class, ImmutableSet.of(WireCommands.SegmentDeleted.class, WireCommands.NoSuchSegment.class))
                    .put(WireCommands.DeleteTableSegment.class, ImmutableSet.of(WireCommands.SegmentDeleted.class, WireCommands.NoSuchSegment.class))
                    .put(WireCommands.UpdateSegmentPolicy.class, ImmutableSet.of(WireCommands.SegmentPolicyUpdated.class))
                    .put(WireCommands.SealSegment.class, ImmutableSet.of(WireCommands.SegmentSealed.class, WireCommands.SegmentIsSealed.class))
                    .put(WireCommands.TruncateSegment.class, ImmutableSet.of(WireCommands.SegmentTruncated.class, WireCommands.SegmentIsTruncated.class))
                    .put(WireCommands.GetStreamSegmentInfo.class, ImmutableSet.of(WireCommands.StreamSegmentInfo.class, WireCommands.SegmentIsTruncated.class))
                    .put(WireCommands.MergeSegments.class, ImmutableSet.of(WireCommands.SegmentsMerged.class, WireCommands.NoSuchSegment.class))
                    .put(WireCommands.UpdateTableEntries.class, ImmutableSet.of(WireCommands.TableEntriesUpdated.class))
                    .put(WireCommands.RemoveTableKeys.class, ImmutableSet.of(WireCommands.TableKeysRemoved.class, WireCommands.TableKeyDoesNotExist.class))
                    .put(WireCommands.ReadTable.class, ImmutableSet.of(WireCommands.TableRead.class))
                    .put(WireCommands.ReadTableKeys.class, ImmutableSet.of(WireCommands.TableKeysRead.class))
                    .put(WireCommands.ReadTableEntries.class, ImmutableSet.of(WireCommands.TableEntriesRead.class))
                    .build();

    /**
     * For each request type, the reply classes that {@code handleReply} recognises as a known failure
     * and converts into a {@code WireCommandFailedException} with a matching reason.
     */
    private static final Map<Class<? extends Request>, Set<Class<? extends Reply>>> EXPECTED_FAILING_REPLIES =
            ImmutableMap.<Class<? extends Request>, Set<Class<? extends Reply>>>builder()
                    .put(WireCommands.UpdateTableEntries.class, ImmutableSet.of(WireCommands.TableKeyDoesNotExist.class, WireCommands.TableKeyBadVersion.class, WireCommands.NoSuchSegment.class))
                    .put(WireCommands.RemoveTableKeys.class, ImmutableSet.of(WireCommands.TableKeyBadVersion.class, WireCommands.NoSuchSegment.class))
                    .put(WireCommands.DeleteTableSegment.class, ImmutableSet.of(WireCommands.TableSegmentNotEmpty.class))
                    .put(WireCommands.ReadTable.class, ImmutableSet.of(WireCommands.NoSuchSegment.class))
                    .put(WireCommands.ReadTableKeys.class, ImmutableSet.of(WireCommands.NoSuchSegment.class))
                    .put(WireCommands.ReadTableEntries.class, ImmutableSet.of(WireCommands.NoSuchSegment.class))
                    .build();

    /** Resolves which host owns a given segment or table segment. */
    private final HostControllerStore hostStore;

    /** Factory handed to every {@code RawClient} this helper creates. */
    private final ConnectionFactory connectionFactory;

    /**
     * Creates a helper that issues wire commands through the supplied connection factory.
     *
     * @param clientCF  factory passed to each {@code RawClient} created by this helper
     * @param hostStore store consulted to find the host for a segment or table segment
     */
    public SegmentHelper(ConnectionFactory clientCF, HostControllerStore hostStore) {
        this.hostStore = hostStore;
        this.connectionFactory = clientCF;
    }

    /**
     * Looks up the host for a stream segment in the host store and returns it as a {@code NodeUri}.
     *
     * @param scope     scope of the stream
     * @param stream    stream name
     * @param segmentId id of the segment
     * @return endpoint and port of the host returned by the host store
     */
    public Controller.NodeUri getSegmentUri(String scope, String stream, long segmentId) {
        Host owner = hostStore.getHostForSegment(scope, stream, segmentId);
        Controller.NodeUri.Builder builder = Controller.NodeUri.newBuilder();
        builder.setEndpoint(owner.getIpAddr());
        builder.setPort(owner.getPort());
        return builder.build();
    }

    /**
     * Looks up the host for a table segment in the host store and returns it as a {@code NodeUri}.
     *
     * @param tableName name of the table segment
     * @return endpoint and port of the host returned by the host store
     */
    public Controller.NodeUri getTableUri(String tableName) {
        Host owner = hostStore.getHostForTableSegment(tableName);
        Controller.NodeUri.Builder builder = Controller.NodeUri.newBuilder();
        builder.setEndpoint(owner.getIpAddr());
        builder.setPort(owner.getPort());
        return builder.build();
    }

    /**
     * Sends a {@code CreateSegment} command for the given stream segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment to create
     * @param policy          scaling policy, translated to a wire-level rate type and target rate
     * @param controllerToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply},
     *         or fails with the exception raised by {@code sendRequest} / {@code handleReply}
     */
    public CompletableFuture<Void> createSegment(String scope, String stream, long segmentId, ScalingPolicy policy, String controllerToken, long clientRequestId) {
        String qualifiedStreamSegmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        Controller.NodeUri uri = getSegmentUri(scope, stream, segmentId);
        WireCommandType type = WireCommandType.CREATE_SEGMENT;
        // Translate the policy BEFORE constructing the client: if translation throws (null policy,
        // target rate that does not fit in an int) no RawClient has been created that nobody would close.
        Pair<Byte, Integer> extracted = extractFromPolicy(policy);
        RawClient connection = new RawClient(ModelHelper.encode(uri), connectionFactory);
        long requestId = connection.getFlow().asLong();
        WireCommands.CreateSegment request = new WireCommands.CreateSegment(
                requestId, qualifiedStreamSegmentName, extracted.getLeft(), extracted.getRight(), controllerToken);
        return sendRequest(connection, requestId, request)
                .thenAccept(r -> handleReply(clientRequestId, r, connection, qualifiedStreamSegmentName, WireCommands.CreateSegment.class, type));
    }

    /**
     * Sends a {@code TruncateSegment} command for the given stream segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment to truncate
     * @param offset          offset passed to the truncate command
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> truncateSegment(String scope, String stream, long segmentId, long offset, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.TruncateSegment command = new WireCommands.TruncateSegment(flowId, segmentName, offset, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, segmentName, WireCommands.TruncateSegment.class, WireCommandType.TRUNCATE_SEGMENT));
    }

    /**
     * Sends a {@code DeleteSegment} command for the given stream segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment to delete
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> deleteSegment(String scope, String stream, long segmentId, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.DeleteSegment command = new WireCommands.DeleteSegment(flowId, segmentName, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, segmentName, WireCommands.DeleteSegment.class, WireCommandType.DELETE_SEGMENT));
    }

    /**
     * Sends a {@code SealSegment} command for the given stream segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment to seal
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> sealSegment(String scope, String stream, long segmentId, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.SealSegment command = new WireCommands.SealSegment(flowId, segmentName, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, segmentName, WireCommands.SealSegment.class, WireCommandType.SEAL_SEGMENT));
    }

    /**
     * Creates the segment backing a transaction by sending a {@code CreateSegment} command
     * (no-scale rate type, target rate 0) for the transaction's segment name.
     *
     * Log lines for this call are tagged with the connection's flow id, since no client request id is supplied.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the parent segment
     * @param txId            transaction id
     * @param delegationToken delegation token sent with the command
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> createTransaction(String scope, String stream, long segmentId, UUID txId, String delegationToken) {
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        String txnSegmentName = getTransactionName(scope, stream, segmentId, txId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.CreateSegment command = new WireCommands.CreateSegment(
                flowId, txnSegmentName, WireCommands.CreateSegment.NO_SCALE, 0, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(flowId, reply, client, txnSegmentName, WireCommands.CreateSegment.class, WireCommandType.CREATE_SEGMENT));
    }

    /**
     * Builds the segment name used for a transaction on the given segment.
     *
     * The segment id is first passed through {@code RecordHelper.generalizedSegmentId}, the result is
     * qualified with scope and stream, and the transaction id is then applied to that qualified name.
     */
    private String getTransactionName(String scope, String stream, long segmentId, UUID txId) {
        long generalizedId = RecordHelper.generalizedSegmentId(segmentId, txId);
        return StreamSegmentNameUtils.getTransactionNameFromId(
                StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, generalizedId), txId);
    }

    /**
     * Commits a transaction by sending a {@code MergeSegments} command that merges the transaction
     * segment into the target segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param targetSegmentId id of the segment to merge into
     * @param sourceSegmentId id of the segment the transaction was created on; must have the same
     *                        segment number as {@code targetSegmentId}
     * @param txId            transaction id
     * @param delegationToken delegation token sent with the command
     * @return a future holding {@code SUCCESS}, or {@code FAILURE} when the reply is a
     *         {@code NoSuchSegment} naming a segment other than the transaction segment
     * @throws IllegalArgumentException if the two segment ids have different segment numbers
     */
    public CompletableFuture<Controller.TxnStatus> commitTransaction(String scope, String stream, long targetSegmentId, long sourceSegmentId, UUID txId, String delegationToken) {
        Preconditions.checkArgument(
                StreamSegmentNameUtils.getSegmentNumber(targetSegmentId) == StreamSegmentNameUtils.getSegmentNumber(sourceSegmentId));
        Controller.NodeUri node = getSegmentUri(scope, stream, sourceSegmentId);
        String targetName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, targetSegmentId);
        String txnSegmentName = getTransactionName(scope, stream, sourceSegmentId, txId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.MergeSegments command = new WireCommands.MergeSegments(flowId, targetName, txnSegmentName, delegationToken);
        return sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(flowId, reply, client, txnSegmentName, WireCommands.MergeSegments.class, WireCommandType.MERGE_SEGMENTS);
            Controller.TxnStatus.Status outcome = Controller.TxnStatus.Status.SUCCESS;
            if (reply instanceof WireCommands.NoSuchSegment) {
                WireCommands.NoSuchSegment missing = (WireCommands.NoSuchSegment) reply;
                // A missing transaction segment still reports SUCCESS; any other missing segment is a failure.
                if (!missing.getSegment().equals(txnSegmentName)) {
                    log.error(flowId, "Commit Transaction: Source segment {} not found.", missing.getSegment());
                    outcome = Controller.TxnStatus.Status.FAILURE;
                }
            }
            return Controller.TxnStatus.newBuilder().setStatus(outcome).build();
        });
    }

    /**
     * Aborts a transaction by sending a {@code DeleteSegment} command for the transaction segment.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment the transaction was created on
     * @param txId            transaction id
     * @param delegationToken delegation token sent with the command
     * @return a future holding {@code SUCCESS} once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Controller.TxnStatus> abortTransaction(String scope, String stream, long segmentId, UUID txId, String delegationToken) {
        String txnSegmentName = getTransactionName(scope, stream, segmentId, txId);
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.DeleteSegment command = new WireCommands.DeleteSegment(flowId, txnSegmentName, delegationToken);
        CompletableFuture<Void> deleted = sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(flowId, reply, client, txnSegmentName, WireCommands.DeleteSegment.class, WireCommandType.DELETE_SEGMENT));
        return deleted.thenApply(ignored -> Controller.TxnStatus.newBuilder().setStatus(Controller.TxnStatus.Status.SUCCESS).build());
    }

    /**
     * Sends an {@code UpdateSegmentPolicy} command carrying the wire-level form of the scaling policy.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param policy          scaling policy, translated to a wire-level rate type and target rate
     * @param segmentId       id of the segment to update
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> updatePolicy(String scope, String stream, ScalingPolicy policy, long segmentId, String delegationToken, long clientRequestId) {
        String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        Pair<Byte, Integer> rate = extractFromPolicy(policy);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.UpdateSegmentPolicy command = new WireCommands.UpdateSegmentPolicy(
                flowId, segmentName, rate.getLeft(), rate.getRight(), delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, segmentName, WireCommands.UpdateSegmentPolicy.class, WireCommandType.UPDATE_SEGMENT_POLICY));
    }

    /**
     * Sends a {@code GetStreamSegmentInfo} command and returns the {@code StreamSegmentInfo} reply.
     *
     * Log lines for this call are tagged with the connection's flow id.
     *
     * @param scope           scope of the stream
     * @param stream          stream name
     * @param segmentId       id of the segment to query
     * @param delegationToken delegation token sent with the command
     * @return a future holding the segment info reply
     */
    public CompletableFuture<WireCommands.StreamSegmentInfo> getSegmentInfo(String scope, String stream, long segmentId, String delegationToken) {
        String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(scope, stream, segmentId);
        Controller.NodeUri node = getSegmentUri(scope, stream, segmentId);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.GetStreamSegmentInfo command = new WireCommands.GetStreamSegmentInfo(flowId, segmentName, delegationToken);
        return sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(flowId, reply, client, segmentName, WireCommands.GetStreamSegmentInfo.class, WireCommandType.GET_STREAM_SEGMENT_INFO);
            // NOTE(review): SegmentIsTruncated is also registered as an accepted reply for this request;
            // such a reply would fail the assertion/cast below - confirm whether that can occur in practice.
            assert (reply instanceof WireCommands.StreamSegmentInfo);
            return (WireCommands.StreamSegmentInfo) reply;
        });
    }

    /**
     * Sends a {@code CreateTableSegment} command for the named table segment.
     *
     * @param tableName       name of the table segment to create
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> createTableSegment(String tableName, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.CreateTableSegment command = new WireCommands.CreateTableSegment(flowId, tableName, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, tableName, WireCommands.CreateTableSegment.class, WireCommandType.CREATE_TABLE_SEGMENT));
    }

    /**
     * Sends a {@code DeleteTableSegment} command for the named table segment.
     *
     * @param tableName       name of the table segment to delete
     * @param mustBeEmpty     flag forwarded to the command; a {@code TableSegmentNotEmpty} reply
     *                        surfaces as a {@code WireCommandFailedException}
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> deleteTableSegment(String tableName, boolean mustBeEmpty, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.DeleteTableSegment command = new WireCommands.DeleteTableSegment(flowId, tableName, mustBeEmpty, delegationToken);
        return sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, tableName, WireCommands.DeleteTableSegment.class, WireCommandType.DELETE_TABLE_SEGMENT));
    }

    /**
     * Sends an {@code UpdateTableEntries} command with the given entries and returns the versions
     * reported in the {@code TableEntriesUpdated} reply.
     *
     * Every key and value is wrapped in a {@code ByteBuf} for the wire command; those buffers are
     * released when the returned future completes, whether normally or exceptionally.
     *
     * @param tableName       name of the table segment
     * @param entries         entries to write
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future holding one {@code KeyVersion} per updated version in the reply
     */
    public CompletableFuture<List<KeyVersion>> updateTableEntries(String tableName, List<TableEntry<byte[], byte[]>> entries, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        List<ByteBuf> buffersToRelease = new ArrayList<>();
        List<Map.Entry<WireCommands.TableKey, WireCommands.TableValue>> wireEntries = new ArrayList<>();
        for (TableEntry<byte[], byte[]> entry : entries) {
            WireCommands.TableKey wireKey = convertToWireCommand(entry.getKey());
            ByteBuf valueBuffer = Unpooled.wrappedBuffer(entry.getValue());
            buffersToRelease.add(wireKey.getData());
            buffersToRelease.add(valueBuffer);
            wireEntries.add(new AbstractMap.SimpleImmutableEntry<>(wireKey, new WireCommands.TableValue(valueBuffer)));
        }
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.UpdateTableEntries command = new WireCommands.UpdateTableEntries(
                flowId, tableName, delegationToken, new WireCommands.TableEntries(wireEntries));
        CompletableFuture<List<KeyVersion>> versions = sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(clientRequestId, reply, client, tableName, WireCommands.UpdateTableEntries.class, WireCommandType.UPDATE_TABLE_ENTRIES);
            List<KeyVersion> updated = ((WireCommands.TableEntriesUpdated) reply).getUpdatedVersions().stream()
                    .map(KeyVersionImpl::new)
                    .collect(Collectors.toList());
            return updated;
        });
        return versions.whenComplete((result, failure) -> release(buffersToRelease));
    }

    /**
     * Sends a {@code RemoveTableKeys} command for the given keys.
     *
     * The key buffers created for the wire command are released when the returned future completes,
     * whether normally or exceptionally.
     *
     * @param tableName       name of the table segment
     * @param keys            keys to remove
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future that completes once the reply has been accepted by {@code handleReply}
     */
    public CompletableFuture<Void> removeTableKeys(String tableName, List<TableKey<byte[]>> keys, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        List<ByteBuf> buffersToRelease = new ArrayList<>(keys.size());
        List<WireCommands.TableKey> wireKeys = new ArrayList<>();
        for (TableKey<byte[]> key : keys) {
            WireCommands.TableKey wireKey = convertToWireCommand(key);
            buffersToRelease.add(wireKey.getData());
            wireKeys.add(wireKey);
        }
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.RemoveTableKeys command = new WireCommands.RemoveTableKeys(flowId, tableName, delegationToken, wireKeys);
        CompletableFuture<Void> removed = sendRequest(client, flowId, command)
                .thenAccept(reply -> handleReply(clientRequestId, reply, client, tableName, WireCommands.RemoveTableKeys.class, WireCommandType.REMOVE_TABLE_KEYS));
        return removed.whenComplete((result, failure) -> release(buffersToRelease));
    }

    /**
     * Sends a {@code ReadTable} command for the given keys and converts the {@code TableRead} reply
     * into client-side table entries.
     *
     * Keys are sent with version {@code Long.MIN_VALUE} regardless of the version on the supplied key.
     * The request key buffers are released when the returned future completes.
     *
     * @param tableName       name of the table segment
     * @param keys            keys to read
     * @param delegationToken delegation token sent with the command
     * @param clientRequestId request id used to tag log lines for this call
     * @return a future holding one entry per entry in the reply
     */
    public CompletableFuture<List<TableEntry<byte[], byte[]>>> readTable(String tableName, List<TableKey<byte[]>> keys, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        List<ByteBuf> buffersToRelease = new ArrayList<>();
        List<WireCommands.TableKey> wireKeys = new ArrayList<>();
        for (TableKey<byte[]> key : keys) {
            ByteBuf keyBuffer = Unpooled.wrappedBuffer(key.getKey());
            buffersToRelease.add(keyBuffer);
            wireKeys.add(new WireCommands.TableKey(keyBuffer, Long.MIN_VALUE));
        }
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        WireCommands.ReadTable command = new WireCommands.ReadTable(flowId, tableName, delegationToken, wireKeys);
        CompletableFuture<List<TableEntry<byte[], byte[]>>> read = sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(clientRequestId, reply, client, tableName, WireCommands.ReadTable.class, WireCommandType.READ_TABLE);
            List<TableEntry<byte[], byte[]>> tableEntries = ((WireCommands.TableRead) reply).getEntries().getEntries().stream()
                    .map(wireEntry -> new TableEntryImpl<byte[], byte[]>(
                            convertFromWireCommand(wireEntry.getKey()),
                            getArray(wireEntry.getValue().getData())))
                    .collect(Collectors.toList());
            return tableEntries;
        });
        return read.whenComplete((result, failure) -> release(buffersToRelease));
    }

    /**
     * Sends a {@code ReadTableKeys} command and returns the keys from the reply together with the
     * iterator state rebuilt from the reply's continuation token.
     *
     * @param tableName         name of the table segment
     * @param suggestedKeyCount key count forwarded to the command
     * @param state             iterator state to resume from; {@code null} is treated as {@code IteratorState.EMPTY}
     * @param delegationToken   delegation token sent with the command
     * @param clientRequestId   request id used to tag log lines for this call
     * @return a future holding the new iterator state and the keys read
     */
    public CompletableFuture<TableSegment.IteratorItem<TableKey<byte[]>>> readTableKeys(String tableName, int suggestedKeyCount, IteratorState state, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        IteratorState resumeFrom = state;
        if (resumeFrom == null) {
            resumeFrom = IteratorState.EMPTY;
        }
        WireCommands.ReadTableKeys command = new WireCommands.ReadTableKeys(
                flowId, tableName, delegationToken, suggestedKeyCount, resumeFrom.toBytes());
        return sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(clientRequestId, reply, client, tableName, WireCommands.ReadTableKeys.class, WireCommandType.READ_TABLE_KEYS);
            WireCommands.TableKeysRead keysRead = (WireCommands.TableKeysRead) reply;
            IteratorState nextState = IteratorState.fromBytes(keysRead.getContinuationToken());
            List<TableKey<byte[]>> readKeys = keysRead.getKeys().stream()
                    .map(wireKey -> new TableKeyImpl<byte[]>(getArray(wireKey.getData()), new KeyVersionImpl(wireKey.getKeyVersion())))
                    .collect(Collectors.toList());
            return new TableSegment.IteratorItem<TableKey<byte[]>>(nextState, readKeys);
        });
    }

    /**
     * Sends a {@code ReadTableEntries} command and returns the entries from the reply together with
     * the iterator state rebuilt from the reply's continuation token.
     *
     * @param tableName           name of the table segment
     * @param suggestedEntryCount entry count forwarded to the command
     * @param state               iterator state to resume from; {@code null} is treated as {@code IteratorState.EMPTY}
     * @param delegationToken     delegation token sent with the command
     * @param clientRequestId     request id used to tag log lines for this call
     * @return a future holding the new iterator state and the entries read
     */
    public CompletableFuture<TableSegment.IteratorItem<TableEntry<byte[], byte[]>>> readTableEntries(String tableName, int suggestedEntryCount, IteratorState state, String delegationToken, long clientRequestId) {
        Controller.NodeUri node = getTableUri(tableName);
        RawClient client = new RawClient(ModelHelper.encode(node), connectionFactory);
        long flowId = client.getFlow().asLong();
        IteratorState resumeFrom = state;
        if (resumeFrom == null) {
            resumeFrom = IteratorState.EMPTY;
        }
        WireCommands.ReadTableEntries command = new WireCommands.ReadTableEntries(
                flowId, tableName, delegationToken, suggestedEntryCount, resumeFrom.toBytes());
        return sendRequest(client, flowId, command).thenApply(reply -> {
            handleReply(clientRequestId, reply, client, tableName, WireCommands.ReadTableEntries.class, WireCommandType.READ_TABLE_ENTRIES);
            WireCommands.TableEntriesRead entriesRead = (WireCommands.TableEntriesRead) reply;
            IteratorState nextState = IteratorState.fromBytes(entriesRead.getContinuationToken());
            List<TableEntry<byte[], byte[]>> readEntries = entriesRead.getEntries().getEntries().stream()
                    .map(wireEntry -> {
                        WireCommands.TableKey wireKey = wireEntry.getKey();
                        // Key bytes are copied before value bytes, as each copy also releases its buffer.
                        TableKey<byte[]> key = new TableKeyImpl<byte[]>(getArray(wireKey.getData()), new KeyVersionImpl(wireKey.getKeyVersion()));
                        return new TableEntryImpl<byte[], byte[]>(key, getArray(wireEntry.getValue().getData()));
                    })
                    .collect(Collectors.toList());
            return new TableSegment.IteratorItem<TableEntry<byte[], byte[]>>(nextState, readEntries);
        });
    }

    /**
     * Copies the readable bytes of {@code buf} into a new array and then releases the buffer.
     *
     * The copy uses an absolute read starting at the reader index, so the buffer's reader index is
     * not advanced. The release goes through {@code ReferenceCountUtil.safeRelease}.
     *
     * @param buf buffer to copy and release
     * @return a new array holding the buffer's readable bytes
     */
    private byte[] getArray(ByteBuf buf) {
        byte[] copy = new byte[buf.readableBytes()];
        buf.getBytes(buf.readerIndex(), copy);
        ReferenceCountUtil.safeRelease(buf);
        return copy;
    }

    /**
     * Releases every buffer in the collection via {@code ReferenceCountUtil.safeRelease}.
     *
     * @param buffers buffers to release
     */
    private void release(Collection<ByteBuf> buffers) {
        for (ByteBuf buffer : buffers) {
            ReferenceCountUtil.safeRelease(buffer);
        }
    }

    /**
     * Converts a client-side table key into its wire-command form.
     *
     * A key whose version is {@code null} or is the {@code KeyVersion.NO_VERSION} instance is sent
     * with version {@code Long.MIN_VALUE}; otherwise the key's segment version is used.
     *
     * @param k client-side key
     * @return wire key wrapping the key bytes (the bytes are wrapped, not copied)
     */
    private WireCommands.TableKey convertToWireCommand(TableKey<byte[]> k) {
        KeyVersion version = k.getVersion();
        // Identity comparison against the NO_VERSION constant is intentional here, as in the original.
        boolean unversioned = version == null || version == KeyVersion.NO_VERSION;
        if (unversioned) {
            return new WireCommands.TableKey(Unpooled.wrappedBuffer(k.getKey()), Long.MIN_VALUE);
        }
        return new WireCommands.TableKey(Unpooled.wrappedBuffer(k.getKey()), version.getSegmentVersion());
    }

    /**
     * Converts a wire-command table key into a client-side key.
     *
     * A wire version of {@code -1} maps to {@code KeyVersion.NOT_EXISTS}; any other value is wrapped
     * in a {@code KeyVersionImpl}. The key's data buffer is copied and released by {@code getArray}.
     *
     * @param k wire key
     * @return client-side key holding a copy of the key bytes
     */
    private TableKey<byte[]> convertFromWireCommand(WireCommands.TableKey k) {
        if (k.getKeyVersion() == -1L) {
            return new TableKeyImpl<byte[]>(getArray(k.getData()), KeyVersion.NOT_EXISTS);
        }
        byte[] keyBytes = getArray(k.getData());
        return new TableKeyImpl<byte[]>(keyBytes, new KeyVersionImpl(k.getKeyVersion()));
    }

    /**
     * Translates a scaling policy into the (rate type, target rate) pair used by wire commands.
     *
     * A fixed-segment-count policy yields {@code NO_SCALE} with rate 0. Otherwise the target rate is
     * narrowed to an int and the rate type is {@code IN_KBYTES_PER_SEC} for a KB/s policy and
     * {@code IN_EVENTS_PER_SEC} for anything else.
     *
     * @param policy scaling policy to translate
     * @return pair of wire rate type and target rate
     * @throws ArithmeticException if the target rate does not fit in an int
     */
    private Pair<Byte, Integer> extractFromPolicy(ScalingPolicy policy) {
        if (policy.getScaleType().equals(ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS)) {
            byte noScale = WireCommands.CreateSegment.NO_SCALE;
            return new ImmutablePair<Byte, Integer>(noScale, 0);
        }
        int targetRate = Math.toIntExact(policy.getTargetRate());
        byte rateType;
        if (policy.getScaleType().equals(ScalingPolicy.ScaleType.BY_RATE_IN_KBYTES_PER_SEC)) {
            rateType = WireCommands.CreateSegment.IN_KBYTES_PER_SEC;
        } else {
            rateType = WireCommands.CreateSegment.IN_EVENTS_PER_SEC;
        }
        return new ImmutablePair<Byte, Integer>(rateType, targetRate);
    }

    /**
     * Logs the reply that triggered the close and closes the client, if there is one.
     *
     * Any exception thrown by {@code close()} is logged and not propagated.
     *
     * @param reply  reply being handled; used only for the log line
     * @param client client to close; may be {@code null}
     */
    private void closeConnection(Reply reply, RawClient client) {
        log.info("Closing connection as a result of receiving: {}", reply);
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception closeFailure) {
            log.warn("Exception tearing down connection: ", closeFailure);
        }
    }

    /**
     * Sends a request over the given client and maps transport-level failures to
     * {@code WireCommandFailedException}.
     *
     * On failure the client is closed here: the success path closes it in {@code handleReply},
     * which never runs when the request future completes exceptionally, so without this the
     * client would never be closed.
     *
     * @param connection client to send the request on
     * @param requestId  request id used for the wire call and to tag log lines
     * @param request    the wire command to send
     * @return a future holding the reply; it fails with {@code WireCommandFailedException}
     *         ({@code ConnectionFailed} or {@code AuthFailed}) for connection and authentication
     *         errors, and with a {@code CompletionException} wrapping the original error otherwise
     */
    private <T extends Request & WireCommand> CompletableFuture<Reply> sendRequest(RawClient connection, long requestId, T request) {
        return connection.sendRequest(requestId, request).exceptionally(e -> {
            // Best-effort close; a failure to close must not mask the original error.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                log.warn(requestId, "Exception tearing down connection after failed request", closeFailure);
            }
            Throwable unwrap = Exceptions.unwrap(e);
            if (unwrap instanceof ConnectionFailedException || unwrap instanceof ConnectionClosedException) {
                log.warn(requestId, "Connection dropped");
                throw new WireCommandFailedException(request.getType(), WireCommandFailedException.Reason.ConnectionFailed);
            }
            if (unwrap instanceof AuthenticationException) {
                log.warn(requestId, "Authentication Exception");
                throw new WireCommandFailedException(request.getType(), WireCommandFailedException.Reason.AuthFailed);
            }
            log.error(requestId, "Request failed", e);
            throw new CompletionException(e);
        });
    }

    /**
     * Closes the client and classifies the reply for the given request type.
     *
     * <ul>
     *   <li>A reply registered in {@code EXPECTED_SUCCESS_REPLIES} is logged and accepted.</li>
     *   <li>A reply registered in {@code EXPECTED_FAILING_REPLIES} is logged and, for
     *       {@code NoSuchSegment}, {@code TableSegmentNotEmpty}, {@code TableKeyDoesNotExist} and
     *       {@code TableKeyBadVersion}, rethrown as a {@code WireCommandFailedException} with the
     *       matching reason. Any other registered failing reply is accepted silently.</li>
     *   <li>{@code AuthTokenCheckFailed} and {@code WrongHost} raise {@code WireCommandFailedException}
     *       with reasons {@code AuthFailed} and {@code UnknownHost}.</li>
     *   <li>Anything else raises {@code ConnectionFailedException}.</li>
     * </ul>
     *
     * NOTE(review): no {@code throws} clause is declared although {@code ConnectionFailedException}
     * is thrown; if that type is a checked exception the original source presumably used a
     * sneaky-throw mechanism - confirm against the non-decompiled source.
     *
     * @param callerRequestId            request id used to tag log lines
     * @param reply                      reply received for the request
     * @param client                     client the request was sent on; closed before classification
     * @param qualifiedStreamSegmentName segment or table name, used only for logging
     * @param requestType                class of the request, used to look up the expected replies
     * @param type                       wire command type recorded in any {@code WireCommandFailedException}
     */
    private void handleReply(long callerRequestId, Reply reply, RawClient client, String qualifiedStreamSegmentName, Class<? extends Request> requestType, WireCommandType type) {
        closeConnection(reply, client);
        Set<Class<? extends Reply>> expectedReplies = EXPECTED_SUCCESS_REPLIES.get(requestType);
        Set<Class<? extends Reply>> expectedFailingReplies = EXPECTED_FAILING_REPLIES.get(requestType);

        if (expectedReplies != null && expectedReplies.contains(reply.getClass())) {
            log.info(callerRequestId, "{} {} {} {}.", requestType.getSimpleName(), qualifiedStreamSegmentName, reply.getClass().getSimpleName(), reply.getRequestId());
            return;
        }

        if (expectedFailingReplies != null && expectedFailingReplies.contains(reply.getClass())) {
            log.info(callerRequestId, "{} {} {} {}.", requestType.getSimpleName(), qualifiedStreamSegmentName, reply.getClass().getSimpleName(), reply.getRequestId());
            if (reply instanceof WireCommands.NoSuchSegment) {
                throw new WireCommandFailedException(type, WireCommandFailedException.Reason.SegmentDoesNotExist);
            }
            if (reply instanceof WireCommands.TableSegmentNotEmpty) {
                throw new WireCommandFailedException(type, WireCommandFailedException.Reason.TableSegmentNotEmpty);
            }
            if (reply instanceof WireCommands.TableKeyDoesNotExist) {
                throw new WireCommandFailedException(type, WireCommandFailedException.Reason.TableKeyDoesNotExist);
            }
            if (reply instanceof WireCommands.TableKeyBadVersion) {
                throw new WireCommandFailedException(type, WireCommandFailedException.Reason.TableKeyBadVersion);
            }
            return;
        }

        if (reply instanceof WireCommands.AuthTokenCheckFailed) {
            log.warn(callerRequestId, "Auth Check Failed {} {} {} {}.", requestType.getSimpleName(), qualifiedStreamSegmentName, reply.getClass().getSimpleName(), reply.getRequestId());
            throw new WireCommandFailedException(new AuthenticationException(reply.toString()), type, WireCommandFailedException.Reason.AuthFailed);
        }

        if (reply instanceof WireCommands.WrongHost) {
            log.warn(callerRequestId, "Wrong Host {} {} {} {}.", requestType.getSimpleName(), qualifiedStreamSegmentName, reply.getClass().getSimpleName(), reply.getRequestId());
            throw new WireCommandFailedException(type, WireCommandFailedException.Reason.UnknownHost);
        }

        log.error(callerRequestId, "Unexpected reply {} {} {} {}.", requestType.getSimpleName(), qualifiedStreamSegmentName, reply.getClass().getSimpleName(), reply.getRequestId());
        // A request type with no registered success replies must still produce the intended
        // exception rather than a NullPointerException while building its message.
        String expectedDescription = expectedReplies == null
                ? "(no replies registered for " + requestType.getSimpleName() + ")"
                : expectedReplies.stream().map(Object::toString).collect(Collectors.joining(", "));
        throw new ConnectionFailedException("Unexpected reply of " + reply + " when expecting one of " + expectedDescription);
    }

    /**
     * No-op: this helper holds no state of its own to release here. The {@code RawClient} created
     * for each call is closed by the reply handling of that call, not by this method.
     */
    @Override
    public void close() {
    }
}

