/*
 * Decompiled with CFR 0.152.
 */
package kafka.log.remote;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.collection.JavaConverters;

public class RemoteLogManagerTest {
    // Mock clock passed to every RemoteLogManager created by this test class.
    Time time = new MockTime();
    // Broker id passed to the RemoteLogManager and asserted on in the captured configs/metadata.
    int brokerId = 0;
    // Fresh temporary directory used as the log dir of the RemoteLogManager under test.
    String logDir = TestUtils.tempDirectory((String)"kafka-").toString();
    // Mockito mocks that setUp() injects through the overridden create* factory methods.
    RemoteStorageManager remoteStorageManager = (RemoteStorageManager)Mockito.mock(RemoteStorageManager.class);
    RemoteLogMetadataManager remoteLogMetadataManager = (RemoteLogMetadataManager)Mockito.mock(RemoteLogMetadataManager.class);
    // Both are (re)assigned in setUp() before each test.
    RemoteLogManagerConfig remoteLogManagerConfig = null;
    RemoteLogManager remoteLogManager = null;
    // Two partitions with random topic ids, used in the leader and follower roles respectively.
    TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
    TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
    // Topic name -> topic id mapping, populated in setUp() and passed to onLeadershipChange().
    Map<String, Uuid> topicIds = new HashMap<String, Uuid>();
    // Partition used by the tests that only need a leader epoch cache.
    TopicPartition tp = new TopicPartition("TestTopic", 5);
    // Epoch entries (epoch, start offset) that the tests write into the checkpoint.
    EpochEntry epochEntry0 = new EpochEntry(0, 0L);
    EpochEntry epochEntry1 = new EpochEntry(1, 100L);
    EpochEntry epochEntry2 = new EpochEntry(2, 200L);
    List<EpochEntry> totalEpochEntries = Arrays.asList(this.epochEntry0, this.epochEntry1, this.epochEntry2);
    // In-memory LeaderEpochCheckpoint: write() stores a copy of the entries, read() returns the stored list.
    LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint(){
        List<EpochEntry> epochs = Collections.emptyList();

        public void write(Collection<EpochEntry> epochs) {
            // Copy so that later changes to the caller's collection do not affect the stored entries.
            this.epochs = new ArrayList<EpochEntry>(epochs);
        }

        public List<EpochEntry> read() {
            return this.epochs;
        }
    };
    // Mocked log that the RemoteLogManager built in setUp() receives for every partition lookup.
    UnifiedLog mockLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);

    /**
     * Registers the leader and follower topic ids, builds the RLM config and creates the
     * RemoteLogManager under test, whose storage and metadata manager factories return the mocks.
     */
    @BeforeEach
    void setUp() throws Exception {
        for (TopicIdPartition idPartition : Arrays.asList(this.leaderTopicIdPartition, this.followerTopicIdPartition)) {
            this.topicIds.put(idPartition.topicPartition().topic(), idPartition.topicId());
        }
        this.remoteLogManagerConfig = this.createRLMConfig(new Properties());
        // Every partition lookup resolves to the shared mocked log.
        this.remoteLogManager = new RemoteLogManager(this.remoteLogManagerConfig, this.brokerId, this.logDir, this.time, topicPartition -> Optional.of(this.mockLog)){

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }
        };
    }

    /**
     * getLeaderEpochCheckpoint(log, 0, 300) must return all three epoch entries, while
     * getLeaderEpochCheckpoint(log, 100, 200) must return only the entry for epoch 1.
     */
    @Test
    void testGetLeaderEpochCheckpoint() {
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        // Offsets 0..300 span every epoch written above.
        List<EpochEntry> allEntries = this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 0L, 300L).read();
        Assertions.assertEquals(this.totalEpochEntries, allEntries);

        // Offsets 100..200 should leave only epoch 1.
        List<EpochEntry> narrowedEntries = this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 100L, 200L).read();
        Assertions.assertEquals(1, narrowedEntries.size());
        Assertions.assertEquals(this.epochEntry1, narrowedEntries.get(0));
    }

    /**
     * findHighestRemoteOffset must report -1 while the metadata manager has no stubbed offset
     * for any epoch, and 200 once epoch 2 is stubbed to have highest offset 200.
     */
    @Test
    void testFindHighestRemoteOffset() throws RemoteStorageException {
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint)));
        TopicIdPartition idPartition = new TopicIdPartition(Uuid.randomUuid(), this.tp);

        Assertions.assertEquals(-1L, this.remoteLogManager.findHighestRemoteOffset(idPartition));

        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(idPartition, 2)).thenReturn(Optional.of(200L));
        Assertions.assertEquals(200L, this.remoteLogManager.findHighestRemoteOffset(idPartition));
    }

    /**
     * Properties carrying the configured metadata-manager prefix must be exposed, with the prefix
     * stripped, by remoteLogMetadataManagerProps(); properties without that prefix must not be.
     */
    @Test
    void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
        String configPrefix = "config.prefix";
        String key = "key";
        String prefixedKey = configPrefix + key;

        Properties props = new Properties();
        props.put("remote.log.metadata.manager.impl.prefix", configPrefix);
        props.put(prefixedKey, "world");
        props.put("remote.log.metadata.y", "z");

        Map<?, ?> metadataManagerConfig = this.createRLMConfig(props).remoteLogMetadataManagerProps();

        Assertions.assertEquals(props.get(prefixedKey), metadataManagerConfig.get(key));
        Assertions.assertFalse(metadataManagerConfig.containsKey("remote.log.metadata.y"));
    }

    /**
     * startup() must configure the storage manager and the metadata manager exactly once each,
     * passing the broker id to both and the log dir to the metadata manager.
     */
    @Test
    void testStartup() {
        this.remoteLogManager.startup();
        Integer expectedBrokerId = Integer.valueOf(this.brokerId);

        ArgumentCaptor<Map<String, Object>> storageConfig = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).configure(storageConfig.capture());
        Assertions.assertEquals(expectedBrokerId, storageConfig.getValue().get("broker.id"));

        ArgumentCaptor<Map<String, Object>> metadataConfig = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(metadataConfig.capture());
        Map<String, Object> capturedMetadataConfig = metadataConfig.getValue();
        Assertions.assertEquals(expectedBrokerId, capturedMetadataConfig.get("broker.id"));
        Assertions.assertEquals(this.logDir, capturedMetadataConfig.get("log.dir"));
    }

    /**
     * A leader RLMTask must copy the rolled (non-active) segment to remote storage.
     *
     * <p>The log is mocked with one old segment starting at offset 0 and an active segment starting
     * at offset 150. After {@code copyLogSegmentsToRemote()} the test expects:
     * <ul>
     *   <li>one COPY_SEGMENT_STARTED metadata event covering offsets 0..149 with leader epochs 0 and 1,</li>
     *   <li>one copyLogSegmentData call carrying the same metadata and the old segment's files,</li>
     *   <li>one COPY_SEGMENT_FINISHED metadata update,</li>
     *   <li>one updateHighestOffsetInRemoteStorage call with the old segment's end offset.</li>
     * </ul>
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;
        long oldSegmentEndOffset = nextSegmentStartOffset - 1L;

        // Leader epoch cache and remote offset lookups.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // Backing files for the old segment's log and producer snapshot.
        File tempFile = TestUtils.tempFile();
        File mockProducerSnapshotIndex = TestUtils.tempFile();
        File tempDir = TestUtils.tempDirectory();

        // Mocked segments: the old segment [0, 150) and the active segment starting at 150.
        LogSegment oldSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment activeSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)oldSegment.baseOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)activeSegment.baseOffset()).thenReturn((Object)nextSegmentStartOffset);
        FileRecords fileRecords = (FileRecords)Mockito.mock(FileRecords.class);
        Mockito.when((Object)oldSegment.log()).thenReturn((Object)fileRecords);
        Mockito.when((Object)fileRecords.file()).thenReturn((Object)tempFile);
        Mockito.when((Object)oldSegment.readNextOffset()).thenReturn((Object)nextSegmentStartOffset);
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)activeSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        ProducerStateManager mockStateManager = (ProducerStateManager)Mockito.mock(ProducerStateManager.class);
        Mockito.when((Object)this.mockLog.producerStateManager()).thenReturn((Object)mockStateManager);
        Mockito.when((Object)mockStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);

        // Real index objects for the old segment, backed by files in tempDir.
        LazyIndex idx = LazyIndex.forOffset((File)UnifiedLog.offsetIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1000);
        LazyIndex timeIdx = LazyIndex.forTime((File)UnifiedLog.timeIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1500);
        File txnFile = UnifiedLog.transactionIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)"");
        txnFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
        Mockito.when((Object)oldSegment.lazyTimeIndex()).thenReturn((Object)timeIdx);
        Mockito.when((Object)oldSegment.lazyOffsetIndex()).thenReturn((Object)idx);
        Mockito.when((Object)oldSegment.txnIndex()).thenReturn((Object)txnIndex);

        // Metadata add/update calls complete immediately; the storage copy is a no-op.
        CompletableFuture<Object> dummyFuture = new CompletableFuture<Object>();
        dummyFuture.complete(null);
        Mockito.when((Object)this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
        Mockito.when((Object)this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
        ((RemoteStorageManager)Mockito.doNothing().when((Object)this.remoteStorageManager)).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));

        // RLMTask is created through its enclosing RemoteLogManager instance (qualified inner-class creation).
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote();

        // 1. The COPY_SEGMENT_STARTED metadata describes the old segment with epochs 0 and 1.
        ArgumentCaptor remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager)).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)remoteLogSegmentMetadataArg.capture());
        TreeMap<Integer, Long> expectedLeaderEpochs = new TreeMap<Integer, Long>();
        expectedLeaderEpochs.put(this.epochEntry0.epoch, this.epochEntry0.startOffset);
        expectedLeaderEpochs.put(this.epochEntry1.epoch, this.epochEntry1.startOffset);
        this.verifyRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)remoteLogSegmentMetadataArg.getValue(), oldSegmentStartOffset, oldSegmentEndOffset, expectedLeaderEpochs);

        // 2. The segment data is copied once, with the same metadata and the expected files.
        ArgumentCaptor remoteLogSegmentMetadataArg2 = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ArgumentCaptor logSegmentDataArg = ArgumentCaptor.forClass(LogSegmentData.class);
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, (VerificationMode)Mockito.times((int)1))).copyLogSegmentData((RemoteLogSegmentMetadata)remoteLogSegmentMetadataArg2.capture(), (LogSegmentData)logSegmentDataArg.capture());
        Assertions.assertEquals((Object)remoteLogSegmentMetadataArg.getValue(), (Object)remoteLogSegmentMetadataArg2.getValue());
        this.verifyLogSegmentData((LogSegmentData)logSegmentDataArg.getValue(), idx, timeIdx, txnIndex, tempFile, mockProducerSnapshotIndex, Arrays.asList(this.epochEntry0, this.epochEntry1));

        // 3. A single COPY_SEGMENT_FINISHED update follows.
        ArgumentCaptor remoteLogSegmentMetadataUpdateArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadataUpdate.class);
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.times((int)1))).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)remoteLogSegmentMetadataUpdateArg.capture());
        this.verifyRemoteLogSegmentMetadataUpdate((RemoteLogSegmentMetadataUpdate)remoteLogSegmentMetadataUpdateArg.getValue());

        // 4. The log is told the highest offset now available in remote storage.
        ArgumentCaptor argument = ArgumentCaptor.forClass(Long.class);
        ((UnifiedLog)Mockito.verify((Object)this.mockLog, (VerificationMode)Mockito.times((int)1))).updateHighestOffsetInRemoteStorage(((Long)argument.capture()).longValue());
        Assertions.assertEquals((long)oldSegmentEndOffset, (Long)((Long)argument.getValue()));
    }

    /**
     * An RLMTask converted to follower must not copy anything: no segment metadata is added or
     * updated, no segment data is copied, and the log's highest remote offset is never updated.
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;

        // Same log layout as the leader test: an old segment at 0 and an active segment at 150.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));
        LogSegment oldSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment activeSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)oldSegment.baseOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)activeSegment.baseOffset()).thenReturn((Object)nextSegmentStartOffset);
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)activeSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);

        // RLMTask is created through its enclosing RemoteLogManager instance (qualified inner-class creation).
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition);
        task.convertToFollower();
        task.copyLogSegmentsToRemote();

        // Nothing may have reached the metadata manager, the storage manager or the log.
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.never())).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, (VerificationMode)Mockito.never())).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.never())).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        ((UnifiedLog)Mockito.verify((Object)this.mockLog, (VerificationMode)Mockito.never())).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
    }

    /**
     * Asserts that the captured segment metadata describes the expected copied segment.
     *
     * @param remoteLogSegmentMetadata metadata captured from addRemoteLogSegmentMetadata
     * @param oldSegmentStartOffset    expected start offset of the copied segment
     * @param oldSegmentEndOffset      expected end offset of the copied segment
     * @param expectedLeaderEpochs     expected leader epoch -> start offset mapping; every entry is
     *                                 compared, so the map may hold any number of epochs
     */
    private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long oldSegmentStartOffset, long oldSegmentEndOffset, Map<Integer, Long> expectedLeaderEpochs) {
        Assertions.assertEquals((Object)this.leaderTopicIdPartition, (Object)remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals((long)oldSegmentStartOffset, (long)remoteLogSegmentMetadata.startOffset());
        Assertions.assertEquals((long)oldSegmentEndOffset, (long)remoteLogSegmentMetadata.endOffset());
        Map<?, ?> leaderEpochs = remoteLogSegmentMetadata.segmentLeaderEpochs();
        Assertions.assertEquals(expectedLeaderEpochs.size(), leaderEpochs.size());
        // Compare the whole mapping instead of only the first and last entries, so maps with one
        // entry or with more than two entries are checked correctly as well.
        Assertions.assertEquals((Object)expectedLeaderEpochs, (Object)leaderEpochs);
        Assertions.assertEquals((int)this.brokerId, (int)remoteLogSegmentMetadata.brokerId());
        Assertions.assertEquals((Object)RemoteLogSegmentState.COPY_SEGMENT_STARTED, (Object)remoteLogSegmentMetadata.state());
    }

    /**
     * Asserts that the captured update targets the leader partition, was issued by this broker
     * and reports the COPY_SEGMENT_FINISHED state.
     *
     * @param remoteLogSegmentMetadataUpdate update captured from updateRemoteLogSegmentMetadata
     */
    private void verifyRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        TopicIdPartition updatedPartition = remoteLogSegmentMetadataUpdate.remoteLogSegmentId().topicIdPartition();
        Assertions.assertEquals(this.leaderTopicIdPartition, updatedPartition);

        int updatingBroker = remoteLogSegmentMetadataUpdate.brokerId();
        Assertions.assertEquals(this.brokerId, updatingBroker);

        RemoteLogSegmentState reportedState = remoteLogSegmentMetadataUpdate.state();
        Assertions.assertEquals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED, reportedState);
    }

    /**
     * Asserts that the captured LogSegmentData points at the expected segment files and carries
     * the expected leader epoch index.
     *
     * @param logSegmentData            data captured from copyLogSegmentData
     * @param idx                       expected offset index
     * @param timeIdx                   expected time index
     * @param txnIndex                  expected transaction index
     * @param tempFile                  expected log segment file
     * @param mockProducerSnapshotIndex expected producer snapshot file
     * @param expectedLeaderEpoch       epoch entries whose serialized form must equal the leader epoch index
     * @throws IOException declared by the signature; presumably raised by the index/checkpoint accessors
     */
    private void verifyLogSegmentData(LogSegmentData logSegmentData, LazyIndex idx, LazyIndex timeIdx, TransactionIndex txnIndex, File tempFile, File mockProducerSnapshotIndex, List<EpochEntry> expectedLeaderEpoch) throws IOException {
        Assertions.assertEquals((Object)idx.file().getAbsolutePath(), (Object)logSegmentData.offsetIndex().toAbsolutePath().toString());
        Assertions.assertEquals((Object)timeIdx.file().getAbsolutePath(), (Object)logSegmentData.timeIndex().toAbsolutePath().toString());
        // Fail with a clear message instead of a NoSuchElementException when the index is absent.
        Assertions.assertTrue(logSegmentData.transactionIndex().isPresent(), "transaction index should be present");
        // Compare absolute paths on both sides, consistent with the other file assertions.
        Assertions.assertEquals((Object)txnIndex.file().getAbsolutePath(), (Object)((Path)logSegmentData.transactionIndex().get()).toAbsolutePath().toString());
        Assertions.assertEquals((Object)tempFile.getAbsolutePath(), (Object)logSegmentData.logSegment().toAbsolutePath().toString());
        Assertions.assertEquals((Object)mockProducerSnapshotIndex.getAbsolutePath(), (Object)logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());
        // The leader epoch index must equal the serialized form of the expected epoch entries.
        InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint();
        inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch);
        Assertions.assertEquals((Object)inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), (Object)logSegmentData.leaderEpochIndex());
    }

    /**
     * When createRemoteStorageManager() yields a ClassLoaderAwareRemoteStorageManager, the
     * RemoteLogManager must expose that same instance through storageManager().
     */
    @Test
    void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
        final ClassLoaderAwareRemoteStorageManager mockedStorage = Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        // No log is needed here, so the log lookup always comes back empty.
        RemoteLogManager manager = new RemoteLogManager(this.remoteLogManagerConfig, this.brokerId, this.logDir, this.time, ignored -> Optional.empty()){

            public RemoteStorageManager createRemoteStorageManager() {
                return mockedStorage;
            }
        };
        RemoteStorageManager exposed = manager.storageManager();
        Assertions.assertEquals(mockedStorage, exposed);
    }

    /**
     * Asserts that fetching remote segment metadata succeeds (does not throw) for each of the
     * given partitions.
     */
    private void verifyInCache(TopicIdPartition ... topicIdPartitions) {
        for (TopicIdPartition idPartition : topicIdPartitions) {
            Assertions.assertDoesNotThrow(() -> this.remoteLogManager.fetchRemoteLogSegmentMetadata(idPartition.topicPartition(), 0, 0L));
        }
    }

    /**
     * Asserts that fetching remote segment metadata throws a KafkaException for each of the
     * given partitions.
     */
    private void verifyNotInCache(TopicIdPartition ... topicIdPartitions) {
        for (TopicIdPartition idPartition : topicIdPartitions) {
            Assertions.assertThrows(KafkaException.class, () -> this.remoteLogManager.fetchRemoteLogSegmentMetadata(idPartition.topicPartition(), 0, 0L));
        }
    }

    /**
     * Partitions become fetchable after onLeadershipChange() and stop being fetchable, one at a
     * time, as stopPartitions() is called for each of them.
     */
    @Test
    void testTopicIdCacheUpdates() throws RemoteStorageException {
        TopicIdPartition leaderId = this.leaderTopicIdPartition;
        TopicIdPartition followerId = this.followerTopicIdPartition;
        Partition leader = this.mockPartition(leaderId);
        Partition follower = this.mockPartition(followerId);
        Mockito.when(this.remoteLogMetadataManager.remoteLogSegmentMetadata(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(Optional.empty());

        // Before any leadership change neither partition is known.
        this.verifyNotInCache(followerId, leaderId);

        this.remoteLogManager.onLeadershipChange(Collections.singleton(leader), Collections.singleton(follower), this.topicIds);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).onPartitionLeadershipChanges(Collections.singleton(leaderId), Collections.singleton(followerId));
        this.verifyInCache(followerId, leaderId);

        // Stopping the leader partition removes only the leader.
        this.remoteLogManager.stopPartitions(leaderId.topicPartition(), true);
        this.verifyNotInCache(leaderId);
        this.verifyInCache(followerId);

        // Stopping the follower partition removes the remaining one.
        this.remoteLogManager.stopPartitions(followerId.topicPartition(), true);
        this.verifyNotInCache(leaderId, followerId);
    }

    /**
     * fetchRemoteLogSegmentMetadata() must delegate to the metadata manager with the
     * TopicIdPartition registered for the requested topic partition.
     */
    @Test
    void testFetchRemoteLogSegmentMetadata() throws RemoteStorageException {
        TopicIdPartition leaderId = this.leaderTopicIdPartition;
        TopicIdPartition followerId = this.followerTopicIdPartition;
        this.remoteLogManager.onLeadershipChange(Collections.singleton(this.mockPartition(leaderId)), Collections.singleton(this.mockPartition(followerId)), this.topicIds);

        this.remoteLogManager.fetchRemoteLogSegmentMetadata(leaderId.topicPartition(), 10, 100L);
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(followerId.topicPartition(), 20, 200L);

        Mockito.verify(this.remoteLogMetadataManager).remoteLogSegmentMetadata(ArgumentMatchers.eq(leaderId), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(this.remoteLogMetadataManager).remoteLogSegmentMetadata(ArgumentMatchers.eq(followerId), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    /**
     * onLeadershipChange() must call doHandleLeaderOrFollowerPartitions() for a follower-only
     * change and, after the spy is reset, for a leader-only change.
     */
    @Test
    void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
        RemoteLogManager spied = Mockito.spy(this.remoteLogManager);

        // Follower-only change.
        spied.onLeadershipChange(Collections.emptySet(), Collections.singleton(this.mockPartition(this.followerTopicIdPartition)), this.topicIds);
        Mockito.verify(spied).doHandleLeaderOrFollowerPartitions(ArgumentMatchers.eq(this.followerTopicIdPartition), ArgumentMatchers.any(Consumer.class));

        // Forget the recorded interactions before exercising the leader-only change.
        Mockito.reset(spied);
        spied.onLeadershipChange(Collections.singleton(this.mockPartition(this.leaderTopicIdPartition)), Collections.emptySet(), this.topicIds);
        Mockito.verify(spied).doHandleLeaderOrFollowerPartitions(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.any(Consumer.class));
    }

    /**
     * Builds an uncompressed batch of three records starting at {@code initialOffset}.
     *
     * <p>The record timestamps are {@code timestamp - 1}, {@code timestamp + 1} and
     * {@code timestamp + 2}. Payloads are encoded explicitly as UTF-8 so the bytes do not depend
     * on the platform default charset.
     *
     * @param timestamp            base timestamp the record timestamps are derived from
     * @param initialOffset        offset passed to MemoryRecords.withRecords as the initial offset
     * @param partitionLeaderEpoch leader epoch stamped on the batch
     * @return the in-memory records
     */
    private MemoryRecords records(long timestamp, long initialOffset, int partitionLeaderEpoch) {
        SimpleRecord[] batch = new SimpleRecord[]{
            new SimpleRecord(timestamp - 1L, "first message".getBytes(StandardCharsets.UTF_8)),
            new SimpleRecord(timestamp + 1L, "second message".getBytes(StandardCharsets.UTF_8)),
            new SimpleRecord(timestamp + 2L, "third message".getBytes(StandardCharsets.UTF_8))
        };
        return MemoryRecords.withRecords((long)initialOffset, (CompressionType)CompressionType.NONE, (Integer)partitionLeaderEpoch, (SimpleRecord[])batch);
    }

    /**
     * A new RLMTask is not a leader; convertToLeader() makes isLeader() true and
     * convertToFollower() makes it false again.
     */
    @Test
    void testRLMTaskShouldSetLeaderEpochCorrectly() {
        // RLMTask is created through its enclosing RemoteLogManager instance (qualified inner-class creation).
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition);
        Assertions.assertFalse((boolean)task.isLeader());
        task.convertToLeader(1);
        Assertions.assertTrue((boolean)task.isLeader());
        task.convertToFollower();
        Assertions.assertFalse((boolean)task.isLeader());
    }

    /**
     * Exercises findOffsetByTimestamp() against one mocked remote segment that is listed only for
     * leader epoch 10 and whose log data is the three-record batch built by records().
     *
     * <p>Expected results: a lookup at {@code ts} resolves to (ts + 1, startOffset + 1), a lookup
     * at {@code ts + 2} resolves to (ts + 2, startOffset + 2), and a lookup at {@code ts + 3}
     * finds nothing.
     */
    @Test
    void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
        TopicPartition tp = this.leaderTopicIdPartition.topicPartition();
        RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid());
        long ts = this.time.milliseconds();
        long startOffset = 120L;
        int targetLeaderEpoch = 10;
        // Mocked segment metadata covering offsets [startOffset, startOffset + 2] with max timestamp ts + 2.
        RemoteLogSegmentMetadata segmentMetadata = (RemoteLogSegmentMetadata)Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when((Object)segmentMetadata.remoteLogSegmentId()).thenReturn((Object)remoteLogSegmentId);
        Mockito.when((Object)segmentMetadata.maxTimestampMs()).thenReturn((Object)(ts + 2L));
        Mockito.when((Object)segmentMetadata.startOffset()).thenReturn((Object)startOffset);
        Mockito.when((Object)segmentMetadata.endOffset()).thenReturn((Object)(startOffset + 2L));
        // Partition directory under the log dir, holding the index files served by the fetchIndex stub.
        File tpDir = new File(this.logDir, tp.toString());
        Files.createDirectory(tpDir.toPath(), new FileAttribute[0]);
        File txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix());
        txnIdxFile.createNewFile();
        // fetchIndex stub: creates offset and time index objects sized from the segment's offset
        // range on every call, then returns a stream over the file matching the requested type.
        Mockito.when((Object)this.remoteStorageManager.fetchIndex((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (RemoteStorageManager.IndexType)ArgumentMatchers.any(RemoteStorageManager.IndexType.class))).thenAnswer(ans -> {
            RemoteLogSegmentMetadata metadata = (RemoteLogSegmentMetadata)ans.getArgument(0);
            RemoteStorageManager.IndexType indexType = (RemoteStorageManager.IndexType)ans.getArgument(1);
            int maxEntries = (int)(metadata.endOffset() - metadata.startOffset());
            OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix()), metadata.startOffset(), maxEntries * 8);
            TimeIndex timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix()), metadata.startOffset(), maxEntries * 12);
            switch (indexType) {
                case OFFSET: {
                    return new FileInputStream(offsetIdx.file());
                }
                case TIMESTAMP: {
                    return new FileInputStream(timeIdx.file());
                }
                case TRANSACTION: {
                    return new FileInputStream(txnIdxFile);
                }
            }
            // Any other index type is not served by this stub.
            return null;
        });
        // Only the target leader epoch lists the mocked segment; every other epoch lists nothing.
        Mockito.when((Object)this.remoteLogMetadataManager.listRemoteLogSegments((TopicIdPartition)ArgumentMatchers.eq((Object)this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(ans -> {
            int leaderEpoch = (Integer)ans.getArgument(1);
            if (leaderEpoch == targetLeaderEpoch) {
                return Collections.singleton(segmentMetadata).iterator();
            }
            return Collections.emptyList().iterator();
        });
        // Segment data fetched from position 0 is a fresh stream over the three-record batch.
        Mockito.when((Object)this.remoteStorageManager.fetchLogSegment(segmentMetadata, 0)).thenAnswer(a -> new ByteArrayInputStream(this.records(ts, startOffset, targetLeaderEpoch).buffer().array()));
        // Leader epoch history: epoch 5 from offset 99, the target epoch from startOffset, epoch 12 from 500.
        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, this.checkpoint);
        leaderEpochFileCache.assign(5, 99L);
        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
        leaderEpochFileCache.assign(12, 500L);
        // Register the partition as leader so the RemoteLogManager knows its topic id.
        this.remoteLogManager.onLeadershipChange(Collections.singleton(this.mockPartition(this.leaderTopicIdPartition)), Collections.emptySet(), this.topicIds);
        // Lookup at ts: expects the record with timestamp ts + 1 at startOffset + 1.
        Optional maybeTimestampAndOffset1 = this.remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);
        Assertions.assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 1L, startOffset + 1L, Optional.of(targetLeaderEpoch))), (Object)maybeTimestampAndOffset1);
        // Lookup at ts + 2: expects the record with exactly that timestamp at startOffset + 2.
        Optional maybeTimestampAndOffset2 = this.remoteLogManager.findOffsetByTimestamp(tp, ts + 2L, startOffset, leaderEpochFileCache);
        Assertions.assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 2L, startOffset + 2L, Optional.of(targetLeaderEpoch))), (Object)maybeTimestampAndOffset2);
        // Lookup at ts + 3 is beyond the segment's max timestamp: nothing is found.
        Optional maybeTimestampAndOffset3 = this.remoteLogManager.findOffsetByTimestamp(tp, ts + 3L, startOffset, leaderEpochFileCache);
        Assertions.assertEquals(Optional.empty(), (Object)maybeTimestampAndOffset3);
    }

    /**
     * Closing the RemoteLogManager twice must close the storage manager and then the metadata
     * manager exactly once each.
     */
    @Test
    void testIdempotentClose() throws IOException {
        for (int attempt = 0; attempt < 2; attempt++) {
            this.remoteLogManager.close();
        }
        InOrder closeOrder = Mockito.inOrder(this.remoteStorageManager, this.remoteLogMetadataManager);
        closeOrder.verify(this.remoteStorageManager, Mockito.times(1)).close();
        closeOrder.verify(this.remoteLogMetadataManager, Mockito.times(1)).close();
    }

    /**
     * Creates a mocked Partition for the given topic id partition whose log is a mocked
     * UnifiedLog with remote logging enabled.
     *
     * @param topicIdPartition partition whose topic partition and topic name the mock reports
     * @return the stubbed Partition mock
     */
    private Partition mockPartition(TopicIdPartition topicIdPartition) {
        TopicPartition topicPartition = topicIdPartition.topicPartition();

        UnifiedLog remoteEnabledLog = Mockito.mock(UnifiedLog.class);
        Mockito.when(remoteEnabledLog.remoteLogEnabled()).thenReturn(true);

        Partition mocked = Mockito.mock(Partition.class);
        Mockito.when(mocked.topicPartition()).thenReturn(topicPartition);
        Mockito.when(mocked.topic()).thenReturn(topicPartition.topic());
        Mockito.when(mocked.log()).thenReturn(Option.apply(remoteEnabledLog));
        return mocked;
    }

    /**
     * Builds a RemoteLogManagerConfig from the given properties after enabling remote storage
     * and selecting the no-op storage and metadata manager implementations.
     *
     * @param props caller-supplied properties; the three settings above are added to this object
     * @return the parsed remote log manager configuration
     */
    private RemoteLogManagerConfig createRLMConfig(Properties props) {
        props.put("remote.log.storage.system.enable", Boolean.TRUE);
        props.put("remote.log.storage.manager.class.name", NoOpRemoteStorageManager.class.getName());
        props.put("remote.log.metadata.manager.class.name", NoOpRemoteLogMetadataManager.class.getName());
        return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props));
    }
}

