DfsBlockCache.java

/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org> and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

  11. package org.eclipse.jgit.internal.storage.dfs;

  12. import java.io.IOException;
  13. import java.time.Duration;
  14. import java.util.Map;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. import java.util.concurrent.atomic.AtomicInteger;
  17. import java.util.concurrent.atomic.AtomicLong;
  18. import java.util.concurrent.atomic.AtomicReference;
  19. import java.util.concurrent.atomic.AtomicReferenceArray;
  20. import java.util.concurrent.locks.ReentrantLock;
  21. import java.util.function.Consumer;
  22. import java.util.stream.LongStream;

  23. import org.eclipse.jgit.internal.JGitText;
  24. import org.eclipse.jgit.internal.storage.pack.PackExt;

/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * It's too expensive during object access to be accurate with a least recently
 * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
 * typically doesn't yield a corresponding benefit to the application. This
 * cache implements a clock replacement algorithm, giving each block at least
 * one chance to have been accessed during a sweep of the cache to save itself
 * from eviction. The number of sweep chances is configurable per pack
 * extension.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime, instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
 */
public final class DfsBlockCache {
    /** Singleton instance; replaced wholesale by {@link #reconfigure}. */
    private static volatile DfsBlockCache cache;

    static {
        reconfigure(new DfsBlockCacheConfig());
    }

    /**
     * Modify the configuration of the window cache.
     * <p>
     * The new configuration is applied immediately, and the existing cache is
     * cleared.
     *
     * @param cfg
     *            the new window cache configuration.
     * @throws java.lang.IllegalArgumentException
     *             the cache configuration contains one or more invalid
     *             settings, usually too low of a limit.
     */
    public static void reconfigure(DfsBlockCacheConfig cfg) {
        // Publishing a brand-new instance discards everything previously
        // cached; readers racing with this call keep using the old instance.
        cache = new DfsBlockCache(cfg);
    }

    /**
     * Get the currently active DfsBlockCache.
     *
     * @return the currently active DfsBlockCache.
     */
    public static DfsBlockCache getInstance() {
        return cache;
    }

    /** Number of entries in {@link #table}. */
    private final int tableSize;

    /** Hash bucket directory; entries are chained below. */
    private final AtomicReferenceArray<HashEntry> table;

    /**
     * Locks to prevent concurrent loads for same (PackFile,position) block. The
     * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to
     * cap the overall concurrent block loads.
     */
    private final ReentrantLock[] loadLocks;

    /**
     * A separate pool of locks per pack extension to prevent concurrent loads
     * for same index or bitmap from PackFile.
     */
    private final ReentrantLock[][] refLocks;

    /** Maximum number of bytes the cache should hold. */
    private final long maxBytes;

    /** Pack files smaller than this size can be copied through the cache. */
    private final long maxStreamThroughCache;

    /**
     * Suggested block size to read from pack files in.
     * <p>
     * If a pack file does not have a native block size, this size will be used.
     * <p>
     * If a pack file has a native size, a whole multiple of the native size
     * will be used until it matches this size.
     * <p>
     * The value for blockSize must be a power of 2.
     */
    private final int blockSize;

    /** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
    private final int blockSizeShift;

    /**
     * Number of times a block was found in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> statHit;

    /**
     * Number of times a block was not found, and had to be loaded, per pack
     * file extension.
     */
    private final AtomicReference<AtomicLong[]> statMiss;

    /**
     * Number of blocks evicted due to cache being full, per pack file
     * extension.
     */
    private final AtomicReference<AtomicLong[]> statEvict;

    /**
     * Number of bytes currently loaded in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> liveBytes;

    /** Protects the clock and its related data. */
    private final ReentrantLock clockLock;

    /**
     * A consumer of object reference lock wait time milliseconds. May be used
     * to build a metric.
     */
    private final Consumer<Long> refLockWaitTime;

    /** Current position of the clock; guarded by {@link #clockLock}. */
    private Ref clockHand;

    /** Limits of cache hot count per pack file extension. */
    private final int[] cacheHotLimits = new int[PackExt.values().length];

    /** Consumer of loading and eviction events of indexes. */
    private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;

    /** Stores timestamps of the last eviction of indexes. */
    private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();

