Browse Source

acquire: Use priority queues and a 3 stage pipeline design

Employ a priority queue instead of a normal queue to hold
the items; and only add items to the running pipeline if
their priority is the same or higher than the priority
of items in the queue.

The priorities are designed for a 3 stage pipeline system:

In stage 1, all Release files and .diff/Index files are fetched. This
allows us to determine what files remain to be fetched, and thus
ensures a usable progress reporting.

In stage 2, all Pdiff patches are fetched, so we can apply them
in parallel with fetching other files in stage 3.

In stage 3, all other files are fetched (complete index files
such as Contents, Packages).

Performance improvements, mainly from fetching the pdiff patches
before complete files, so they can be applied in parallel:

For the 01 Sep 2016 03:35:23 UTC -> 02 Sep 2016 09:25:37 update
of Debian unstable and testing with Contents and appstream for
amd64 and i386, update time reduced from 37 seconds to 24-28
seconds.

Previously, apt would first download new DEP11 icon tarballs
and metadata files, causing the CPU to be idle. By fetching
the diffs in stage 2, we can now patch our contents and Packages
files while we are downloading the DEP11 stuff.
Julian Andres Klode 7 years ago
parent
commit
2a440328ea

+ 22 - 0
apt-pkg/acquire-item.cc

@@ -1019,6 +1019,28 @@ bool pkgAcquire::Item::IsRedirectionLoop(std::string const &NewURI)	/*{{{*/
 }
 									/*}}}*/
 
+																		/*}}}*/
+int pkgAcquire::Item::Priority() 				/*{{{*/
+{
+   // Stage 1: Meta indices and diff indices
+   // - those need to be fetched first to have progress reporting working
+   //   for the rest
+   if (dynamic_cast<pkgAcqMetaSig*>(this) != nullptr
+       || dynamic_cast<pkgAcqMetaBase*>(this) != nullptr
+       || dynamic_cast<pkgAcqDiffIndex*>(this) != nullptr)
+      return 1000;
+   // Stage 2: Diff files
+   // - fetch before complete indexes so we can apply the diffs while fetching
+   //   larger files.
+   if (dynamic_cast<pkgAcqIndexDiffs*>(this) != nullptr ||
+       dynamic_cast<pkgAcqIndexMergeDiffs*>(this) != nullptr)
+      return 800;
+
+   // Stage 3: The rest - complete index files and other stuff
+   return 500;
+}
+									/*}}}*/
+
 pkgAcqTransactionItem::pkgAcqTransactionItem(pkgAcquire * const Owner,	/*{{{*/
       pkgAcqMetaClearSig * const transactionManager, IndexTarget const &target) :
    pkgAcquire::Item(Owner), d(NULL), Target(target), TransactionManager(transactionManager)

+ 2 - 0
apt-pkg/acquire-item.h

@@ -305,6 +305,8 @@ class pkgAcquire::Item : public WeakPointable				/*{{{*/
    virtual ~Item();
 
    bool APT_HIDDEN IsRedirectionLoop(std::string const &NewURI);
+   /** \brief The priority of the item, used for queuing */
+   int APT_HIDDEN Priority();
 
    protected:
    /** \brief The acquire object with which this item is associated. */

+ 32 - 4
apt-pkg/acquire.cc

