/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.pinot;

import com.facebook.presto.pinot.PinotClusterInfoFetcher;
import com.facebook.presto.pinot.PinotColumnHandle;
import com.facebook.presto.pinot.PinotConnection;
import com.facebook.presto.pinot.PinotErrorCode;
import com.facebook.presto.pinot.PinotException;
import com.facebook.presto.pinot.PinotSessionProperties;
import com.facebook.presto.pinot.PinotSplit;
import com.facebook.presto.pinot.PinotTableHandle;
import com.facebook.presto.pinot.PinotTableLayoutHandle;
import com.facebook.presto.pinot.query.PinotQueryGenerator;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.apache.pinot.spi.config.table.TableType;

/**
 * Produces the splits for a scan of a Pinot table.
 *
 * <p>Depending on the {@code forBroker} flag carried by the table handle, a scan is
 * served either by one broker split or by per-host segment splits derived from the
 * routing table obtained from the {@link PinotConnection}.
 */
public class PinotSplitManager
        implements ConnectorSplitManager {
    private static final String REALTIME_SUFFIX = "_" + TableType.REALTIME;
    private static final String OFFLINE_SUFFIX = "_" + TableType.OFFLINE;
    // Placeholders substituted in the generated segment query text by getSegmentPinotQuery.
    private static final String TABLE_NAME_SUFFIX_TEMPLATE = "__TABLE_NAME_SUFFIX_TEMPLATE__";
    private static final String TIME_BOUNDARY_FILTER_TEMPLATE = "__TIME_BOUNDARY_FILTER_TEMPLATE__";

    private final String connectorId;
    private final PinotConnection pinotPrestoConnection;

    /**
     * @param connectorId identifier of this connector; its string form is attached to every split
     * @param pinotPrestoConnection source of routing tables, time boundaries and gRPC ports
     * @throws NullPointerException if either argument is null
     */
    @Inject
    public PinotSplitManager(ConnectorId connectorId, PinotConnection pinotPrestoConnection) {
        this.connectorId = Objects.requireNonNull(connectorId, "connectorId is null").toString();
        this.pinotPrestoConnection = Objects.requireNonNull(pinotPrestoConnection, "pinotPrestoConnection is null");
    }

    /**
     * Wraps the broker query in a split source containing exactly one broker split.
     *
     * @param brokerPinotQuery the generated query to send to the broker
     * @param expectedColumnHandles columns the split is expected to produce
     * @return a fixed split source holding the single broker split
     */
    protected ConnectorSplitSource generateSplitForBrokerBasedScan(PinotQueryGenerator.GeneratedPinotQuery brokerPinotQuery, List<PinotColumnHandle> expectedColumnHandles) {
        ConnectorSplit brokerSplit = PinotSplit.createBrokerSplit(this.connectorId, expectedColumnHandles, brokerPinotQuery);
        return new FixedSplitSource(Collections.singletonList(brokerSplit));
    }

    /**
     * Builds segment-level splits for the realtime and offline variants of the table.
     *
     * <p>If the routing table is empty, an empty split source is returned. A time boundary
     * is fetched only when the routing table has entries for both the realtime and the
     * offline table name; otherwise no time predicate is added to either query.
     *
     * @param pinotLayoutHandle layout whose table handle supplies the table name and query
     * @param session session used to read the segments-per-split setting
     * @param expectedColumnHandles columns each split is expected to produce
     * @return a fixed split source over the generated splits, in shuffled order
     * @throws PinotException if the routing table is non-empty and the table handle carries no Pinot query
     */
    protected ConnectorSplitSource generateSplitsForSegmentBasedScan(PinotTableLayoutHandle pinotLayoutHandle, ConnectorSession session, List<PinotColumnHandle> expectedColumnHandles) {
        PinotTableHandle tableHandle = pinotLayoutHandle.getTable();
        String tableName = tableHandle.getTableName();
        Map<String, Map<String, List<String>>> routingTable = this.pinotPrestoConnection.getRoutingTable(tableName);
        List<ConnectorSplit> splits = new ArrayList<>();
        if (!routingTable.isEmpty()) {
            PinotQueryGenerator.GeneratedPinotQuery segmentPinotQuery = tableHandle.getPinotQuery().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected to find realtime and offline pinot query in " + tableHandle));

            // Default boundary built from nulls; replaced only for tables that have both variants.
            PinotClusterInfoFetcher.TimeBoundary timeBoundary = new PinotClusterInfoFetcher.TimeBoundary(null, null);
            boolean hasRealtime = routingTable.containsKey(tableName + REALTIME_SUFFIX);
            if (hasRealtime && routingTable.containsKey(tableName + OFFLINE_SUFFIX)) {
                timeBoundary = this.pinotPrestoConnection.getTimeBoundary(tableName);
            }

            String realtimeQuery = getSegmentPinotQuery(segmentPinotQuery, REALTIME_SUFFIX, timeBoundary.getOnlineTimePredicate());
            String offlineQuery = getSegmentPinotQuery(segmentPinotQuery, OFFLINE_SUFFIX, timeBoundary.getOfflineTimePredicate());
            generateSegmentSplits(splits, expectedColumnHandles, routingTable, tableName, REALTIME_SUFFIX, session, realtimeQuery);
            generateSegmentSplits(splits, expectedColumnHandles, routingTable, tableName, OFFLINE_SUFFIX, session, offlineQuery);
        }
        Collections.shuffle(splits);
        return new FixedSplitSource(splits);
    }

    /**
     * Instantiates the query text for one table variant by filling in its two placeholders.
     *
     * @param basePinotQuery generated query whose text contains the placeholders
     * @param suffix table-name suffix substituted for the table-name placeholder
     * @param timePredicate optional time predicate; when present it is appended with
     *        {@code " AND "} if the base query reports an existing filter, else with {@code " WHERE "};
     *        when absent the time-boundary placeholder is replaced with the empty string
     * @return the query text with both placeholders replaced
     */
    private String getSegmentPinotQuery(PinotQueryGenerator.GeneratedPinotQuery basePinotQuery, String suffix, Optional<String> timePredicate) {
        String timeFilter = timePredicate
                .map(predicate -> (basePinotQuery.isHaveFilter() ? " AND " : " WHERE ") + predicate)
                .orElse("");
        return basePinotQuery.getQuery()
                .replace(TABLE_NAME_SUFFIX_TEMPLATE, suffix)
                .replace(TIME_BOUNDARY_FILTER_TEMPLATE, timeFilter);
    }

    /**
     * Appends segment splits for every routing-table entry whose key matches
     * {@code tableName + tableNameSuffix}, ignoring case.
     *
     * <p>For each host, its segments are partitioned into chunks of at most the configured
     * segments-per-split count and one split is created per chunk. Hosts with an empty
     * segment list contribute no splits.
     *
     * @param splits output list that receives the generated splits
     * @param expectedColumnHandles columns each split is expected to produce
     * @param routingTable table name to (host to segments) mapping
     * @param tableName base table name, without suffix
     * @param tableNameSuffix suffix selecting the table variant
     * @param session session used to read the segments-per-split setting
     * @param pinotQuery query text attached to every generated split
     */
    protected void generateSegmentSplits(List<ConnectorSplit> splits, List<PinotColumnHandle> expectedColumnHandles, Map<String, Map<String, List<String>>> routingTable, String tableName, String tableNameSuffix, ConnectorSession session, String pinotQuery) {
        String finalTableName = tableName + tableNameSuffix;
        int segmentsPerSplitConfigured = PinotSessionProperties.getNumSegmentsPerSplit(session);
        for (Map.Entry<String, Map<String, List<String>>> routingEntry : routingTable.entrySet()) {
            if (!routingEntry.getKey().equalsIgnoreCase(finalTableName)) {
                continue;
            }
            routingEntry.getValue().forEach((host, segments) -> {
                if (segments.isEmpty()) {
                    // Iterables.partition rejects a chunk size of 0, which is what
                    // Math.min would yield here; an empty host simply has no splits.
                    return;
                }
                int numSegmentsInThisSplit = Math.min(segments.size(), segmentsPerSplitConfigured);
                for (List<String> segmentsForThisSplit : Iterables.partition(segments, numSegmentsInThisSplit)) {
                    splits.add(PinotSplit.createSegmentSplit(this.connectorId, pinotQuery, expectedColumnHandles, segmentsForThisSplit, host, getGrpcPort(host)));
                }
            });
        }
    }

    /**
     * Looks up the gRPC port for a host through the Pinot connection.
     *
     * @param host host name as it appears in the routing table
     * @return the port reported by the connection
     */
    private int getGrpcPort(String host) {
        return this.pinotPrestoConnection.getGrpcPort(host);
    }

    /**
     * Returns the splits for the given layout, choosing between a broker scan and a segment scan.
     *
     * @param transactionHandle not used by this implementation
     * @param session session consulted for the forbid-segment-queries setting
     * @param layout must be a {@link PinotTableLayoutHandle}
     * @param splitSchedulingContext not used by this implementation
     * @return a single-split source for broker scans, or per-segment splits otherwise
     * @throws QueryNotAdequatelyPushedDownException if the table handle lacks the broker flag,
     *         the query or the expected columns, or if a segment scan is required while
     *         segment queries are forbidden for the session
     */
    @Override
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, ConnectorSplitManager.SplitSchedulingContext splitSchedulingContext) {
        PinotTableLayoutHandle pinotLayoutHandle = (PinotTableLayoutHandle) layout;
        PinotTableHandle pinotTableHandle = pinotLayoutHandle.getTable();
        Supplier<PrestoException> errorSupplier = () -> new QueryNotAdequatelyPushedDownException(PinotErrorCode.PINOT_PUSH_DOWN_QUERY_NOT_PRESENT, pinotTableHandle, this.connectorId);

        boolean forBroker = pinotTableHandle.getForBroker().orElseThrow(errorSupplier);
        if (forBroker) {
            return generateSplitForBrokerBasedScan(pinotTableHandle.getPinotQuery().orElseThrow(errorSupplier), pinotTableHandle.getExpectedColumnHandles().orElseThrow(errorSupplier));
        }
        if (PinotSessionProperties.isForbidSegmentQueries(session)) {
            throw errorSupplier.get();
        }
        return generateSplitsForSegmentBasedScan(pinotLayoutHandle, session, pinotTableHandle.getExpectedColumnHandles().orElseThrow(errorSupplier));
    }

    /**
     * Raised when a query cannot be served because the required push-down information
     * is missing from the table handle or segment queries are forbidden.
     */
    public static class QueryNotAdequatelyPushedDownException
            extends PinotException {
        private final String connectorId;
        private final ConnectorTableHandle connectorTableHandle;

        /**
         * @param errorCode error code reported for the failure
         * @param connectorTableHandle table handle the failing query targeted
         * @param connectorId identifier of the connector that raised the failure
         * @throws NullPointerException if any argument is null
         */
        public QueryNotAdequatelyPushedDownException(PinotErrorCode errorCode, ConnectorTableHandle connectorTableHandle, String connectorId) {
            super(Objects.requireNonNull(errorCode, "error code is null"), Optional.empty(), "Query uses unsupported expressions that cannot be pushed into Pinot.");
            this.connectorId = Objects.requireNonNull(connectorId, "connector id is null");
            this.connectorTableHandle = Objects.requireNonNull(connectorTableHandle, "connector table handle is null");
        }

        /**
         * @return the base message followed by the connector id and table handle
         */
        @Override
        public String getMessage() {
            return super.getMessage() + String.format(" table: %s:%s", this.connectorId, this.connectorTableHandle);
        }
    }
}