    /**
     * Create a new cache instance from the supplied configuration.
     *
     * @param cfg
     *            cache configuration; validated before use.
     */
    @SuppressWarnings("unchecked")
    private DfsBlockCache(DfsBlockCacheConfig cfg) {
        tableSize = tableSize(cfg);
        if (tableSize < 1) {
            throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
        }

        table = new AtomicReferenceArray<>(tableSize);
        int concurrencyLevel = cfg.getConcurrencyLevel();
        loadLocks = new ReentrantLock[concurrencyLevel];
        for (int i = 0; i < loadLocks.length; i++) {
            loadLocks[i] = new ReentrantLock(true /* fair */);
        }
        refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel];
        for (int i = 0; i < PackExt.values().length; i++) {
            for (int j = 0; j < concurrencyLevel; ++j) {
                refLocks[i][j] = new ReentrantLock(true /* fair */);
            }
        }

        maxBytes = cfg.getBlockLimit();
        maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
        blockSize = cfg.getBlockSize();
        // blockSize is a power of 2, so the trailing-zero count is log2 of it.
        blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

        clockLock = new ReentrantLock(true /* fair */);
        String none = ""; //$NON-NLS-1$
        // Seed the clock with a sentinel Ref pointing at itself so the
        // circular list is never empty.
        clockHand = new Ref<>(
                DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
                -1, 0, null);
        clockHand.next = clockHand;

        statHit = new AtomicReference<>(newCounters());
        statMiss = new AtomicReference<>(newCounters());
        statEvict = new AtomicReference<>(newCounters());
        liveBytes = new AtomicReference<>(newCounters());

        refLockWaitTime = cfg.getRefLockWaitTimeConsumer();

