/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DirectDruidClientFactory;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.FingerprintGenerator;
import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
import org.apache.druid.segment.metadata.SegmentMetadataCacheTestBase;
import org.apache.druid.segment.metadata.SegmentSchemaBackFillQueue;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.metadata.TestSegmentMetadataQueryWalker;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

public class CoordinatorSegmentDataCacheConcurrencyTest
extends SegmentMetadataCacheTestBase {
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(CentralizedDatasourceSchemaConfig.create((boolean)true));
    private static final String DATASOURCE = "datasource";
    static final SegmentMetadataCacheConfig SEGMENT_CACHE_CONFIG_DEFAULT = SegmentMetadataCacheConfig.create((String)"PT1S");
    private File tmpDir;
    private TestServerInventoryView inventoryView;
    private TestCoordinatorServerView serverView;
    private AbstractSegmentMetadataCache schema;
    private ExecutorService exec;
    private TestSegmentMetadataQueryWalker walker;
    private SegmentSchemaCache segmentSchemaCache;
    private SegmentSchemaBackFillQueue backFillQueue;
    private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
    private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
    private final ObjectMapper mapper = TestHelper.makeJsonMapper();

    @Before
    public void setUp() throws Exception {
        this.setUpData();
        this.setUpCommon();
        this.tmpDir = this.temporaryFolder.newFolder();
        this.inventoryView = new TestServerInventoryView();
        this.serverView = CoordinatorSegmentDataCacheConcurrencyTest.newCoordinatorServerView(this.inventoryView);
        this.walker = new TestSegmentMetadataQueryWalker(this.serverView, new DruidHttpClientConfig(){

            public long getMaxQueuedBytes() {
                return 0L;
            }
        }, new ServerConfig(), new NoopServiceEmitter(), this.conglomerate, new HashMap<SegmentDescriptor, Pair<QueryableIndex, DataSegment>>());
        this.segmentSchemaCache = new SegmentSchemaCache((ServiceEmitter)new NoopServiceEmitter());
        CentralizedDatasourceSchemaConfig config = CentralizedDatasourceSchemaConfig.create();
        config.setEnabled(true);
        config.setBackFillEnabled(false);
        config.setBackFillPeriod(1L);
        SegmentSchemaManager segmentSchemaManager = new SegmentSchemaManager((MetadataStorageTablesConfig)this.derbyConnectorRule.metadataTablesConfigSupplier().get(), this.mapper, (SQLMetadataConnector)this.derbyConnectorRule.getConnector());
        this.backFillQueue = new SegmentSchemaBackFillQueue(segmentSchemaManager, ScheduledExecutors::fixed, this.segmentSchemaCache, new FingerprintGenerator(this.mapper), (ServiceEmitter)new NoopServiceEmitter(), config);
        this.segmentSchemaCache.setInitialized();
        final CountDownLatch initLatch = new CountDownLatch(1);
        this.serverView.registerTimelineCallback(Execs.singleThreaded((String)"ServerViewInit-DruidSchemaConcurrencyTest-%d"), new TimelineServerView.TimelineCallback(){

            public ServerView.CallbackAction timelineInitialized() {
                initLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                return null;
            }

            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                return null;
            }

            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                return null;
            }

            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        this.sqlSegmentsMetadataManager = (SqlSegmentsMetadataManager)Mockito.mock(SqlSegmentsMetadataManager.class);
        Mockito.when((Object)this.sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
        SegmentsMetadataManagerConfig metadataManagerConfig = (SegmentsMetadataManagerConfig)Mockito.mock(SegmentsMetadataManagerConfig.class);
        Mockito.when((Object)metadataManagerConfig.getPollDuration()).thenReturn((Object)Period.millis((int)1000));
        this.segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance((Object)metadataManagerConfig);
        this.inventoryView.init();
        initLatch.await();
        this.exec = Execs.multiThreaded((int)4, (String)"DruidSchemaConcurrencyTest-%d");
    }

    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        this.exec.shutdownNow();
    }

    @Test(timeout=30000L)
    public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() throws InterruptedException, ExecutionException, TimeoutException {
        this.schema = new CoordinatorSegmentMetadataCache(this.getQueryLifecycleFactory((QuerySegmentWalker)this.walker), this.serverView, SEGMENT_CACHE_CONFIG_DEFAULT, (Escalator)new NoopEscalator(), new InternalQueryConfig(), new NoopServiceEmitter(), this.segmentSchemaCache, this.backFillQueue, this.sqlSegmentsMetadataManager, this.segmentsMetadataManagerConfigSupplier){

            public RowSignature buildDataSourceRowSignature(String dataSource) {
                this.doInLock(() -> {
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
                return super.buildDataSourceRowSignature(dataSource);
            }
        };
        int numExistingSegments = 100;
        int numServers = 19;
        final CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
        this.serverView.registerTimelineCallback((Executor)Execs.directExecutor(), new TimelineServerView.TimelineCallback(){

            public ServerView.CallbackAction timelineInitialized() {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                segmentLoadLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        this.addSegmentsToCluster(0, numServers, numExistingSegments);
        Future<Object> refreshFuture = this.exec.submit(() -> {
            this.schema.refresh(this.walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), (Set)Sets.newHashSet((Object[])new String[]{DATASOURCE}));
            return null;
        });
        this.addSegmentsToCluster(numExistingSegments, numServers, 50);
        this.addReplicasToCluster(1, numServers, 30);
        this.removeSegmentsFromCluster(numServers, 50);
        Assert.assertFalse((boolean)refreshFuture.isDone());
        for (int i = 0; i < 1000; ++i) {
            boolean hasTimeline = this.exec.submit(() -> this.serverView.getTimeline((DataSource)new TableDataSource(DATASOURCE)) != null).get(100L, TimeUnit.MILLISECONDS);
            Assert.assertTrue((boolean)hasTimeline);
            Thread.sleep(2L);
        }
        refreshFuture.get(10L, TimeUnit.SECONDS);
    }

    @Test(timeout=30000L)
    public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata() throws InterruptedException, ExecutionException, TimeoutException {
        this.schema = new CoordinatorSegmentMetadataCache(this.getQueryLifecycleFactory((QuerySegmentWalker)this.walker), this.serverView, SEGMENT_CACHE_CONFIG_DEFAULT, (Escalator)new NoopEscalator(), new InternalQueryConfig(), new NoopServiceEmitter(), this.segmentSchemaCache, this.backFillQueue, this.sqlSegmentsMetadataManager, this.segmentsMetadataManagerConfigSupplier){

            public RowSignature buildDataSourceRowSignature(String dataSource) {
                this.doInLock(() -> {
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
                return super.buildDataSourceRowSignature(dataSource);
            }
        };
        int numExistingSegments = 100;
        int numServers = 19;
        final CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
        this.serverView.registerTimelineCallback((Executor)Execs.directExecutor(), new TimelineServerView.TimelineCallback(){

            public ServerView.CallbackAction timelineInitialized() {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                segmentLoadLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        this.addSegmentsToCluster(0, numServers, numExistingSegments);
        Assert.assertTrue((boolean)segmentLoadLatch.await(5L, TimeUnit.SECONDS));
        Future<Object> refreshFuture = this.exec.submit(() -> {
            this.schema.refresh(this.walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()), (Set)Sets.newHashSet((Object[])new String[]{DATASOURCE}));
            return null;
        });
        Assert.assertFalse((boolean)refreshFuture.isDone());
        for (int i = 0; i < 1000; ++i) {
            Map segmentsMetadata = this.exec.submit(() -> this.schema.getSegmentMetadataSnapshot()).get(100L, TimeUnit.MILLISECONDS);
            Assert.assertFalse((boolean)segmentsMetadata.isEmpty());
            Thread.sleep(2L);
        }
        refreshFuture.get(10L, TimeUnit.SECONDS);
    }

    private void addSegmentsToCluster(int partitionIdStart, int numServers, int numSegments) {
        for (int i = 0; i < numSegments; ++i) {
            DataSegment segment = this.newSegment(i + partitionIdStart);
            QueryableIndex index = this.newQueryableIndex(i + partitionIdStart);
            int serverIndex = i % numServers;
            this.inventoryView.addServerSegment(CoordinatorSegmentDataCacheConcurrencyTest.newServer("server_" + serverIndex), segment);
            this.walker.add(segment, index);
        }
    }

    private void addReplicasToCluster(int serverIndexOffFrom, int numServers, int numSegments) {
        for (int i = 0; i < numSegments; ++i) {
            DataSegment segment = this.newSegment(i);
            int serverIndex = i % numServers + serverIndexOffFrom;
            serverIndex = serverIndex < numServers ? serverIndex : serverIndex - numServers;
            this.inventoryView.addServerSegment(CoordinatorSegmentDataCacheConcurrencyTest.newServer("server_" + serverIndex), segment);
        }
    }

    private void removeSegmentsFromCluster(int numServers, int numSegments) {
        for (int i = 0; i < numSegments; ++i) {
            DataSegment segment = this.newSegment(i);
            int serverIndex = i % numServers;
            this.inventoryView.removeServerSegment(CoordinatorSegmentDataCacheConcurrencyTest.newServer("server_" + serverIndex), segment);
        }
    }

    private static TestCoordinatorServerView newCoordinatorServerView(ServerInventoryView baseView) {
        return new TestCoordinatorServerView(baseView, (CoordinatorSegmentWatcherConfig)EasyMock.createMock(CoordinatorSegmentWatcherConfig.class), new NoopServiceEmitter(), null);
    }

    private static DruidServer newServer(String name) {
        return new DruidServer(name, "host:8083", "host:8283", 1000L, ServerType.HISTORICAL, "tier", 0);
    }

    private DataSegment newSegment(int partitionId) {
        return new DataSegment(DATASOURCE, Intervals.of((String)"2012/2013"), "version1", null, (List)ImmutableList.of(), (List)ImmutableList.of(), (ShardSpec)new NumberedShardSpec(partitionId, 0), null, Integer.valueOf(1), 100L, DataSegment.PruneSpecsHolder.DEFAULT);
    }

    private QueryableIndex newQueryableIndex(int partitionId) {
        return IndexBuilder.create().tmpDir(new File(this.tmpDir, "" + partitionId)).segmentWriteOutMediumFactory((SegmentWriteOutMediumFactory)OffHeapMemorySegmentWriteOutMediumFactory.instance()).schema(new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("cnt"), new DoubleSumAggregatorFactory("m1", "m1")}).withRollup(false).build()).rows((Iterable)this.ROWS1).buildMMappedIndex();
    }

    private static class TestServerInventoryView
    implements ServerInventoryView {
        private final Map<String, DruidServer> serverMap = new HashMap<String, DruidServer>();
        private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<String, Set<DataSegment>>();
        private final List<NonnullPair<ServerView.SegmentCallback, Executor>> segmentCallbacks = new ArrayList<NonnullPair<ServerView.SegmentCallback, Executor>>();
        private final List<NonnullPair<ServerView.ServerRemovedCallback, Executor>> serverRemovedCallbacks = new ArrayList<NonnullPair<ServerView.ServerRemovedCallback, Executor>>();

        private TestServerInventoryView() {
        }

        private void init() {
            this.segmentCallbacks.forEach(pair -> ((Executor)pair.rhs).execute(() -> ((ServerView.SegmentCallback)((ServerView.SegmentCallback)pair.lhs)).segmentViewInitialized()));
        }

        private void addServerSegment(DruidServer server, DataSegment segment) {
            this.serverMap.put(server.getName(), server);
            this.segmentsMap.computeIfAbsent(server.getName(), k -> new HashSet()).add(segment);
            this.segmentCallbacks.forEach(pair -> ((Executor)pair.rhs).execute(() -> ((ServerView.SegmentCallback)pair.lhs).segmentAdded(server.getMetadata(), segment)));
        }

        private void removeServerSegment(DruidServer server, DataSegment segment) {
            this.segmentsMap.computeIfAbsent(server.getName(), k -> new HashSet()).remove(segment);
            this.segmentCallbacks.forEach(pair -> ((Executor)pair.rhs).execute(() -> ((ServerView.SegmentCallback)pair.lhs).segmentRemoved(server.getMetadata(), segment)));
        }

        private void removeServer(DruidServer server) {
            this.serverMap.remove(server.getName());
            this.segmentsMap.remove(server.getName());
            this.serverRemovedCallbacks.forEach(pair -> ((Executor)pair.rhs).execute(() -> ((ServerView.ServerRemovedCallback)pair.lhs).serverRemoved(server)));
        }

        public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
            this.segmentCallbacks.add((NonnullPair<ServerView.SegmentCallback, Executor>)new NonnullPair((Object)callback, (Object)exec));
        }

        public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) {
            this.serverRemovedCallbacks.add((NonnullPair<ServerView.ServerRemovedCallback, Executor>)new NonnullPair((Object)callback, (Object)exec));
        }

        @Nullable
        public DruidServer getInventoryValue(String serverKey) {
            return this.serverMap.get(serverKey);
        }

        public Collection<DruidServer> getInventory() {
            return this.serverMap.values();
        }

        public boolean isStarted() {
            return true;
        }

        public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) {
            Set<DataSegment> segments = this.segmentsMap.get(serverKey);
            return segments != null && segments.contains(segment);
        }
    }

    private static class TestCoordinatorServerView
    extends CoordinatorServerView {
        public TestCoordinatorServerView(ServerInventoryView baseView, CoordinatorSegmentWatcherConfig segmentWatcherConfig, ServiceEmitter emitter, @Nullable DirectDruidClientFactory druidClientFactory) {
            super(baseView, segmentWatcherConfig, emitter, druidClientFactory);
        }

        public QueryRunner getQueryRunner(String serverName) {
            return (QueryRunner)EasyMock.mock(QueryRunner.class);
        }
    }
}