@@ -894,9 +894,10 @@ pkgAcquire::Queue::~Queue()
 /* */
 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
 {
+   QItem **OptimalI = &Items;
    QItem **I = &Items;
    // move to the end of the queue and check for duplicates here
-   for (; *I != 0; I = &(*I)->Next)
+   for (; *I != 0; ) {
       if (Item.URI == (*I)->URI)
       {
 	 if (_config->FindB("Debug::pkgAcquire::Worker",false) == true)
@@ -905,12 +906,22 @@ bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
 	 Item.Owner->Status = (*I)->Owner->Status;
 	 return false;
       }
+      // Determine the optimal position to insert: before anything with a
+      // higher priority.
+      int priority = (*I)->GetPriority();
+
+      I = &(*I)->Next;
+      if (priority >= Item.Owner->Priority()) {
+	 OptimalI = I;
+      }
+   }
+
 
    // Create a new item
    QItem *Itm = new QItem;
    *Itm = Item;
-   Itm->Next = 0;
-   *I = Itm;
+   Itm->Next = *OptimalI;
+   *OptimalI = Itm;
    
    Item.Owner->QueueCounter++;   
    if (Items->Next == 0)
@@ -1060,16 +1071,24 @@ bool pkgAcquire::Queue::Cycle()
 
    // Look for a queable item
    QItem *I = Items;
+   int ActivePriority = 0;
    while (PipeDepth < (signed)MaxPipeDepth)
    {
-      for (; I != 0; I = I->Next)
+      for (; I != 0; I = I->Next) {
+	 if (I->Owner->Status == pkgAcquire::Item::StatFetching)
+	    ActivePriority = std::max(ActivePriority, I->GetPriority());
 	 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
 	    break;
+      }
 
       // Nothing to do, queue is idle.
       if (I == 0)
 	 return true;
 
+      // This item has a lower priority than stuff in the pipeline, pretend
+      // the queue is idle
+      if (I->GetPriority() < ActivePriority)
+	 return true;
       I->Worker = Workers;
       for (auto const &O: I->Owners)
 	 O->Status = pkgAcquire::Item::StatFetching;
@@ -1135,6 +1154,15 @@ APT_PURE unsigned long long pkgAcquire::Queue::QItem::GetMaximumSize() const	/*{
    return Maximum;
 }
 									/*}}}*/
+APT_PURE int pkgAcquire::Queue::QItem::GetPriority() const	/*{{{*/
+{
+   int Priority = 0;
+   for (auto const &O: Owners)
+      Priority = std::max(Priority, O->Priority());
+
+   return Priority;
+}
+									/*}}}*/
 void pkgAcquire::Queue::QItem::SyncDestinationFiles() const		/*{{{*/
 {
    /* ensure that the first owner has the best partial file of all and

+ 2 - 0
apt-pkg/acquire.h

@@ -464,6 +464,8 @@ class pkgAcquire::Queue
 
       /** @return the custom headers to use for this item */
       std::string Custom600Headers() const;
+      /** @return the maximum priority of this item */
+      int APT_HIDDEN GetPriority() const;
    };
 
    /** \brief The name of this queue. */

+ 2 - 2
test/integration/test-apt-sources-deb822

@@ -98,10 +98,10 @@ echo "$BASE" > $SOURCES
 echo "" >> $SOURCES
 echo "$BASE" | sed  s/stable/unstable/  >> $SOURCES
 testsuccessequal --nomsg "'http://ftp.debian.org/debian/dists/stable/InRelease' ftp.debian.org_debian_dists_stable_InRelease 0 
+'http://ftp.debian.org/debian/dists/unstable/InRelease' ftp.debian.org_debian_dists_unstable_InRelease 0 
 'http://ftp.debian.org/debian/dists/stable/main/binary-i386/Packages.xz' ftp.debian.org_debian_dists_stable_main_binary-i386_Packages 0 
 'http://ftp.debian.org/debian/dists/stable/main/binary-all/Packages.xz' ftp.debian.org_debian_dists_stable_main_binary-all_Packages 0 
 'http://ftp.debian.org/debian/dists/stable/main/i18n/Translation-en.xz' ftp.debian.org_debian_dists_stable_main_i18n_Translation-en 0 
-'http://ftp.debian.org/debian/dists/unstable/InRelease' ftp.debian.org_debian_dists_unstable_InRelease 0 
 'http://ftp.debian.org/debian/dists/unstable/main/binary-i386/Packages.xz' ftp.debian.org_debian_dists_unstable_main_binary-i386_Packages 0 
 'http://ftp.debian.org/debian/dists/unstable/main/binary-all/Packages.xz' ftp.debian.org_debian_dists_unstable_main_binary-all_Packages 0 
 'http://ftp.debian.org/debian/dists/unstable/main/i18n/Translation-en.xz' ftp.debian.org_debian_dists_unstable_main_i18n_Translation-en 0 " aptget update --print-uris
@@ -110,10 +110,10 @@ testsuccessequal --nomsg "'http://ftp.debian.org/debian/dists/stable/InRelease'
 msgcleantest 'Test deb822 with' 'two Suite entries'
 echo "$BASE"  | sed -e "s/stable/stable unstable/" > $SOURCES
 testsuccessequal --nomsg "'http://ftp.debian.org/debian/dists/stable/InRelease' ftp.debian.org_debian_dists_stable_InRelease 0 
+'http://ftp.debian.org/debian/dists/unstable/InRelease' ftp.debian.org_debian_dists_unstable_InRelease 0 
 'http://ftp.debian.org/debian/dists/stable/main/binary-i386/Packages.xz' ftp.debian.org_debian_dists_stable_main_binary-i386_Packages 0 
 'http://ftp.debian.org/debian/dists/stable/main/binary-all/Packages.xz' ftp.debian.org_debian_dists_stable_main_binary-all_Packages 0 
 'http://ftp.debian.org/debian/dists/stable/main/i18n/Translation-en.xz' ftp.debian.org_debian_dists_stable_main_i18n_Translation-en 0 
-'http://ftp.debian.org/debian/dists/unstable/InRelease' ftp.debian.org_debian_dists_unstable_InRelease 0 
 'http://ftp.debian.org/debian/dists/unstable/main/binary-i386/Packages.xz' ftp.debian.org_debian_dists_unstable_main_binary-i386_Packages 0 
 'http://ftp.debian.org/debian/dists/unstable/main/binary-all/Packages.xz' ftp.debian.org_debian_dists_unstable_main_binary-all_Packages 0 
 'http://ftp.debian.org/debian/dists/unstable/main/i18n/Translation-en.xz' ftp.debian.org_debian_dists_unstable_main_i18n_Translation-en 0 " aptget update --print-uris