/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.util.LockUtils;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;
import org.apache.kafka.storage.internals.log.LogCleaningException;
import org.apache.kafka.storage.internals.log.LogCleaningState;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.LogToClean;
import org.apache.kafka.storage.internals.log.PreCleanStats;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bookkeeping for log cleaning: tracks the cleaning state of each partition, the set of partitions
 * marked uncleanable per log directory, and the per-directory cleaner offset checkpoint files.
 *
 * <p>The {@code inProgress} and {@code uncleanablePartitions} maps are only touched while holding
 * {@code lock} (a {@link ReentrantLock}, so public methods may call one another while holding it).
 * A partition that has no entry in {@code inProgress} is neither being cleaned nor paused.
 *
 * <p>The constructor registers four gauges on the {@code kafka.log:LogCleanerManager} metrics group;
 * {@link #removeMetrics()} unregisters them.
 */
public class LogCleanerManager {
    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";

    /** Names of the gauges that are registered without any tag. */
    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG =
        Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);

    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");

    /** Cleaning state per partition; guarded by {@code lock}. */
    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();

    /** Partitions marked uncleanable, keyed by log directory path; guarded by {@code lock}. */
    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();

    private final Lock lock = new ReentrantLock();

    /** Signalled when an aborted partition transitions to the paused state. */
    private final Condition pausedCleaningCond = lock.newCondition();

    /** For each tagged gauge name, the list of tag maps it was registered with. */
    private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();

    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;

    /** Checkpoint file per log directory; replaced wholesale by {@link #handleLogDirFailure(String)}. */
    private volatile Map<File, OffsetCheckpointFile> checkpoints;

    /** Largest cleanable ratio seen in the most recent {@link #grabFilthiestCompactedLog} call. */
    private volatile double dirtiestLogCleanableRatio;

    /** Timestamp (ms) of the most recent {@link #grabFilthiestCompactedLog} call. */
    private volatile long timeOfLastRun;

    /**
     * Creates the manager, opening one {@value #OFFSET_CHECKPOINT_FILE} checkpoint file per log
     * directory and registering the gauges.
     *
     * @param logDirs              the log directories to manage checkpoints for
     * @param logs                 the live partition-to-log map; read but never modified here
     * @param logDirFailureChannel passed through to each {@link OffsetCheckpointFile}
     * @throws RuntimeException if opening a checkpoint file fails with an {@link IOException}
     */
    public LogCleanerManager(List<File> logDirs, ConcurrentMap<TopicPartition, UnifiedLog> logs, LogDirFailureChannel logDirFailureChannel) {
        this.logs = logs;
        this.checkpoints = logDirs.stream()
            .collect(Collectors.toMap(dir -> dir, dir -> openCheckpoint(dir, logDirFailureChannel)));
        registerMetrics(logDirs);
    }

    /** Opens the cleaner checkpoint file inside {@code dir}, wrapping {@link IOException} as unchecked. */
    private static OffsetCheckpointFile openCheckpoint(File dir, LogDirFailureChannel logDirFailureChannel) {
        try {
            return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Registers the per-directory (tagged) gauges and the two untagged gauges. */
    private void registerMetrics(List<File> logDirs) {
        for (File dir : logDirs) {
            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
            metricsGroup.newGauge(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, () -> uncleanablePartitionsCount(dir), metricTag);
            gaugeMetricNameWithTag.computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>()).add(metricTag);
        }
        for (File dir : logDirs) {
            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
            metricsGroup.newGauge(UNCLEANABLE_BYTES_METRIC_NAME, () -> uncleanableBytes(dir), metricTag);
            gaugeMetricNameWithTag.computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>()).add(metricTag);
        }
        dirtiestLogCleanableRatio = 0.0;
        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100.0 * dirtiestLogCleanableRatio));
        timeOfLastRun = Time.SYSTEM.milliseconds();
        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
    }

    /** Gauge value: number of partitions currently marked uncleanable in {@code dir}. */
    private int uncleanablePartitionsCount(File dir) {
        return LockUtils.inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size());
    }

    /**
     * Gauge value: total cleanable bytes across the partitions marked uncleanable in {@code dir}.
     * Partitions that no longer have a log contribute zero.
     */
    private Number uncleanableBytes(File dir) {
        return LockUtils.inLock(lock, () -> {
            Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
            if (partitions == null) {
                return 0;
            }
            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
            long now = Time.SYSTEM.milliseconds();
            return partitions.stream().mapToLong(tp -> {
                UnifiedLog log = logs.get(tp);
                if (log == null) {
                    return 0L;
                }
                // A partition may have no checkpoint entry yet; ofNullable avoids an NPE in that case
                // and lets cleanableOffsets fall back to the log start offset.
                Optional<Long> lastCleanOffset = Optional.ofNullable(lastClean.get(tp));
                try {
                    OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
                    return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(), offsetsToClean.firstUncleanableDirtyOffset()).getValue();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).sum();
        });
    }

    /**
     * @return the internal map from tagged gauge name to the tag maps it was registered with
     *         (the live map, not a copy)
     */
    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
        return gaugeMetricNameWithTag;
    }

    /**
     * Reads every checkpoint file and merges the results into one map.
     * A checkpoint file that fails with {@link KafkaStorageException} is logged and skipped.
     *
     * @return last-cleaned offset per partition across all log directories
     */
    public Map<TopicPartition, Long> allCleanerCheckpoints() {
        return LockUtils.inLock(lock, () -> checkpoints.values().stream()
            .flatMap(checkpoint -> {
                try {
                    return checkpoint.read().entrySet().stream();
                } catch (KafkaStorageException e) {
                    LOG.error("Failed to access checkpoint file {} in dir {}",
                        checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
                    return Stream.<Map.Entry<TopicPartition, Long>>empty();
                }
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    /**
     * @param tp the partition to look up
     * @return the partition's current cleaning state, or empty if it has none
     */
    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
        return LockUtils.inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
    }

    /**
     * Unconditionally sets the cleaning state of a partition.
     *
     * @param tp    the partition to update
     * @param state the state to record
     */
    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
        LockUtils.inLock(lock, () -> inProgress.put(tp, state));
    }

    /**
     * Picks the compacted log with the highest cleanable ratio among those that are eligible, and
     * marks it {@link LogCleaningState#LOG_CLEANING_IN_PROGRESS}.
     *
     * <p>Candidates are compacted logs that have no entry in {@code inProgress}, are not marked
     * uncleanable, and have a non-zero total size. A candidate is eligible when it either must be
     * compacted now and has cleanable bytes, or its cleanable ratio exceeds its configured
     * {@code minCleanableRatio}. As side effects this updates {@code timeOfLastRun} and
     * {@code dirtiestLogCleanableRatio}, and may rewrite a checkpoint whose offset was out of range.
     *
     * @param time          clock used for "now"
     * @param preCleanStats receives the max compaction delay and the number of cleanable partitions
     * @return the chosen log, or empty if nothing is eligible
     * @throws LogCleaningException if computing the cleaning stats of any candidate fails
     */
    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
        return LockUtils.inLock(lock, () -> {
            long now = time.milliseconds();
            timeOfLastRun = now;
            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();

            List<LogToClean> dirtyLogs = logs.entrySet().stream()
                .filter(entry -> entry.getValue().config().compact
                    && !inProgress.containsKey(entry.getKey())
                    && !isUncleanablePartition(entry.getValue(), entry.getKey()))
                .map(entry -> buildLogToClean(entry.getKey(), entry.getValue(), lastClean, now, preCleanStats))
                .filter(ltc -> ltc.totalBytes() > 0L)
                .toList();

            dirtiestLogCleanableRatio = dirtyLogs.stream().mapToDouble(LogToClean::cleanableRatio).max().orElse(0.0);

            List<LogToClean> cleanableLogs = dirtyLogs.stream()
                .filter(ltc -> (ltc.needCompactionNow() && ltc.cleanableBytes() > 0L)
                    || ltc.cleanableRatio() > ltc.log().config().minCleanableRatio)
                .toList();
            if (cleanableLogs.isEmpty()) {
                return Optional.empty();
            }

            preCleanStats.recordCleanablePartitions(cleanableLogs.size());
            LogToClean filthiest = cleanableLogs.stream()
                .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
                .orElseThrow(() -> new IllegalStateException("No filthiest log found"));
            inProgress.put(filthiest.topicPartition(), LogCleaningState.LOG_CLEANING_IN_PROGRESS);
            return Optional.of(filthiest);
        });
    }

    /**
     * Computes the cleanable range and compaction delay of one log, rewriting its checkpoint when
     * the checkpointed offset had to be reset. Any failure is rethrown as {@link LogCleaningException}.
     */
    private LogToClean buildLogToClean(TopicPartition topicPartition,
                                       UnifiedLog log,
                                       Map<TopicPartition, Long> lastClean,
                                       long now,
                                       PreCleanStats preCleanStats) {
        try {
            Optional<Long> lastCleanOffset = Optional.ofNullable(lastClean.get(topicPartition));
            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
            if (offsetsToClean.forceUpdateCheckpoint()) {
                updateCheckpoints(log.parentDirFile(), Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset())), Optional.empty());
            }
            long compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset(), now);
            preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
            return new LogToClean(log, offsetsToClean.firstDirtyOffset(), offsetsToClean.firstUncleanableDirtyOffset(), compactionDelayMs > 0L);
        } catch (Throwable e) {
            throw new LogCleaningException(log, "Failed to calculate log cleaning stats for partition " + topicPartition, e);
        }
    }

    /**
     * Marks every non-compacted log that has no cleaning state as paused (pause count 1).
     *
     * @return the entries that were just paused
     */
    public List<Map.Entry<TopicPartition, UnifiedLog>> pauseCleaningForNonCompactedPartitions() {
        return LockUtils.inLock(lock, () -> {
            List<Map.Entry<TopicPartition, UnifiedLog>> nonCompacted = logs.entrySet().stream()
                .filter(entry -> !entry.getValue().config().compact)
                .filter(entry -> !inProgress.containsKey(entry.getKey()))
                .collect(Collectors.toList());
            nonCompacted.forEach(entry -> inProgress.put(entry.getKey(), LogCleaningState.logCleaningPaused(1)));
            return nonCompacted;
        });
    }

    /**
     * Selects every compacted log that has no cleaning state and is not marked uncleanable, and
     * marks each one {@link LogCleaningState#LOG_CLEANING_IN_PROGRESS}.
     *
     * @return the selected logs keyed by partition
     */
    public Map<TopicPartition, UnifiedLog> deletableLogs() {
        return LockUtils.inLock(lock, () -> {
            Map<TopicPartition, UnifiedLog> toClean = logs.entrySet().stream()
                .filter(entry -> {
                    TopicPartition topicPartition = entry.getKey();
                    UnifiedLog log = entry.getValue();
                    return !inProgress.containsKey(topicPartition)
                        && log.config().compact
                        && !isUncleanablePartition(log, topicPartition);
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            toClean.forEach((partition, log) -> inProgress.put(partition, LogCleaningState.LOG_CLEANING_IN_PROGRESS));
            return toClean;
        });
    }

    /**
     * Aborts any in-progress cleaning of the partition and then immediately resumes it, i.e.
     * {@link #abortAndPauseCleaning} followed by {@link #resumeCleaning} under a single lock hold.
     *
     * @param topicPartition the partition whose cleaning should be aborted
     */
    public void abortCleaning(TopicPartition topicPartition) {
        LockUtils.inLock(lock, () -> {
            abortAndPauseCleaning(topicPartition);
            resumeCleaning(Set.of(topicPartition));
            return null;
        });
    }

    /**
     * Requests that cleaning of the partition stop, and blocks until the partition is paused.
     *
     * <ul>
     *   <li>No state: the partition becomes paused with count 1.</li>
     *   <li>In progress: the partition becomes aborted; this method then waits until
     *       {@link #doneCleaning} or {@link #doneDeleting} moves it to paused.</li>
     *   <li>Already paused: the pause count is incremented.</li>
     * </ul>
     *
     * @param topicPartition the partition to pause
     * @throws IllegalStateException if the partition is in any other state
     * @throws RuntimeException      wrapping {@link InterruptedException} if interrupted while
     *                               waiting; the thread's interrupt flag is restored first
     */
    public void abortAndPauseCleaning(TopicPartition topicPartition) {
        LockUtils.inLock(lock, () -> {
            LogCleaningState state = inProgress.get(topicPartition);
            if (state == null) {
                inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
            } else if (state == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
                inProgress.put(topicPartition, LogCleaningState.LOG_CLEANING_ABORTED);
            } else if (state instanceof LogCleaningState.LogCleaningPaused paused) {
                inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(paused.pausedCount() + 1));
            } else {
                throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be aborted and paused since it is in " + state + " state.");
            }
            // Timed wait so the state is re-checked periodically even if no signal arrives.
            while (!isCleaningInStatePaused(topicPartition)) {
                try {
                    pausedCleaningCond.await(100L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return null;
        });
    }

    /**
     * Undoes one pause for each given partition: a pause count of 1 removes the state entirely,
     * a larger count is decremented.
     *
     * @param topicPartitions the partitions to resume
     * @throws IllegalStateException if a partition has no state or is not paused
     */
    public void resumeCleaning(Set<TopicPartition> topicPartitions) {
        LockUtils.inLock(lock, () -> {
            for (TopicPartition topicPartition : topicPartitions) {
                LogCleaningState state = inProgress.get(topicPartition);
                if (state == null) {
                    throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be resumed since it is not paused.");
                }
                if (!(state instanceof LogCleaningState.LogCleaningPaused paused)) {
                    throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be resumed since it is in " + state + " state.");
                }
                if (paused.pausedCount() == 1) {
                    inProgress.remove(topicPartition);
                } else if (paused.pausedCount() > 1) {
                    inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(paused.pausedCount() - 1));
                }
            }
            return null;
        });
    }

    /** Returns true if the partition has a state and it is exactly {@code expectedState}. Caller holds the lock. */
    private boolean isCleaningInState(TopicPartition topicPartition, LogCleaningState expectedState) {
        LogCleaningState state = inProgress.get(topicPartition);
        return state != null && state == expectedState;
    }

    /** Returns true if the partition's state is a {@code LogCleaningPaused}. Caller holds the lock. */
    private boolean isCleaningInStatePaused(TopicPartition topicPartition) {
        return inProgress.get(topicPartition) instanceof LogCleaningState.LogCleaningPaused;
    }

    /**
     * @param topicPartition the partition being cleaned
     * @throws LogCleaningAbortedException if the partition is in the aborted state
     */
    public void checkCleaningAborted(TopicPartition topicPartition) {
        LockUtils.inLock(lock, () -> {
            if (isCleaningInState(topicPartition, LogCleaningState.LOG_CLEANING_ABORTED)) {
                throw new LogCleaningAbortedException();
            }
            return null;
        });
    }

    /**
     * Rewrites the checkpoint file of {@code dataDir}. Entries for partitions that are no longer in
     * {@code logs} are dropped; then the optional removal and the optional add/update are applied.
     * Does nothing if there is no checkpoint file for the directory. A {@link KafkaStorageException}
     * is logged and swallowed.
     *
     * @param dataDir                 the log directory whose checkpoint file is updated
     * @param partitionToUpdateOrAdd  partition and offset to write, if present
     * @param partitionToRemove       partition to delete from the checkpoint, if present
     */
    public void updateCheckpoints(File dataDir, Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd, Optional<TopicPartition> partitionToRemove) {
        LockUtils.inLock(lock, () -> {
            OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
            if (checkpoint != null) {
                try {
                    Map<TopicPartition, Long> updatedCheckpoint = new HashMap<>();
                    checkpoint.read().forEach((partition, offset) -> {
                        if (logs.containsKey(partition)) {
                            updatedCheckpoint.put(partition, offset);
                        }
                    });
                    partitionToRemove.ifPresent(updatedCheckpoint::remove);
                    partitionToUpdateOrAdd.ifPresent(entry -> updatedCheckpoint.put(entry.getKey(), entry.getValue()));
                    checkpoint.write(updatedCheckpoint);
                } catch (KafkaStorageException e) {
                    LOG.error("Failed to access checkpoint file {} in dir {}",
                        checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
                }
            }
            return null;
        });
    }

    /**
     * Moves a partition's checkpointed offset (if any) from the source directory's checkpoint file
     * to the destination's, and moves its uncleanable marking likewise.
     *
     * @param topicPartition the partition being moved
     * @param sourceLogDir   the directory the partition is leaving
     * @param destLogDir     the directory the partition is moving to
     */
    public void alterCheckpointDir(TopicPartition topicPartition, File sourceLogDir, File destLogDir) {
        LockUtils.inLock(lock, () -> {
            try {
                Optional<Long> offsetOpt = Optional.ofNullable(checkpoints.get(sourceLogDir))
                    .flatMap(checkpoint -> Optional.ofNullable(checkpoint.read().get(topicPartition)));
                offsetOpt.ifPresent(offset -> {
                    LOG.debug("Removing the partition offset data in checkpoint file for '{}' from {} directory.", topicPartition, sourceLogDir.getAbsoluteFile());
                    updateCheckpoints(sourceLogDir, Optional.empty(), Optional.of(topicPartition));
                    LOG.debug("Adding the partition offset data in checkpoint file for '{}' to {} directory.", topicPartition, destLogDir.getAbsoluteFile());
                    updateCheckpoints(destLogDir, Optional.of(Map.entry(topicPartition, offset)), Optional.empty());
                });
            } catch (KafkaStorageException e) {
                LOG.error("Failed to access checkpoint file in dir {}", sourceLogDir.getAbsolutePath(), e);
            }
            Set<TopicPartition> logUncleanablePartitions = uncleanablePartitions.getOrDefault(sourceLogDir.toString(), Collections.emptySet());
            if (logUncleanablePartitions.contains(topicPartition)) {
                logUncleanablePartitions.remove(topicPartition);
                markPartitionUncleanable(destLogDir.toString(), topicPartition);
            }
            return null;
        });
    }

    /**
     * Stops tracking the checkpoint file of a failed log directory.
     *
     * @param dir absolute path of the failed directory
     */
    public void handleLogDirFailure(String dir) {
        LOG.warn("Stopping cleaning logs in dir {}", dir);
        LockUtils.inLock(lock, () -> {
            checkpoints = checkpoints.entrySet().stream()
                .filter(entry -> !entry.getKey().getAbsolutePath().equals(dir))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return null;
        });
    }

    /**
     * For a compacted log, lowers the checkpointed offset to {@code offset} if the currently
     * checkpointed value (0 when absent) is greater.
     *
     * @param dataDir        the log directory holding the checkpoint file
     * @param topicPartition the partition to truncate; must be present in {@code logs}
     * @param offset         the new upper bound for the checkpointed offset
     */
    public void maybeTruncateCheckpoint(File dataDir, TopicPartition topicPartition, long offset) {
        LockUtils.inLock(lock, () -> {
            if (logs.get(topicPartition).config().compact) {
                OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
                if (checkpoint != null) {
                    Map<TopicPartition, Long> existing = checkpoint.read();
                    if (existing.getOrDefault(topicPartition, 0L) > offset) {
                        existing.put(topicPartition, offset);
                        checkpoint.write(existing);
                    }
                }
            }
            return null;
        });
    }

    /**
     * Records that a cleaning pass on the partition has finished. If the partition was in progress,
     * its checkpoint is set to {@code endOffset} and its state is cleared; if it was aborted, it
     * becomes paused (count 1) and waiters in {@link #abortAndPauseCleaning} are signalled.
     *
     * @param topicPartition the partition that was cleaned
     * @param dataDir        the log directory holding the partition's checkpoint file
     * @param endOffset      the offset to checkpoint
     * @throws IllegalStateException if the partition has no state or is in any other state
     */
    public void doneCleaning(TopicPartition topicPartition, File dataDir, long endOffset) {
        LockUtils.inLock(lock, () -> {
            LogCleaningState state = inProgress.get(topicPartition);
            if (state == null) {
                throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
            }
            if (state == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
                updateCheckpoints(dataDir, Optional.of(Map.entry(topicPartition, endOffset)), Optional.empty());
                inProgress.remove(topicPartition);
            } else if (state == LogCleaningState.LOG_CLEANING_ABORTED) {
                inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
                pausedCleaningCond.signalAll();
            } else {
                throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + state + " state.");
            }
            return null;
        });
    }

    /**
     * Same state transitions as {@link #doneCleaning} for each partition, but without touching any
     * checkpoint.
     *
     * @param topicPartitions the partitions whose deletion pass has finished
     * @throws IllegalStateException if a partition has no state or is neither in progress nor aborted
     */
    public void doneDeleting(List<TopicPartition> topicPartitions) {
        LockUtils.inLock(lock, () -> {
            for (TopicPartition topicPartition : topicPartitions) {
                LogCleaningState state = inProgress.get(topicPartition);
                if (state == null) {
                    throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
                }
                if (state == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
                    inProgress.remove(topicPartition);
                } else if (state == LogCleaningState.LOG_CLEANING_ABORTED) {
                    inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
                    pausedCleaningCond.signalAll();
                } else {
                    throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + state + " state.");
                }
            }
            return null;
        });
    }

    /**
     * @param logDir the log directory key
     * @return an immutable snapshot of the partitions marked uncleanable under that key
     */
    public Set<TopicPartition> uncleanablePartitions(String logDir) {
        return LockUtils.inLock(lock, () -> {
            Set<TopicPartition> partitions = uncleanablePartitions.get(logDir);
            return partitions != null ? Set.copyOf(partitions) : Set.of();
        });
    }

    /**
     * Marks a partition as uncleanable under the given log directory key.
     *
     * @param logDir    the log directory key
     * @param partition the partition to mark
     */
    public void markPartitionUncleanable(String logDir, TopicPartition partition) {
        LockUtils.inLock(lock, () -> {
            uncleanablePartitions.computeIfAbsent(logDir, dir -> new HashSet<>()).add(partition);
            return null;
        });
    }

    /** Returns true if the partition is marked uncleanable under the log's parent directory. */
    private boolean isUncleanablePartition(UnifiedLog log, TopicPartition topicPartition) {
        return LockUtils.inLock(lock, () -> {
            Set<TopicPartition> partitions = uncleanablePartitions.get(log.parentDir());
            return partitions != null && partitions.contains(topicPartition);
        });
    }

    /** Drops uncleanable markings for partitions no longer in {@code logs}, then drops empty directories. */
    public void maintainUncleanablePartitions() {
        LockUtils.inLock(lock, () -> {
            uncleanablePartitions.values().forEach(partitions -> partitions.removeIf(partition -> !logs.containsKey(partition)));
            uncleanablePartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
            return null;
        });
    }

    /** Unregisters every gauge registered by the constructor and clears the tagged-gauge registry. */
    public void removeMetrics() {
        GAUGE_METRIC_NAME_NO_TAG.forEach(metricsGroup::removeMetric);
        gaugeMetricNameWithTag.forEach((metricName, tags) -> tags.forEach(tag -> metricsGroup.removeMetric(metricName, tag)));
        gaugeMetricNameWithTag.clear();
    }

    private static boolean isCompactAndDelete(UnifiedLog log) {
        return log.config().compact && log.config().delete;
    }

    /**
     * Computes how far past the max compaction lag the earliest dirty non-active segment is.
     *
     * @return {@code (now - maxCompactionLagMs) - earliestDirtySegmentTimestamp} when positive
     *         timestamps exist and the earliest one is older than that threshold; otherwise 0
     */
    private static long maxCompactionDelay(UnifiedLog log, long firstDirtyOffset, long now) {
        List<LogSegment> dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset);
        // Non-positive timestamps are ignored; with none left the sentinel makes the result 0.
        long earliestDirtySegmentTimestamp = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).stream()
            .filter(timestamp -> timestamp > 0L)
            .min(Comparator.naturalOrder())
            .orElse(Long.MAX_VALUE);
        long maxCompactionLagMs = Math.max(log.config().maxCompactionLagMs, 0L);
        long cleanUntilTime = now - maxCompactionLagMs;
        return earliestDirtySegmentTimestamp < cleanUntilTime ? cleanUntilTime - earliestDirtySegmentTimestamp : 0L;
    }

    /**
     * Determines the offset range of a log that may be cleaned.
     *
     * <p>The first dirty offset is the checkpointed offset, unless that is missing or lies outside
     * {@code [logStartOffset, logEndOffset]}, in which case the log start offset is used (and, when
     * out of range, {@code forceUpdateCheckpoint} is set). The first uncleanable offset is the
     * minimum of the last stable offset, the active segment's base offset, and, when a positive
     * compaction lag is configured, the base offset of the first dirty segment still within that
     * lag; it is never reported below the first dirty offset.
     *
     * @param log             the log to inspect
     * @param lastCleanOffset the checkpointed offset, if any
     * @param now             current time in milliseconds
     * @return the cleanable range and whether the checkpoint must be rewritten
     * @throws IOException if reading a segment's largest timestamp fails
     */
    public static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
        long logStartOffset = log.logStartOffset();
        long checkpointDirtyOffset = lastCleanOffset.orElse(logStartOffset);

        long firstDirtyOffset;
        boolean forceUpdateCheckpoint;
        if (checkpointDirtyOffset < logStartOffset) {
            // The warning is skipped for compact-and-delete logs.
            if (!isCompactAndDelete(log)) {
                LOG.warn("Resetting first dirty offset of {} to log start offset {} since the checkpointed offset {} is invalid.",
                    log.name(), logStartOffset, checkpointDirtyOffset);
            }
            firstDirtyOffset = logStartOffset;
            forceUpdateCheckpoint = true;
        } else if (checkpointDirtyOffset > log.logEndOffset()) {
            LOG.warn("The last checkpoint dirty offset for partition {} is {}, which is larger than the log end offset {}. Resetting to the log start offset {}.",
                log.name(), checkpointDirtyOffset, log.logEndOffset(), logStartOffset);
            firstDirtyOffset = logStartOffset;
            forceUpdateCheckpoint = true;
        } else {
            firstDirtyOffset = checkpointDirtyOffset;
            forceUpdateCheckpoint = false;
        }

        long minCompactionLagMs = Math.max(log.config().compactionLagMs, 0L);
        long firstUncleanableDirtyOffset = Stream.of(
                Optional.of(log.lastStableOffset()),
                Optional.of(log.activeSegment().baseOffset()),
                minCompactionLagMs > 0L
                    ? findFirstUncleanableSegment(log, firstDirtyOffset, now, minCompactionLagMs)
                    : Optional.<Long>empty())
            .flatMap(Optional::stream)
            .min(Long::compare)
            .orElseThrow(() -> new IllegalStateException("No uncleanable offset found"));

        LOG.debug("Finding range of cleanable offsets for log={}. Last clean offset={} now={} => firstDirtyOffset={} firstUncleanableOffset={} activeSegment.baseOffset={}",
            log.name(), lastCleanOffset, now, firstDirtyOffset, firstUncleanableDirtyOffset, log.activeSegment().baseOffset());
        return new OffsetsToClean(firstDirtyOffset, Math.max(firstDirtyOffset, firstUncleanableDirtyOffset), forceUpdateCheckpoint);
    }

    /**
     * Sums the sizes of the segments between the first dirty offset and the first uncleanable
     * segment.
     *
     * @param log               the log to inspect
     * @param firstDirtyOffset  start of the dirty range
     * @param uncleanableOffset offset from which segments are considered uncleanable
     * @return an entry of (base offset of the first uncleanable segment, cleanable bytes); the
     *         active segment is used when no non-active segment starts at or after
     *         {@code uncleanableOffset}
     */
    public static Map.Entry<Long, Long> calculateCleanableBytes(UnifiedLog log, long firstDirtyOffset, long uncleanableOffset) {
        List<LogSegment> nonActiveSegments = log.nonActiveLogSegmentsFrom(uncleanableOffset);
        LogSegment firstUncleanableSegment = nonActiveSegments.isEmpty() ? log.activeSegment() : nonActiveSegments.get(0);
        long firstUncleanableOffset = firstUncleanableSegment.baseOffset();
        long cleanableBytes = log.logSegments(Math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).stream()
            .mapToLong(LogSegment::size)
            .sum();
        return Map.entry(firstUncleanableOffset, cleanableBytes);
    }

    /**
     * Finds the first dirty non-active segment whose largest timestamp is newer than
     * {@code now - minCompactionLagMs}.
     *
     * @return that segment's base offset, or empty if every dirty non-active segment is old enough
     */
    private static Optional<Long> findFirstUncleanableSegment(UnifiedLog log, long firstDirtyOffset, long now, long minCompactionLagMs) throws IOException {
        long cleanableBefore = now - minCompactionLagMs;
        for (LogSegment segment : log.nonActiveLogSegmentsFrom(firstDirtyOffset)) {
            long largestTimestamp = segment.largestTimestamp();
            boolean isUncleanable = largestTimestamp > cleanableBefore;
            LOG.debug("Checking if log segment may be cleaned: log='{}' segment.baseOffset={} segment.largestTimestamp={}; now - compactionLag={}; is uncleanable={}",
                log.name(), segment.baseOffset(), largestTimestamp, cleanableBefore, isUncleanable);
            if (isUncleanable) {
                return Optional.of(segment.baseOffset());
            }
        }
        return Optional.empty();
    }

    /**
     * Result of {@link #cleanableOffsets}.
     *
     * @param firstDirtyOffset            first offset that has not been cleaned
     * @param firstUncleanableDirtyOffset first offset that must not be cleaned; never below
     *                                    {@code firstDirtyOffset}
     * @param forceUpdateCheckpoint       true when the checkpointed offset was out of range and reset
     */
    public record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset, boolean forceUpdateCheckpoint) {
    }
}

