DeltaTask.java

  1. /*
  2.  * Copyright (C) 2010, Google Inc. and others
  3.  *
  4.  * This program and the accompanying materials are made available under the
  5.  * terms of the Eclipse Distribution License v. 1.0 which is available at
  6.  * https://www.eclipse.org/org/documents/edl-v10.php.
  7.  *
  8.  * SPDX-License-Identifier: BSD-3-Clause
  9.  */

  10. package org.eclipse.jgit.internal.storage.pack;

  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.Collections;
  14. import java.util.Iterator;
  15. import java.util.LinkedList;
  16. import java.util.List;
  17. import java.util.concurrent.Callable;

  18. import org.eclipse.jgit.lib.ObjectReader;
  19. import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
  20. import org.eclipse.jgit.storage.pack.PackConfig;

  21. final class DeltaTask implements Callable<Object> {
  22.     static final long MAX_METER = 9 << 20;

  23.     static final class Block {
  24.         private static final int MIN_TOP_PATH = 50 << 20;

  25.         final List<DeltaTask> tasks;
  26.         final int threads;
  27.         final PackConfig config;
  28.         final ObjectReader templateReader;
  29.         final DeltaCache dc;
  30.         final ThreadSafeProgressMonitor pm;
  31.         final ObjectToPack[] list;
  32.         final int beginIndex;
  33.         final int endIndex;

  34.         private long totalWeight;
  35.         long bytesPerUnit;

  36.         Block(int threads, PackConfig config, ObjectReader reader,
  37.                 DeltaCache dc, ThreadSafeProgressMonitor pm,
  38.                 ObjectToPack[] list, int begin, int end) {
  39.             this.tasks = new ArrayList<>(threads);
  40.             this.threads = threads;
  41.             this.config = config;
  42.             this.templateReader = reader;
  43.             this.dc = dc;
  44.             this.pm = pm;
  45.             this.list = list;
  46.             this.beginIndex = begin;
  47.             this.endIndex = end;
  48.         }

  49.         int cost() {
  50.             int d = (int) (totalWeight / bytesPerUnit);
  51.             if (totalWeight % bytesPerUnit != 0)
  52.                 d++;
  53.             return d;
  54.         }

  55.         synchronized DeltaWindow stealWork(DeltaTask forThread) {
  56.             for (;;) {
  57.                 DeltaTask maxTask = null;
  58.                 Slice maxSlice = null;
  59.                 int maxWork = 0;

  60.                 for (DeltaTask task : tasks) {
  61.                     Slice s = task.remaining();
  62.                     if (s != null && maxWork < s.size()) {
  63.                         maxTask = task;
  64.                         maxSlice = s;
  65.                         maxWork = s.size();
  66.                     }
  67.                 }
  68.                 if (maxTask == null) {
  69.                     return null;
  70.                 }
  71.                 if (maxTask.tryStealWork(maxSlice)) {
  72.                     return forThread.initWindow(maxSlice);
  73.                 }
  74.             }
  75.         }

  76.         void partitionTasks() {
  77.             ArrayList<WeightedPath> topPaths = computeTopPaths();
  78.             Iterator<WeightedPath> topPathItr = topPaths.iterator();
  79.             int nextTop = 0;
  80.             long weightPerThread = Math.max(totalWeight / threads, 1);
  81.             for (int i = beginIndex; i < endIndex;) {
  82.                 DeltaTask task = new DeltaTask(this);
  83.                 long w = 0;

  84.                 // Assign the thread one top path.
  85.                 if (topPathItr.hasNext()) {
  86.                     WeightedPath p = topPathItr.next();
  87.                     w += p.weight;
  88.                     task.add(p.slice);
  89.                 }

  90.                 // Assign the task thread ~average weight.
  91.                 int s = i;
  92.                 for (; w < weightPerThread && i < endIndex;) {
  93.                     if (nextTop < topPaths.size()
  94.                             && i == topPaths.get(nextTop).slice.beginIndex) {
  95.                         if (s < i) {
  96.                             task.add(new Slice(s, i));
  97.                         }
  98.                         s = i = topPaths.get(nextTop++).slice.endIndex;
  99.                     } else {
  100.                         w += getAdjustedWeight(list[i++]);
  101.                     }
  102.                 }

  103.                 // Round up the slice to the end of a path.
  104.                 if (s < i) {
  105.                     int h = list[i - 1].getPathHash();
  106.                     while (i < endIndex) {
  107.                         if (h == list[i].getPathHash()) {
  108.                             i++;
  109.                         } else {
  110.                             break;
  111.                         }
  112.                     }
  113.                     task.add(new Slice(s, i));
  114.                 }
  115.                 if (!task.slices.isEmpty()) {
  116.                     tasks.add(task);
  117.                 }
  118.             }
  119.             while (topPathItr.hasNext()) {
  120.                 WeightedPath p = topPathItr.next();
  121.                 DeltaTask task = new DeltaTask(this);
  122.                 task.add(p.slice);
  123.                 tasks.add(task);
  124.             }

  125.             topPaths = null;
  126.         }

  127.         private ArrayList<WeightedPath> computeTopPaths() {
  128.             ArrayList<WeightedPath> topPaths = new ArrayList<>(
  129.                     threads);
  130.             int cp = beginIndex;
  131.             int ch = list[cp].getPathHash();
  132.             long cw = getAdjustedWeight(list[cp]);
  133.             totalWeight = cw;

  134.             for (int i = cp + 1; i < endIndex; i++) {
  135.                 ObjectToPack o = list[i];
  136.                 if (ch != o.getPathHash()) {
  137.                     if (MIN_TOP_PATH < cw) {
  138.                         if (topPaths.size() < threads) {
  139.                             Slice s = new Slice(cp, i);
  140.                             topPaths.add(new WeightedPath(cw, s));
  141.                             if (topPaths.size() == threads) {
  142.                                 Collections.sort(topPaths);
  143.                             }
  144.                         } else if (topPaths.get(0).weight < cw) {
  145.                             Slice s = new Slice(cp, i);
  146.                             WeightedPath p = new WeightedPath(cw, s);
  147.                             topPaths.set(0, p);
  148.                             if (p.compareTo(topPaths.get(1)) > 0) {
  149.                                 Collections.sort(topPaths);
  150.                             }
  151.                         }
  152.                     }
  153.                     cp = i;
  154.                     ch = o.getPathHash();
  155.                     cw = 0;
  156.                 }
  157.                 int weight = getAdjustedWeight(o);
  158.                 cw += weight;
  159.                 totalWeight += weight;
  160.             }

  161.             // Sort by starting index to identify gaps later.
  162.             Collections.sort(topPaths, (WeightedPath a,
  163.                     WeightedPath b) -> a.slice.beginIndex - b.slice.beginIndex);

  164.             bytesPerUnit = 1;
  165.             while (MAX_METER <= (totalWeight / bytesPerUnit)) {
  166.                 bytesPerUnit <<= 10;
  167.             }
  168.             return topPaths;
  169.         }
  170.     }

  171.     static int getAdjustedWeight(ObjectToPack o) {
  172.         // Edge objects and those with reused deltas do not need to be
  173.         // compressed. For compression calculations, ignore their weights.
  174.         if (o.isEdge() || o.doNotAttemptDelta()) {
  175.             return 0;
  176.         }
  177.         return o.getWeight();
  178.     }

  179.     static final class WeightedPath implements Comparable<WeightedPath> {
  180.         final long weight;
  181.         final Slice slice;

  182.         WeightedPath(long weight, Slice s) {
  183.             this.weight = weight;
  184.             this.slice = s;
  185.         }

  186.         @Override
  187.         public int compareTo(WeightedPath o) {
  188.             int cmp = Long.signum(weight - o.weight);
  189.             if (cmp != 0) {
  190.                 return cmp;
  191.             }
  192.             return slice.beginIndex - o.slice.beginIndex;
  193.         }
  194.     }

  195.     static final class Slice {
  196.         final int beginIndex;
  197.         final int endIndex;

  198.         Slice(int b, int e) {
  199.             beginIndex = b;
  200.             endIndex = e;
  201.         }

  202.         final int size() {
  203.             return endIndex - beginIndex;
  204.         }
  205.     }

  206.     private final Block block;
  207.     final LinkedList<Slice> slices;

  208.     private ObjectReader or;
  209.     private DeltaWindow dw;

  210.     DeltaTask(Block b) {
  211.         this.block = b;
  212.         this.slices = new LinkedList<>();
  213.     }

  214.     void add(Slice s) {
  215.         if (!slices.isEmpty()) {
  216.             Slice last = slices.getLast();
  217.             if (last.endIndex == s.beginIndex) {
  218.                 slices.removeLast();
  219.                 slices.add(new Slice(last.beginIndex, s.endIndex));
  220.                 return;
  221.             }
  222.         }
  223.         slices.add(s);
  224.     }

  225.     /** {@inheritDoc} */
  226.     @Override
  227.     public Object call() throws Exception {
  228.         or = block.templateReader.newReader();
  229.         try {
  230.             DeltaWindow w;
  231.             for (;;) {
  232.                 synchronized (this) {
  233.                     if (slices.isEmpty()) {
  234.                         break;
  235.                     }
  236.                     w = initWindow(slices.removeFirst());
  237.                 }
  238.                 runWindow(w);
  239.             }
  240.             while ((w = block.stealWork(this)) != null) {
  241.                 runWindow(w);
  242.             }
  243.         } finally {
  244.             block.pm.endWorker();
  245.             or.close();
  246.             or = null;
  247.         }
  248.         return null;
  249.     }

  250.     DeltaWindow initWindow(Slice s) {
  251.         DeltaWindow w = new DeltaWindow(block.config, block.dc,
  252.                 or, block.pm, block.bytesPerUnit,
  253.                 block.list, s.beginIndex, s.endIndex);
  254.         synchronized (this) {
  255.             dw = w;
  256.         }
  257.         return w;
  258.     }

  259.     private void runWindow(DeltaWindow w) throws IOException {
  260.         try {
  261.             w.search();
  262.         } finally {
  263.             synchronized (this) {
  264.                 dw = null;
  265.             }
  266.         }
  267.     }

  268.     synchronized Slice remaining() {
  269.         if (!slices.isEmpty()) {
  270.             return slices.getLast();
  271.         }
  272.         DeltaWindow d = dw;
  273.         return d != null ? d.remaining() : null;
  274.     }

  275.     synchronized boolean tryStealWork(Slice s) {
  276.         if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
  277.             slices.removeLast();
  278.             return true;
  279.         }
  280.         DeltaWindow d = dw;
  281.         return d != null ? d.tryStealWork(s) : false;
  282.     }
  283. }