/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import javax.annotation.Nullable;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.apache.tez.util.FastNumberFormat;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleUtils {
    /**
     * Class-level logger. Note that {@code shuffleToMemory} and {@code shuffleToDisk} take a
     * parameter that is also named {@code LOG}, which shadows this field inside those methods.
     */
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
    /** One mebibyte expressed in bytes (0x100000 == 1024 * 1024). */
    private static final long MB = 0x100000L;
    /**
     * Per-thread {@link DecimalFormat} with the pattern {@code "0.00"}; used by
     * {@code FetchStatsLogger} to render the aggregate transfer rate. A thread-local is used
     * because {@link DecimalFormat} instances are not safe for concurrent use.
     */
    static final ThreadLocal<DecimalFormat> MBPS_FORMAT = new ThreadLocal<DecimalFormat>(){

        @Override
        protected DecimalFormat initialValue() {
            return new DecimalFormat("0.00");
        }
    };
    /**
     * Per-thread {@link FastNumberFormat} configured with a minimum of two integer digits; used by
     * {@code FetchStatsLogger} to render the fractional (hundredths) part of a per-fetch rate.
     */
    static final ThreadLocal<FastNumberFormat> MBPS_FAST_FORMAT = new ThreadLocal<FastNumberFormat>(){

        @Override
        protected FastNumberFormat initialValue() {
            FastNumberFormat fmt = FastNumberFormat.getInstance();
            // Two digits minimum, so a fractional value such as 5 is rendered as "05".
            fmt.setMinimumIntegerDigits(2);
            return fmt;
        }
    };

    /**
     * Reads a serialized token from {@code meta} and builds a secret key from the token's password.
     *
     * @param meta buffer holding the token in the format understood by {@code Token.readFields}
     * @return the key produced by {@code JobTokenSecretManager.createSecretKey} for the token password
     * @throws IOException if the token cannot be read from the buffer
     */
    public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta) throws IOException {
        DataInputByteBuffer tokenInput = new DataInputByteBuffer();
        tokenInput.reset(meta);
        Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>();
        jobToken.readFields(tokenInput);
        return JobTokenSecretManager.createSecretKey(jobToken.getPassword());
    }

    /**
     * Serializes a job token into a byte buffer by delegating to
     * {@code TezCommonUtils.convertJobTokenToBytes}.
     *
     * @param jobToken the token to serialize
     * @return the buffer produced by the delegate
     * @throws IOException if the delegate fails to serialize the token
     */
    public static ByteBuffer convertJobTokenToBytes(Token<JobTokenIdentifier> jobToken) throws IOException {
        ByteBuffer serializedToken = TezCommonUtils.convertJobTokenToBytes(jobToken);
        return serializedToken;
    }

    /**
     * Decodes shuffle provider metadata by delegating to
     * {@code TezRuntimeUtils.deserializeShuffleProviderMetaData}. Within this class the result is
     * used as the shuffle port placed in data movement event payloads.
     *
     * @param meta the serialized provider metadata
     * @return the integer decoded by the delegate
     * @throws IOException if the delegate cannot decode the metadata
     */
    public static int deserializeShuffleProviderMetaData(ByteBuffer meta) throws IOException {
        int decoded = TezRuntimeUtils.deserializeShuffleProviderMetaData(meta);
        return decoded;
    }

    /**
     * Reads shuffle data from {@code input} into {@code shuffleData} using
     * {@code IFile.Reader.readToMemory}.
     *
     * <p>On any {@link Exception} or {@link InternalError} the failure is logged at INFO level, the
     * input stream is closed, and the failure is surfaced as an {@link IOException}: an
     * {@code IOException} is rethrown unchanged, anything else is wrapped as the cause of a new one.
     *
     * @param shuffleData destination array for the fetched data
     * @param input stream to read from; closed by this method only when the read fails
     * @param decompressedLength decompressed size, used only in the failure log message
     * @param compressedLength compressed size passed to the reader
     * @param codec compression codec passed to the reader
     * @param ifileReadAhead read-ahead flag passed to the reader
     * @param ifileReadAheadLength read-ahead length passed to the reader
     * @param LOG logger to report to (shadows the class-level logger)
     * @param identifier the input attempt being fetched, used in log messages
     * @throws IOException if reading fails for any reason
     */
    public static void shuffleToMemory(byte[] shuffleData, InputStream input, int decompressedLength, int compressedLength, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, Logger LOG, InputAttemptIdentifier identifier) throws IOException {
        try {
            IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec, ifileReadAhead, ifileReadAheadLength);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Read " + shuffleData.length + " bytes from input for " + identifier);
            }
        }
        catch (Exception | InternalError failure) {
            LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + ", decomp=" + decompressedLength + ". ExceptionMessage=" + failure.getMessage());
            ShuffleUtils.ioCleanup(input);
            // An IOException passes through untouched; InternalError and every other exception
            // type are converted so callers only ever have to handle IOException.
            if (failure instanceof IOException) {
                throw (IOException) failure;
            }
            throw new IOException(failure);
        }
    }

    /**
     * Copies {@code compressedLength} bytes of shuffle data from {@code input} to {@code output}.
     *
     * <p>When {@code verifyChecksum} is set the copy is delegated to
     * {@code IFile.Reader.readToDisk} and its return value is subtracted from the number of bytes
     * still expected. Otherwise the bytes are copied through a 64 KiB buffer. {@code output} is
     * closed on success; on an {@link IOException} both streams are closed and the exception is
     * rethrown.
     *
     * @param output destination stream; closed by this method
     * @param hostIdentifier source host, used only in the "incomplete" error message
     * @param input stream to read from; closed by this method only when the copy fails
     * @param compressedLength number of bytes expected from {@code input}
     * @param decompressedLength decompressed size, used only in the failure log message
     * @param LOG logger to report to (shadows the class-level logger)
     * @param identifier the input attempt being fetched, used in log and error messages
     * @param ifileReadAhead read-ahead flag passed to the checksum-verifying reader
     * @param ifileReadAheadLength read-ahead length passed to the checksum-verifying reader
     * @param verifyChecksum whether to copy through {@code IFile.Reader.readToDisk}
     * @throws IOException if the copy fails, the stream ends early, or fewer bytes than
     *         {@code compressedLength} were accounted for
     */
    public static void shuffleToDisk(OutputStream output, String hostIdentifier, InputStream input, long compressedLength, long decompressedLength, Logger LOG, InputAttemptIdentifier identifier, boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
        // Must start at the full expected length: both copy paths count down from here, and the
        // final completeness check relies on it reaching zero.
        long bytesLeft = compressedLength;
        try {
            if (verifyChecksum) {
                bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength, ifileReadAhead, ifileReadAheadLength);
            } else {
                final int BYTES_TO_READ = 64 * 1024;
                byte[] buf = new byte[BYTES_TO_READ];
                while (bytesLeft > 0L) {
                    int n = input.read(buf, 0, (int)Math.min(bytesLeft, (long)BYTES_TO_READ));
                    if (n < 0) {
                        throw new IOException("read past end of stream reading " + identifier);
                    }
                    output.write(buf, 0, n);
                    bytesLeft -= (long)n;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Read " + (compressedLength - bytesLeft) + " bytes from input for " + identifier);
            }
            output.close();
        }
        catch (IOException ioe) {
            LOG.info("Failed to read data to disk for " + identifier + ". len=" + compressedLength + ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
            ShuffleUtils.ioCleanup(input, output);
            throw ioe;
        }
        // Only the delegated (checksum) path can get here with bytes unaccounted for.
        if (bytesLeft != 0L) {
            throw new IOException("Incomplete map output received for " + identifier + " from " + hostIdentifier + " (" + bytesLeft + " bytes missing of " + compressedLength + ")");
        }
    }

    /**
     * Closes every non-null resource in {@code closeables} on a best-effort basis.
     *
     * <p>An {@link IOException} raised by a {@code close()} call is not propagated; it is logged at
     * DEBUG level (when enabled) and the remaining resources are still closed.
     *
     * @param closeables resources to close; individual entries may be {@code null}
     */
    public static void ioCleanup(Closeable ... closeables) {
        for (int i = 0; i < closeables.length; i++) {
            Closeable resource = closeables[i];
            if (resource != null) {
                try {
                    resource.close();
                }
                catch (IOException closeFailure) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Exception in closing " + resource, closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Builds the common prefix of a shuffle handler fetch URL, ending in {@code "&map="} so the
     * caller can append the map output identifiers.
     *
     * <p>The result has the form
     * {@code <scheme>host:port/mapOutput?job=<jobId>&dag=<dag>&reduce=<range>&map=}, where
     * {@code <jobId>} is {@code appId} with every occurrence of {@code "application"} replaced by
     * {@code "job"}, and {@code <range>} is {@code partition} alone or, when
     * {@code partitionCount > 1}, {@code partition-(partition + partitionCount - 1)}.
     *
     * @param host shuffle handler host
     * @param port shuffle handler port
     * @param partition first partition to fetch
     * @param partitionCount number of consecutive partitions to fetch
     * @param appId application id string
     * @param dagIdentifier DAG identifier placed in the {@code dag} query parameter
     * @param sslShuffle {@code true} for {@code https://}, {@code false} for {@code http://}
     * @return a new builder holding the URL prefix
     */
    public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, int partitionCount, String appId, int dagIdentifier, boolean sslShuffle) {
        StringBuilder uri = new StringBuilder();
        if (sslShuffle) {
            uri.append("https://");
        } else {
            uri.append("http://");
        }
        uri.append(host).append(":").append(port).append("/");
        uri.append("mapOutput?job=").append(appId.replace("application", "job"));
        uri.append("&dag=").append(dagIdentifier);
        uri.append("&reduce=").append(partition);
        if (partitionCount > 1) {
            int lastPartition = partition + partitionCount - 1;
            uri.append("-").append(lastPartition);
        }
        return uri.append("&map=");
    }

    /**
     * Completes a fetch URL by appending the comma-separated path components of {@code inputs} to
     * {@code baseURI}, followed by {@code "&keepAlive=true"} when keep-alive is requested.
     *
     * @param baseURI URL prefix to extend
     * @param inputs the input attempts whose path components are appended, in iteration order
     * @param keepAlive whether to append the keep-alive query parameter
     * @return the assembled URL
     * @throws MalformedURLException if the assembled string is not a valid URL
     */
    public static URL constructInputURL(String baseURI, Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException {
        StringBuilder url = new StringBuilder(baseURI);
        // Empty before the first element, a comma before every later one.
        String separator = "";
        for (InputAttemptIdentifier input : inputs) {
            url.append(separator).append(input.getPathComponent());
            separator = ",";
        }
        if (keepAlive) {
            url.append("&keepAlive=true");
        }
        return new URL(url.toString());
    }

    /**
     * Obtains an HTTP connection for a shuffle fetch by delegating to
     * {@code TezRuntimeUtils.getHttpConnection} with the same arguments.
     *
     * @param asyncHttp passed through to the delegate
     * @param url the URL to connect to
     * @param params connection parameters
     * @param logIdentifier identifier passed through to the delegate
     * @param jobTokenSecretManager secret manager passed through to the delegate
     * @return the connection created by the delegate
     * @throws IOException if the delegate fails to create the connection
     */
    public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url, HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
        BaseHttpConnection connection = TezRuntimeUtils.getHttpConnection(asyncHttp, url, params, logIdentifier, jobTokenSecretManager);
        return connection;
    }

    /**
     * Renders selected fields of a data movement event payload as a bracketed, comma-separated
     * string: the empty-partitions presence flag (only when set), host, port, path component and
     * run duration.
     *
     * @param dmProto the payload to describe
     * @return the human-readable description
     */
    public static String stringify(ShuffleUserPayloads.DataMovementEventPayloadProto dmProto) {
        StringBuilder text = new StringBuilder("[");
        if (dmProto.hasEmptyPartitions()) {
            text.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
        }
        text.append("host: ").append(dmProto.getHost()).append(", ");
        text.append("port: ").append(dmProto.getPort()).append(", ");
        text.append("pathComponent: ").append(dmProto.getPathComponent()).append(", ");
        text.append("runDuration: ").append(dmProto.getRunDuration());
        return text.append("]").toString();
    }

    /**
     * Builds the serialized payload of a data movement event for one spill.
     *
     * <p>When {@code sendEmptyPartitionDetails} is set, a bit set marking every partition of
     * {@code spillRecord} without data is compressed into the payload (if at least one partition is
     * empty). Host, shuffle port and path component are included unless empty-partition details
     * are being sent and every partition is empty. Spill id and last-event flag are included only
     * when final merge is disabled. Run duration is always set to 0.
     *
     * @param sendEmptyPartitionDetails whether to compute and attach the empty-partition bit set
     * @param numPhysicalOutputs number of physical outputs; used only in the log message
     * @param spillRecord index records of the spill, one per partition
     * @param context output context supplying the host name and shuffle service metadata
     * @param spillId id of this spill
     * @param finalMergeEnabled whether final merge is enabled
     * @param isLastEvent whether this is the last event for the output
     * @param pathComponent path component under which the output can be fetched
     * @param auxiliaryService name of the auxiliary service whose metadata holds the shuffle port
     * @param deflater deflater used to compress the empty-partition bit set
     * @return a read-only buffer over the serialized payload
     * @throws IOException if compressing the bit set or decoding the service metadata fails
     */
    static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, String auxiliaryService, Deflater deflater) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        // Stays true unless empty-partition details are requested and every partition is empty.
        boolean outputGenerated = true;
        if (sendEmptyPartitionDetails) {
            BitSet emptyPartitionBits = new BitSet();
            for (int partition = 0; partition < spillRecord.size(); partition++) {
                if (!spillRecord.getIndex(partition).hasData()) {
                    emptyPartitionBits.set(partition);
                }
            }
            int emptyCount = emptyPartitionBits.cardinality();
            outputGenerated = emptyCount != spillRecord.size();
            if (emptyCount > 0) {
                byte[] rawBits = TezUtilsInternal.toByteArray(emptyPartitionBits);
                ByteString compressedBits = TezCommonUtils.compressByteArrayToByteString(rawBits, deflater);
                payloadBuilder.setEmptyPartitions(compressedBits);
                LOG.info("EmptyPartition bitsetSize=" + emptyPartitionBits.cardinality() + ", numOutputs=" + numPhysicalOutputs + ", emptyPartitions=" + emptyCount + ", compressedSize=" + compressedBits.size());
            }
        }
        // Equivalent to "!sendEmptyPartitionDetails || outputGenerated": the flag can only have
        // been cleared inside the branch above.
        if (outputGenerated) {
            String host = context.getExecutionContext().getHostName();
            ByteBuffer shuffleMetadata = context.getServiceProviderMetaData(auxiliaryService);
            int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
            payloadBuilder.setHost(host);
            payloadBuilder.setPort(shufflePort);
            payloadBuilder.setPathComponent(pathComponent);
        }
        if (!finalMergeEnabled) {
            payloadBuilder.setSpillId(spillId);
            payloadBuilder.setLastEvent(isLastEvent);
        }
        payloadBuilder.setRunDuration(0);
        return payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
    }

    /**
     * Appends the events for an output that was never started: every one of the
     * {@code numPhysicalOutputs} partitions is reported as empty.
     *
     * <p>If {@code generateVmEvent} is set, a vertex manager event with an output size of 0 is
     * added first. Then a single data movement event is added whose payload carries the compressed
     * all-empty partition bit set and a run duration of 0; it is a composite event covering all
     * outputs when {@code isCompositeEvent} is set, otherwise a plain event for source index 0.
     *
     * @param eventList list the generated events are appended to
     * @param numPhysicalOutputs number of partitions to mark as empty
     * @param context output context supplying the destination vertex name
     * @param generateVmEvent whether to emit the vertex manager event
     * @param isCompositeEvent whether to emit a composite data movement event
     * @param deflater deflater used to compress the partition bit set
     * @throws IOException if compressing the bit set fails
     */
    public static void generateEventsForNonStartedOutput(List<Event> eventList, int numPhysicalOutputs, OutputContext context, boolean generateVmEvent, boolean isCompositeEvent, Deflater deflater) throws IOException {
        if (generateVmEvent) {
            ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
            vmBuilder.setOutputSize(0L);
            ByteBuffer vmPayload = vmBuilder.build().toByteString().asReadOnlyByteBuffer();
            eventList.add(VertexManagerEvent.create(context.getDestinationVertexName(), vmPayload));
        }

        LOG.info("Setting all {} partitions as empty for non-started output", Integer.valueOf(numPhysicalOutputs));
        BitSet allEmpty = new BitSet(numPhysicalOutputs);
        allEmpty.set(0, numPhysicalOutputs, true);
        ByteString compressedBits = TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(allEmpty), deflater);

        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        payloadBuilder.setEmptyPartitions(compressedBits);
        payloadBuilder.setRunDuration(0);
        ByteBuffer dmePayload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();

        Event dataEvent;
        if (isCompositeEvent) {
            dataEvent = CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload);
        } else {
            dataEvent = DataMovementEvent.create(0, dmePayload);
        }
        eventList.add(dataEvent);
    }

    /**
     * Appends the events describing one spill to {@code eventList}.
     *
     * <p>A vertex manager event is added first when final merge is enabled or this is the last
     * event; a composite data movement event covering all {@code numPhysicalOutputs} outputs is
     * always added. Progress is reported on {@code context} before the events are built.
     *
     * @param eventList list the generated events are appended to; must not be {@code null}
     * @param finalMergeEnabled whether final merge is enabled; if so {@code isLastEvent} must be true
     * @param isLastEvent whether this is the last event for the output
     * @param context output context used for progress, counters and host/shuffle metadata
     * @param spillId id of this spill
     * @param spillRecord index records of the spill
     * @param numPhysicalOutputs number of physical outputs
     * @param sendEmptyPartitionDetails whether to attach empty-partition details to the payload
     * @param pathComponent path component under which the output can be fetched
     * @param partitionStats per-partition sizes for the vertex manager event, or {@code null}
     * @param reportDetailedPartitionStats whether to report detailed per-partition sizes
     * @param auxiliaryService name of the auxiliary shuffle service
     * @param deflater deflater used to compress payload parts
     * @throws IllegalArgumentException if {@code eventList} is {@code null}, or if final merge is
     *         enabled and {@code isLastEvent} is {@code false}
     * @throws IOException if building either payload fails
     */
    public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, String auxiliaryService, Deflater deflater) throws IOException {
        Preconditions.checkArgument(eventList != null, "EventList can't be null");
        context.notifyProgress();
        if (finalMergeEnabled) {
            Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is enabled");
        }
        if (LOG.isDebugEnabled()) {
            // The label now matches the value being printed (it previously read
            // "finalMergeDisabled" while printing finalMergeEnabled).
            LOG.debug("pathComponent=" + pathComponent + ", isLastEvent=" + isLastEvent + ", spillId=" + spillId + ", finalMergeEnabled=" + finalMergeEnabled + ", numPhysicalOutputs=" + numPhysicalOutputs);
        }
        ByteBuffer payload = ShuffleUtils.generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs, spillRecord, context, spillId, finalMergeEnabled, isLastEvent, pathComponent, auxiliaryService, deflater);
        if (finalMergeEnabled || isLastEvent) {
            VertexManagerEvent vmEvent = ShuffleUtils.generateVMEvent(context, partitionStats, reportDetailedPartitionStats, deflater);
            eventList.add(vmEvent);
        }
        CompositeDataMovementEvent csdme = CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
        eventList.add(csdme);
    }

    /**
     * Builds a vertex manager event, addressed to the destination vertex of {@code context},
     * reporting output size, record count and (optionally) per-partition statistics.
     *
     * <p>Output size is the {@code OUTPUT_BYTES} counter; the record count is the sum of the
     * {@code OUTPUT_RECORDS} and {@code OUTPUT_LARGE_RECORDS} counters. Partition statistics are
     * attached only when {@code sizePerPartition} is non-null and non-empty: either as detailed
     * stats, or as a serialized and compressed bitmap of size buckets.
     *
     * @param context output context supplying counters and the destination vertex name
     * @param sizePerPartition size of each partition, or {@code null} / empty to omit statistics
     * @param reportDetailedPartitionStats whether to attach detailed stats instead of the bitmap
     * @param deflater deflater used to compress the bitmap
     * @return the vertex manager event
     * @throws IOException if serializing or compressing the bitmap fails
     */
    public static VertexManagerEvent generateVMEvent(OutputContext context, long[] sizePerPartition, boolean reportDetailedPartitionStats, Deflater deflater) throws IOException {
        ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
        vmBuilder.setOutputSize(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue());
        long regularRecords = context.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
        long largeRecords = context.getCounters().findCounter(TaskCounter.OUTPUT_LARGE_RECORDS).getValue();
        vmBuilder.setNumRecord(regularRecords + largeRecords);

        boolean hasPartitionSizes = sizePerPartition != null && sizePerPartition.length > 0;
        if (hasPartitionSizes && reportDetailedPartitionStats) {
            vmBuilder.setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizePerPartition));
        } else if (hasPartitionSizes) {
            RoaringBitmap bucketBitmap = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizePerPartition);
            DataOutputBuffer serialized = new DataOutputBuffer();
            bucketBitmap.serialize(serialized);
            // NOTE(review): getData() hands over the whole backing array, which may be longer than
            // the bytes actually written (getLength()) - confirm that consumers tolerate the
            // trailing bytes before changing this.
            ByteString compressedStats = TezCommonUtils.compressByteArrayToByteString(serialized.getData(), deflater);
            vmBuilder.setPartitionStats(compressedStats);
        }

        ByteBuffer vmPayload = vmBuilder.build().toByteString().asReadOnlyByteBuffer();
        return VertexManagerEvent.create(context.getDestinationVertexName(), vmPayload);
    }

    /**
     * Encodes the size bucket of every partition into a bitmap.
     *
     * <p>Each partition {@code i} owns a block of {@code DATA_RANGE_IN_MB.values().length}
     * consecutive bit positions starting at {@code i * length}; within that block the bit whose
     * offset equals the ordinal of {@code DATA_RANGE_IN_MB.getRange(sizes[i])} is set.
     *
     * @param sizes size of each partition; may be {@code null} or empty
     * @return the bitmap, empty when {@code sizes} is {@code null} or empty
     */
    public static RoaringBitmap getPartitionStatsForPhysicalOutput(long[] sizes) {
        RoaringBitmap partitionStats = new RoaringBitmap();
        if (sizes != null && sizes.length > 0) {
            final int bucketsPerPartition = DATA_RANGE_IN_MB.values().length;
            for (int partition = 0; partition < sizes.length; partition++) {
                int bucket = DATA_RANGE_IN_MB.getRange(sizes[partition]).ordinal();
                partitionStats.add(partition * bucketsPerPartition + bucket);
            }
        }
        return partitionStats;
    }

    /**
     * Divides {@code a} by {@code b}, adding {@code b - 1} to the dividend first so that the
     * quotient of non-negative {@code a} and positive {@code b} is rounded up instead of down.
     *
     * @param a the dividend
     * @param b the divisor
     * @return {@code (a + (b - 1)) / b} using {@code long} arithmetic
     */
    static long ceil(long a, long b) {
        long roundingBias = b - 1L;
        long biasedDividend = a + roundingBias;
        return biasedDividend / b;
    }

    /**
     * Converts per-partition byte sizes into a detailed stats message holding, for each partition
     * in order, its size in mebibytes as computed by {@code ceil(size, 1 MiB)}.
     *
     * @param sizes size of each partition in bytes
     * @return the populated stats message
     * @throws IllegalArgumentException if a size in mebibytes does not fit in an {@code int}
     */
    public static ShuffleUserPayloads.DetailedPartitionStatsProto getDetailedPartitionStatsForPhysicalOutput(long[] sizes) {
        final long bytesPerMb = 0x100000L;
        ShuffleUserPayloads.DetailedPartitionStatsProto.Builder statsBuilder = ShuffleUserPayloads.DetailedPartitionStatsProto.newBuilder();
        for (long sizeInBytes : sizes) {
            statsBuilder.addSizeInMb(Ints.checkedCast(ShuffleUtils.ceil(sizeInBytes, bytesPerMb)));
        }
        return statsBuilder.build();
    }

    /**
     * Derives HTTP connection parameters from a configuration by delegating to
     * {@code TezRuntimeUtils.getHttpConnectionParams}.
     *
     * @param conf the configuration to read
     * @return the parameters produced by the delegate
     */
    public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
        HttpConnectionParams connectionParams = TezRuntimeUtils.getHttpConnectionParams(conf);
        return connectionParams;
    }

    /**
     * Tells whether the configured shuffle auxiliary service id contains the substring
     * {@code "tez"}. The id is read from {@code tez.am.shuffle.auxiliary-service.id} and defaults
     * to {@code mapreduce_shuffle} when unset.
     *
     * @param config the configuration to read
     * @return {@code true} if the service id contains {@code "tez"}
     */
    public static boolean isTezShuffleHandler(Configuration config) {
        String auxiliaryServiceId = config.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        return auxiliaryServiceId.contains("tez");
    }

    /**
     * Logs fetch completion statistics.
     *
     * <p>When the active logger has INFO enabled, every completed fetch is logged individually.
     * Otherwise fetches are accumulated and a summary is written to the aggregate logger once per
     * 1000 fetches, after which the accumulated sizes and time are reset (the fetch count is not).
     */
    public static class FetchStatsLogger {
        private final Logger activeLogger;
        private final Logger aggregateLogger;
        /** Total number of fetches seen on the aggregate path; never reset. */
        private final AtomicLong logCount = new AtomicLong();
        /** Compressed bytes accumulated since the last aggregate log line. */
        private final AtomicLong compressedSize = new AtomicLong();
        /** Decompressed bytes accumulated since the last aggregate log line. */
        private final AtomicLong decompressedSize = new AtomicLong();
        /** Fetch time in milliseconds accumulated since the last aggregate log line. */
        private final AtomicLong totalTime = new AtomicLong();

        /**
         * @param activeLogger logger for per-fetch messages; its INFO level selects the mode
         * @param aggregateLogger logger for the per-1000-fetches summary
         */
        public FetchStatsLogger(Logger activeLogger, Logger aggregateLogger) {
            this.activeLogger = activeLogger;
            this.aggregateLogger = aggregateLogger;
        }

        /**
         * Appends a compact {@code {input, attempt, path[, fetchType, spillEventId]}} description
         * of the attempt to {@code sb}; the last two fields are omitted when the fetch type is
         * {@code FINAL_MERGE_ENABLED}.
         *
         * @return {@code sb}, for chaining
         */
        private static StringBuilder toShortString(InputAttemptIdentifier inputAttemptIdentifier, StringBuilder sb) {
            sb.append("{");
            sb.append(inputAttemptIdentifier.getInputIdentifier());
            sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
            sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
            if (inputAttemptIdentifier.getFetchTypeInfo() != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
                sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
                sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
            }
            sb.append("}");
            return sb;
        }

        /**
         * Records the completion of one fetch.
         *
         * @param millis time the fetch took, in milliseconds
         * @param bytesCompressed compressed size of the fetched data, in bytes
         * @param bytesDecompressed decompressed size of the fetched data, in bytes
         * @param outputType description of where the data was fetched to, used in the log line
         * @param srcAttemptIdentifier the input attempt that was fetched
         */
        public void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
            if (this.activeLogger.isInfoEnabled()) {
                long wholeMBs = 0L;
                long partialMBs = 0L;
                if (millis != 0L) {
                    // 3125 / 32768 == 100 * 1000 / 2^20, so this is the rate in hundredths of
                    // MB/s, computed in integer arithmetic; it is then split into the whole and
                    // two-digit fractional parts.
                    wholeMBs = bytesCompressed * 3125L / (millis * 32768L);
                    partialMBs = wholeMBs % 100L;
                    wholeMBs /= 100L;
                }
                StringBuilder sb = new StringBuilder("Completed fetch for attempt: ");
                FetchStatsLogger.toShortString(srcAttemptIdentifier, sb);
                sb.append(" to ");
                sb.append(outputType);
                sb.append(", csize=");
                sb.append(bytesCompressed);
                sb.append(", dsize=");
                sb.append(bytesDecompressed);
                sb.append(", EndTime=");
                sb.append(System.currentTimeMillis());
                sb.append(", TimeTaken=");
                sb.append(millis);
                sb.append(", Rate=");
                sb.append(wholeMBs);
                sb.append(".");
                MBPS_FAST_FORMAT.get().format(partialMBs, sb);
                sb.append(" MB/s");
                this.activeLogger.info(sb.toString());
            } else {
                long currentTotalTime;
                long currentDecompressedSize;
                long currentCompressedSize;
                long currentCount;
                // The lock makes "accumulate, then reset on every 1000th fetch" one atomic step,
                // so each summary covers exactly the fetches counted since the previous one.
                synchronized (this) {
                    currentCount = this.logCount.incrementAndGet();
                    currentCompressedSize = this.compressedSize.addAndGet(bytesCompressed);
                    currentDecompressedSize = this.decompressedSize.addAndGet(bytesDecompressed);
                    currentTotalTime = this.totalTime.addAndGet(millis);
                    if (currentCount % 1000L == 0L) {
                        this.compressedSize.set(0L);
                        this.decompressedSize.set(0L);
                        this.totalTime.set(0L);
                    }
                }
                if (currentCount % 1000L == 0L) {
                    // bytes per millisecond * 1000 = bytes per second; / 2^20 = MB/s. (Dividing by
                    // 1000 here, as was done before, under-reports the rate by a factor of 10^6.)
                    double avgRate = currentTotalTime == 0L ? 0.0 : (double)currentCompressedSize * 1000.0 / (double)currentTotalTime / 1024.0 / 1024.0;
                    this.aggregateLogger.info("Completed {} fetches, stats for last 1000 fetches: avg csize: {}, avg dsize: {}, avgTime: {}, avgRate: {}", new Object[]{currentCount, currentCompressedSize / 1000L, currentDecompressedSize / 1000L, currentTotalTime / 1000L, MBPS_FORMAT.get().format(avgRate)});
                }
            }
        }
    }
}