        // Per-extension hot limits; fall back to the default for unset or
        // non-positive entries.
        for (int i = 0; i < PackExt.values().length; ++i) {
            Integer limit = cfg.getCacheHotMap().get(PackExt.values()[i]);
            if (limit != null && limit.intValue() > 0) {
                cacheHotLimits[i] = limit.intValue();
            } else {
                cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
            }
        }
        indexEventConsumer = cfg.getIndexEventConsumer();
    }

  196.     boolean shouldCopyThroughCache(long length) {
  197.         return length <= maxStreamThroughCache;
  198.     }

  199.     /**
  200.      * Get total number of bytes in the cache, per pack file extension.
  201.      *
  202.      * @return total number of bytes in the cache, per pack file extension.
  203.      */
  204.     public long[] getCurrentSize() {
  205.         return getStatVals(liveBytes);
  206.     }

  207.     /**
  208.      * Get 0..100, defining how full the cache is.
  209.      *
  210.      * @return 0..100, defining how full the cache is.
  211.      */
  212.     public long getFillPercentage() {
  213.         return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
  214.     }

  215.     /**
  216.      * Get number of requests for items in the cache, per pack file extension.
  217.      *
  218.      * @return number of requests for items in the cache, per pack file
  219.      *         extension.
  220.      */
  221.     public long[] getHitCount() {
  222.         return getStatVals(statHit);
  223.     }

  224.     /**
  225.      * Get number of requests for items not in the cache, per pack file
  226.      * extension.
  227.      *
  228.      * @return number of requests for items not in the cache, per pack file
  229.      *         extension.
  230.      */
  231.     public long[] getMissCount() {
  232.         return getStatVals(statMiss);
  233.     }

  234.     /**
  235.      * Get total number of requests (hit + miss), per pack file extension.
  236.      *
  237.      * @return total number of requests (hit + miss), per pack file extension.
  238.      */
  239.     public long[] getTotalRequestCount() {
  240.         AtomicLong[] hit = statHit.get();
  241.         AtomicLong[] miss = statMiss.get();
  242.         long[] cnt = new long[Math.max(hit.length, miss.length)];
  243.         for (int i = 0; i < hit.length; i++) {
  244.             cnt[i] += hit[i].get();
  245.         }
  246.         for (int i = 0; i < miss.length; i++) {
  247.             cnt[i] += miss[i].get();
  248.         }
  249.         return cnt;
  250.     }

  251.     /**
  252.      * Get hit ratios
  253.      *
  254.      * @return hit ratios
  255.      */
  256.     public long[] getHitRatio() {
  257.         AtomicLong[] hit = statHit.get();
  258.         AtomicLong[] miss = statMiss.get();
  259.         long[] ratio = new long[Math.max(hit.length, miss.length)];
  260.         for (int i = 0; i < ratio.length; i++) {
  261.             if (i >= hit.length) {
  262.                 ratio[i] = 0;
  263.             } else if (i >= miss.length) {
  264.                 ratio[i] = 100;
  265.             } else {
  266.                 long hitVal = hit[i].get();
  267.                 long missVal = miss[i].get();
  268.                 long total = hitVal + missVal;
  269.                 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
  270.             }
  271.         }
  272.         return ratio;
  273.     }

  274.     /**
  275.      * Get number of evictions performed due to cache being full, per pack file
  276.      * extension.
  277.      *
  278.      * @return number of evictions performed due to cache being full, per pack
  279.      *         file extension.
  280.      */
  281.     public long[] getEvictions() {
  282.         return getStatVals(statEvict);
  283.     }

  284.     /**
  285.      * Quickly check if the cache contains block 0 of the given stream.
  286.      * <p>
  287.      * This can be useful for sophisticated pre-read algorithms to quickly
  288.      * determine if a file is likely already in cache, especially small
  289.      * reftables which may be smaller than a typical DFS block size.
  290.      *
  291.      * @param key
  292.      *            the file to check.
  293.      * @return true if block 0 (the first block) is in the cache.
  294.      */
  295.     public boolean hasBlock0(DfsStreamKey key) {
  296.         HashEntry e1 = table.get(slot(key, 0));
  297.         DfsBlock v = scan(e1, key, 0);
  298.         return v != null && v.contains(key, 0);
  299.     }

  300.     private int hash(int packHash, long off) {
  301.         return packHash + (int) (off >>> blockSizeShift);
  302.     }

    /**
     * Get the block size used by this cache, in bytes.
     *
     * @return the configured block size; always a power of 2.
     */
    int getBlockSize() {
        return blockSize;
    }

  306.     private static int tableSize(DfsBlockCacheConfig cfg) {
  307.         final int wsz = cfg.getBlockSize();
  308.         final long limit = cfg.getBlockLimit();
  309.         if (wsz <= 0) {
  310.             throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
  311.         }
  312.         if (limit < wsz) {
  313.             throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
  314.         }
  315.         return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
  316.     }

    /**
     * Look up a cached object, creating and loading it if it doesn't exist.
     *
     * @param file
     *            the pack that "contains" the cached object.
     * @param position
     *            offset within <code>pack</code> of the object.
     * @param ctx
     *            current thread's reader.
     * @param fileChannel
     *            supplier for channel to read {@code pack}.
     * @return the object reference.
     * @throws IOException
     *             the reference was not in the cache and could not be loaded.
     */
    DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
            ReadableChannelSupplier fileChannel) throws IOException {
        final long requestedPosition = position;
        position = file.alignToBlock(position);

        DfsStreamKey key = file.key;
        int slot = slot(key, position);
        HashEntry e1 = table.get(slot);
        DfsBlock v = scan(e1, key, position);
        if (v != null && v.contains(key, requestedPosition)) {
            ctx.stats.blockCacheHit++;
            getStat(statHit, key).incrementAndGet();
            return v;
        }

        // Reserve space up front; it is credited back below if the load
        // fails or another thread beat us to it.
        reserveSpace(blockSize, key);
        ReentrantLock regionLock = lockFor(key, position);
        regionLock.lock();
        try {
            // Double-check under the region lock: another thread may have
            // published this block while we waited for the lock.
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                v = scan(e2, key, position);
                if (v != null) {
                    ctx.stats.blockCacheHit++;
                    getStat(statHit, key).incrementAndGet();
                    creditSpace(blockSize, key);
                    return v;
                }
            }

            getStat(statMiss, key).incrementAndGet();
            boolean credit = true;
            try {
                v = file.readOneBlock(position, ctx, fileChannel.get());
                credit = false;
            } finally {
                // Return the reserved bytes if readOneBlock threw.
                if (credit) {
                    creditSpace(blockSize, key);
                }
            }
            if (position != v.start) {
                // The file discovered its blockSize and adjusted.
                position = v.start;
                slot = slot(key, position);
                e2 = table.get(slot);
            }

            Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
            ref.markHotter();
            // Publish a new chain head via CAS, retrying on contention;
            // clean() drops chain entries already evicted from the clock.
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            // Credit back the difference when the loaded block is smaller
            // than the blockSize reserved above.
            addToClock(ref, blockSize - v.size());
        } finally {
            regionLock.unlock();
        }

        // If the block size changed from the default, it is possible the block
        // that was loaded is the wrong block for the requested position.
        if (v.contains(file.key, requestedPosition)) {
            return v;
        }
        return getOrLoad(file, requestedPosition, ctx, fileChannel);
    }

    /**
     * Charge {@code reserve} bytes against the cache for {@code key},
     * evicting cold entries with the clock algorithm while the projected
     * live size would exceed {@link #maxBytes}.
     *
     * @param reserve
     *            number of bytes to reserve.
     * @param key
     *            stream the reserved bytes are attributed to.
     */
    @SuppressWarnings("unchecked")
    private void reserveSpace(long reserve, DfsStreamKey key) {
        clockLock.lock();
        try {
            long live = LongStream.of(getCurrentSize()).sum() + reserve;
            if (maxBytes < live) {
                Ref prev = clockHand;
                Ref hand = clockHand.next;
                do {
                    if (hand.isHot()) {
                        // Value was recently touched. Cache is still hot so
                        // give it another chance, but cool it down a bit.
                        hand.markColder();
                        prev = hand;
                        hand = hand.next;
                        continue;
                    } else if (prev == hand)
                        // The clock has collapsed to a single node; stop
                        // rather than evict the last entry/sentinel.
                        break;

                    // No recent access since last scan, kill
                    // value and remove from clock.
                    Ref dead = hand;
                    hand = hand.next;
                    prev.next = hand;
                    dead.next = null; // next == null marks the ref dead for clean()
                    dead.value = null;
                    live -= dead.size;
                    getStat(liveBytes, dead.key).addAndGet(-dead.size);
                    getStat(statEvict, dead.key).incrementAndGet();
                    reportIndexEvicted(dead);
                } while (maxBytes < live);
                clockHand = prev;
            }
            getStat(liveBytes, key).addAndGet(reserve);
        } finally {
            clockLock.unlock();
        }
    }

  432.     private void creditSpace(long credit, DfsStreamKey key) {
  433.         clockLock.lock();
  434.         try {
  435.             getStat(liveBytes, key).addAndGet(-credit);
  436.         } finally {
  437.             clockLock.unlock();
  438.         }
  439.     }

    /**
     * Insert {@code ref} into the circular clock list, making it the new
     * clock hand, and optionally credit back over-reserved bytes.
     *
     * @param ref
     *            reference to add to the clock.
     * @param credit
     *            bytes to subtract from the live count (0 for none).
     */
    @SuppressWarnings("unchecked")
    private void addToClock(Ref ref, long credit) {
        clockLock.lock();
        try {
            if (credit != 0) {
                getStat(liveBytes, ref.key).addAndGet(-credit);
            }
            // Splice ref in directly after the current hand.
            Ref ptr = clockHand;
            ref.next = ptr.next;
            ptr.next = ref;
            clockHand = ref;
        } finally {
            clockLock.unlock();
        }
    }

    /**
     * Put a loaded block into the cache, keyed by its stream and start offset.
     *
     * @param v
     *            the block to cache.
     */
    void put(DfsBlock v) {
        put(v.stream, v.start, v.size(), v);
    }

  458.     /**
  459.      * Look up a cached object, creating and loading it if it doesn't exist.
  460.      *
  461.      * @param key
  462.      *            the stream key of the pack.
  463.      * @param position
  464.      *            the position in the key. The default should be 0.
  465.      * @param loader
  466.      *            the function to load the reference.
  467.      * @return the object reference.
  468.      * @throws IOException
  469.      *             the reference was not in the cache and could not be loaded.
  470.      */
  471.     <T> Ref<T> getOrLoadRef(
  472.             DfsStreamKey key, long position, RefLoader<T> loader)
  473.             throws IOException {
  474.         long start = System.nanoTime();
  475.         int slot = slot(key, position);
  476.         HashEntry e1 = table.get(slot);
  477.         Ref<T> ref = scanRef(e1, key, position);
  478.         if (ref != null) {
  479.             getStat(statHit, key).incrementAndGet();
  480.             reportIndexRequested(ref, true /* cacheHit */, start);
  481.             return ref;
  482.         }

  483.         ReentrantLock regionLock = lockForRef(key);
  484.         long lockStart = System.currentTimeMillis();
  485.         regionLock.lock();
  486.         try {
  487.             HashEntry e2 = table.get(slot);
  488.             if (e2 != e1) {
  489.                 ref = scanRef(e2, key, position);
  490.                 if (ref != null) {
  491.                     getStat(statHit, key).incrementAndGet();
  492.                     reportIndexRequested(ref, true /* cacheHit */,
  493.                             start);
  494.                     return ref;
  495.                 }
  496.             }

  497.             if (refLockWaitTime != null) {
  498.                 refLockWaitTime.accept(
  499.                         Long.valueOf(System.currentTimeMillis() - lockStart));
  500.             }
  501.             getStat(statMiss, key).incrementAndGet();
  502.             ref = loader.load();
  503.             ref.markHotter();
  504.             // Reserve after loading to get the size of the object
  505.             reserveSpace(ref.size, key);
  506.             for (;;) {
  507.                 HashEntry n = new HashEntry(clean(e2), ref);
  508.                 if (table.compareAndSet(slot, e2, n)) {
  509.                     break;
  510.                 }
  511.                 e2 = table.get(slot);
  512.             }
  513.             addToClock(ref, 0);
  514.         } finally {
  515.             regionLock.unlock();
  516.         }
  517.         reportIndexRequested(ref, false /* cacheHit */, start);
  518.         return ref;
  519.     }

    /**
     * Put a reference-sized object (e.g. an index) into the cache at
     * position 0 of its stream.
     *
     * @param key
     *            the stream key of the pack.
     * @param size
     *            number of bytes to charge against the cache.
     * @param v
     *            the value to cache.
     * @return the cached reference.
     */
    <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
        return put(key, 0, size, v);
    }

    /**
     * Put a value into the cache at {@code (key, pos)}, unless an entry is
     * already present, in which case the existing reference is returned
     * unchanged.
     *
     * @param key
     *            the stream key of the pack.
     * @param pos
     *            position within the stream.
     * @param size
     *            number of bytes to charge against the cache.
     * @param v
     *            the value to cache.
     * @return the cached reference; either the new one or a pre-existing one.
     */
    <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
        int slot = slot(key, pos);
        HashEntry e1 = table.get(slot);
        Ref<T> ref = scanRef(e1, key, pos);
        if (ref != null) {
            return ref;
        }

        reserveSpace(size, key);
        ReentrantLock regionLock = lockFor(key, pos);
        regionLock.lock();
        try {
            // Double-check under the lock; return the reservation if
            // another thread won the race.
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                ref = scanRef(e2, key, pos);
                if (ref != null) {
                    creditSpace(size, key);
                    return ref;
                }
            }

            ref = new Ref<>(key, pos, size, v);
            ref.markHotter();
            // Publish via CAS, retrying on contention; clean() drops chain
            // entries already evicted from the clock.
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, 0);
        } finally {
            regionLock.unlock();
        }
        return ref;
    }

  557.     boolean contains(DfsStreamKey key, long position) {
  558.         return scan(table.get(slot(key, position)), key, position) != null;
  559.     }

  560.     @SuppressWarnings("unchecked")
  561.     <T> T get(DfsStreamKey key, long position) {
  562.         T val = (T) scan(table.get(slot(key, position)), key, position);
  563.         if (val == null) {
  564.             getStat(statMiss, key).incrementAndGet();
  565.         } else {
  566.             getStat(statHit, key).incrementAndGet();
  567.         }
  568.         return val;
  569.     }

  570.     private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
  571.         Ref<T> r = scanRef(n, key, position);
  572.         return r != null ? r.get() : null;
  573.     }

  574.     @SuppressWarnings("unchecked")
  575.     private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
  576.         for (; n != null; n = n.next) {
  577.             Ref<T> r = n.ref;
  578.             if (r.position == position && r.key.equals(key)) {
  579.                 return r.get() != null ? r : null;
  580.             }
  581.         }
  582.         return null;
  583.     }

  584.     private int slot(DfsStreamKey key, long position) {
  585.         return (hash(key.hash, position) >>> 1) % tableSize;
  586.     }

  587.     private ReentrantLock lockFor(DfsStreamKey key, long position) {
  588.         return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
  589.     }

  590.     private ReentrantLock lockForRef(DfsStreamKey key) {
  591.         int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length;
  592.         return refLocks[key.packExtPos][slot];
  593.     }

  594.     private static AtomicLong[] newCounters() {
  595.         AtomicLong[] ret = new AtomicLong[PackExt.values().length];
  596.         for (int i = 0; i < ret.length; i++) {
  597.             ret[i] = new AtomicLong();
  598.         }
  599.         return ret;
  600.     }

    /**
     * Get the counter for {@code key}'s pack extension, growing the counter
     * array lock-free (copy then CAS) when the extension position lies
     * beyond the current array length.
     *
     * @param stats
     *            holder of the per-extension counter array.
     * @param key
     *            stream whose extension position selects the counter.
     * @return the counter for the extension; never null.
     */
    private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
            DfsStreamKey key) {
        int pos = key.packExtPos;
        while (true) {
            AtomicLong[] vals = stats.get();
            if (pos < vals.length) {
                return vals[pos];
            }
            // Grow: copy existing counters, append fresh ones, then CAS the
            // new array in; retry if another thread replaced it first.
            AtomicLong[] expect = vals;
            vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
            System.arraycopy(expect, 0, vals, 0, expect.length);
            for (int i = expect.length; i < vals.length; i++) {
                vals[i] = new AtomicLong();
            }
            if (stats.compareAndSet(expect, vals)) {
                return vals[pos];
            }
        }
    }

  620.     private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
  621.         AtomicLong[] stats = stat.get();
  622.         long[] cnt = new long[stats.length];
  623.         for (int i = 0; i < stats.length; i++) {
  624.             cnt[i] = stats[i].get();
  625.         }
  626.         return cnt;
  627.     }

    /**
     * Remove dead entries from a hash chain, sharing unchanged suffixes.
     * <p>
     * An entry is dead when its ref was evicted from the clock, which
     * reserveSpace() marks by setting {@code ref.next} to null. The chain is
     * immutable, so a new chain is built only where entries were dropped;
     * untouched suffixes are reused as-is.
     *
     * @param top
     *            head of the chain to clean.
     * @return head of the cleaned chain, possibly {@code top} itself or null.
     */
    private static HashEntry clean(HashEntry top) {
        while (top != null && top.ref.next == null) {
            top = top.next;
        }
        if (top == null) {
            return null;
        }
        HashEntry n = clean(top.next);
        return n == top.next ? top : new HashEntry(n, top.ref);
    }

  638.     private void reportIndexRequested(Ref<?> ref, boolean cacheHit,
  639.             long start) {
  640.         if (indexEventConsumer == null
  641.                 || !isIndexExtPos(ref.key.packExtPos)) {
  642.             return;
  643.         }
  644.         EvictKey evictKey = new EvictKey(ref);
  645.         Long prevEvictedTime = indexEvictionMap.get(evictKey);
  646.         long now = System.nanoTime();
  647.         long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
  648.                 : now - prevEvictedTime.longValue();
  649.         indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
  650.                 (now - start) / 1000L /* micros */, ref.size,
  651.                 Duration.ofNanos(sinceLastEvictionNanos));
  652.     }

  653.     private void reportIndexEvicted(Ref<?> dead) {
  654.         if (indexEventConsumer == null
  655.                 || !indexEventConsumer.shouldReportEvictedEvent()
  656.                 || !isIndexExtPos(dead.key.packExtPos)) {
  657.             return;
  658.         }
  659.         EvictKey evictKey = new EvictKey(dead);
  660.         Long prevEvictedTime = indexEvictionMap.get(evictKey);
  661.         long now = System.nanoTime();
  662.         long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
  663.                 : now - prevEvictedTime.longValue();
  664.         indexEvictionMap.put(evictKey, Long.valueOf(now));
  665.         indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
  666.                 dead.totalHitCount.get(),
  667.                 Duration.ofNanos(sinceLastEvictionNanos));
  668.     }

  669.     private static boolean isIndexExtPos(int packExtPos) {
  670.         return packExtPos == PackExt.INDEX.getPosition()
  671.                 || packExtPos == PackExt.REVERSE_INDEX.getPosition()
  672.                 || packExtPos == PackExt.BITMAP_INDEX.getPosition();
  673.     }

    /**
     * Immutable node in a hash bucket chain. Chains are rebuilt (via CAS on
     * the bucket head), never mutated in place.
     */
    private static final class HashEntry {
        /** Next entry in the hash table's chain list. */
        final HashEntry next;

        /** The referenced object. */
        final Ref ref;

        HashEntry(HashEntry n, Ref r) {
            next = n;
            ref = r;
        }
    }

    /**
     * A cached object reference that also serves as a node in the circular
     * clock replacement list.
     *
     * @param <T>
     *            type of the cached value.
     */
    static final class Ref<T> {
        /** Stream this value belongs to. */
        final DfsStreamKey key;
        /** Position of the value within the stream. */
        final long position;
        /** Bytes charged against the cache for this value. */
        final long size;
        /** Cached value; nulled when the clock evicts this ref. */
        volatile T value;
        /** Next node in the clock list; null once evicted. */
        Ref next;

        /** Remaining "second chances" before the clock sweep may evict us. */
        private volatile int hotCount;
        /** Total number of hits ever served by this ref. */
        private AtomicInteger totalHitCount = new AtomicInteger();

        Ref(DfsStreamKey key, long position, long size, T v) {
            this.key = key;
            this.position = position;
            this.size = size;
            this.value = v;
        }

        T get() {
            // Touching a live value counts as a hit for the clock heuristic.
            T v = value;
            if (v != null) {
                markHotter();
            }
            return v;
        }

        boolean has() {
            return value != null;
        }

        void markHotter() {
            // Cap hotCount at the per-extension limit so a frequently read
            // block cannot become immortal under the clock sweep.
            // NOTE(review): the read-modify-write on volatile hotCount is not
            // atomic; concurrent touches may lose an increment, which only
            // affects the eviction heuristic — confirm this is acceptable.
            int cap = DfsBlockCache
                    .getInstance().cacheHotLimits[key.packExtPos];
            hotCount = Math.min(cap, hotCount + 1);
            totalHitCount.incrementAndGet();
        }

        void markColder() {
            hotCount = Math.max(0, hotCount - 1);
        }

        boolean isHot() {
            return hotCount > 0;
        }
    }

    /**
     * Identity of an index entry in {@link #indexEvictionMap}: stream hash,
     * pack extension position, and stream position.
     */
    private static final class EvictKey {
        private final int keyHash;
        private final int packExtPos;
        private final long position;

        EvictKey(Ref<?> ref) {
            keyHash = ref.key.hash;
            packExtPos = ref.key.packExtPos;
            position = ref.position;
        }

        @Override
        public boolean equals(Object object) {
            if (object instanceof EvictKey) {
                EvictKey other = (EvictKey) object;
                return keyHash == other.keyHash
                        && packExtPos == other.packExtPos
                        && position == other.position;
            }
            return false;
        }

        @Override
        public int hashCode() {
            // NOTE(review): delegates to the current singleton's hash(),
            // whose blockSizeShift may change after reconfigure(); keys
            // created under an older instance would then hash differently.
            // The eviction map is per-instance, so this looks safe — confirm.
            return DfsBlockCache.getInstance().hash(keyHash, position);
        }
    }

    /**
     * Loads a reference when it is not already present in the cache.
     *
     * @param <T>
     *            type of the value being loaded.
     */
    @FunctionalInterface
    interface RefLoader<T> {
        Ref<T> load() throws IOException;
    }

    /**
     * Supplier for readable channel
     */
    @FunctionalInterface
    interface ReadableChannelSupplier {
        /**
         * @return ReadableChannel
         * @throws IOException
         *             if the channel could not be opened
         */
        ReadableChannel get() throws IOException;
    }
}