12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958 |
- // -*- mode: cpp; mode: fold -*-
- // Description /*{{{*/
- // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
- /* ######################################################################
- Acquire - File Acquisition
- The core element for the schedule system is the concept of a named
- queue. Each queue is unique and each queue has a name derived from the
- URI. The degree of parallelization can be controlled by how the queue
- name is derived from the URI.
-
- ##################################################################### */
- /*}}}*/
- // Include Files /*{{{*/
- #include <config.h>
- #include <apt-pkg/acquire.h>
- #include <apt-pkg/acquire-item.h>
- #include <apt-pkg/acquire-worker.h>
- #include <apt-pkg/configuration.h>
- #include <apt-pkg/error.h>
- #include <apt-pkg/strutl.h>
- #include <apt-pkg/fileutl.h>
- #include <iostream>
- #include <sstream>
- #include <stdio.h>
- #include <dirent.h>
- #include <sys/time.h>
- #include <errno.h>
- #include <apti18n.h>
- /*}}}*/
- using namespace std;
- // Acquire::pkgAcquire - Constructor /*{{{*/
- // ---------------------------------------------------------------------
- /* We grab some runtime state from the configuration space */
- pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
- Debug(_config->FindB("Debug::pkgAcquire",false)),
- Running(false)
- {
- string const Mode = _config->Find("Acquire::Queue-Mode","host");
- if (strcasecmp(Mode.c_str(),"host") == 0)
- QueueMode = QueueHost;
- if (strcasecmp(Mode.c_str(),"access") == 0)
- QueueMode = QueueAccess;
- }
- pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
- Configs(0), Log(Progress), ToFetch(0),
- Debug(_config->FindB("Debug::pkgAcquire",false)),
- Running(false)
- {
- string const Mode = _config->Find("Acquire::Queue-Mode","host");
- if (strcasecmp(Mode.c_str(),"host") == 0)
- QueueMode = QueueHost;
- if (strcasecmp(Mode.c_str(),"access") == 0)
- QueueMode = QueueAccess;
- Setup(Progress, "");
- }
- /*}}}*/
- // Acquire::Setup - Delayed Constructor /*{{{*/
- // ---------------------------------------------------------------------
- /* Do everything needed to be a complete Acquire object and report the
- success (or failure) back so the user knows that something is wrong… */
- bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
- {
- Log = Progress;
- // check for existence and possibly create auxiliary directories
- string const listDir = _config->FindDir("Dir::State::lists");
- string const partialListDir = listDir + "partial/";
- string const archivesDir = _config->FindDir("Dir::Cache::Archives");
- string const partialArchivesDir = archivesDir + "partial/";
- if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
- CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
- return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
- if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
- CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
- return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
- if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
- return true;
- // Lock the directory this acquire object will work in
- LockFD = GetLock(flCombine(Lock, "lock"));
- if (LockFD == -1)
- return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
- return true;
- }
- /*}}}*/
- // Acquire::~pkgAcquire - Destructor /*{{{*/
- // ---------------------------------------------------------------------
- /* Free our memory, clean up the queues (destroy the workers) */
- pkgAcquire::~pkgAcquire()
- {
- Shutdown();
- if (LockFD != -1)
- close(LockFD);
- while (Configs != 0)
- {
- MethodConfig *Jnk = Configs;
- Configs = Configs->Next;
- delete Jnk;
- }
- }
- /*}}}*/
- // Acquire::Shutdown - Clean out the acquire object /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- void pkgAcquire::Shutdown()
- {
- while (Items.empty() == false)
- {
- if (Items[0]->Status == Item::StatFetching)
- Items[0]->Status = Item::StatError;
- delete Items[0];
- }
- while (Queues != 0)
- {
- Queue *Jnk = Queues;
- Queues = Queues->Next;
- delete Jnk;
- }
- }
- /*}}}*/
- // Acquire::Add - Add a new item /*{{{*/
- // ---------------------------------------------------------------------
- /* This puts an item on the acquire list. This list is mainly for tracking
- item status */
- void pkgAcquire::Add(Item *Itm)
- {
- Items.push_back(Itm);
- }
- /*}}}*/
- // Acquire::Remove - Remove an item /*{{{*/
- // ---------------------------------------------------------------------
- /* Remove an item from the acquire list. This is usually not used.. */
- void pkgAcquire::Remove(Item *Itm)
- {
- Dequeue(Itm);
-
- for (ItemIterator I = Items.begin(); I != Items.end();)
- {
- if (*I == Itm)
- {
- Items.erase(I);
- I = Items.begin();
- }
- else
- ++I;
- }
- }
- /*}}}*/
- // Acquire::Add - Add a worker /*{{{*/
- // ---------------------------------------------------------------------
- /* A list of workers is kept so that the select loop can direct their FD
- usage. */
- void pkgAcquire::Add(Worker *Work)
- {
- Work->NextAcquire = Workers;
- Workers = Work;
- }
- /*}}}*/
- // Acquire::Remove - Remove a worker /*{{{*/
- // ---------------------------------------------------------------------
- /* A worker has died. This can not be done while the select loop is running
- as it would require that RunFds could handle a changing list state and
- it can't.. */
- void pkgAcquire::Remove(Worker *Work)
- {
- if (Running == true)
- abort();
-
- Worker **I = &Workers;
- for (; *I != 0;)
- {
- if (*I == Work)
- *I = (*I)->NextAcquire;
- else
- I = &(*I)->NextAcquire;
- }
- }
- /*}}}*/
- // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
- // ---------------------------------------------------------------------
- /* This is the entry point for an item. An item calls this function when
- it is constructed which creates a queue (based on the current queue
- mode) and puts the item in that queue. If the system is running then
- the queue might be started. */
- void pkgAcquire::Enqueue(ItemDesc &Item)
- {
- // Determine which queue to put the item in
- const MethodConfig *Config;
- string Name = QueueName(Item.URI,Config);
- if (Name.empty() == true)
- return;
- // Find the queue structure
- Queue *I = Queues;
- for (; I != 0 && I->Name != Name; I = I->Next);
- if (I == 0)
- {
- I = new Queue(Name,this);
- I->Next = Queues;
- Queues = I;
-
- if (Running == true)
- I->Startup();
- }
- // See if this is a local only URI
- if (Config->LocalOnly == true && Item.Owner->Complete == false)
- Item.Owner->Local = true;
- Item.Owner->Status = Item::StatIdle;
-
- // Queue it into the named queue
- if(I->Enqueue(Item))
- ToFetch++;
-
- // Some trace stuff
- if (Debug == true)
- {
- clog << "Fetching " << Item.URI << endl;
- clog << " to " << Item.Owner->DestFile << endl;
- clog << " Queue is: " << Name << endl;
- }
- }
- /*}}}*/
- // Acquire::Dequeue - Remove an item from all queues /*{{{*/
- // ---------------------------------------------------------------------
- /* This is called when an item is finished being fetched. It removes it
- from all the queues */
- void pkgAcquire::Dequeue(Item *Itm)
- {
- Queue *I = Queues;
- bool Res = false;
- if (Debug == true)
- clog << "Dequeuing " << Itm->DestFile << endl;
- for (; I != 0; I = I->Next)
- {
- if (I->Dequeue(Itm))
- {
- Res = true;
- if (Debug == true)
- clog << "Dequeued from " << I->Name << endl;
- }
- }
- if (Res == true)
- ToFetch--;
- }
- /*}}}*/
- // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
- // ---------------------------------------------------------------------
- /* The string returned depends on the configuration settings and the
- method parameters. Given something like http://foo.org/bar it can
- return http://foo.org or http */
- string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
- {
- URI U(Uri);
-
- Config = GetConfig(U.Access);
- if (Config == 0)
- return string();
-
- /* Single-Instance methods get exactly one queue per URI. This is
- also used for the Access queue method */
- if (Config->SingleInstance == true || QueueMode == QueueAccess)
- return U.Access;
- string AccessSchema = U.Access + ':',
- FullQueueName = AccessSchema + U.Host;
- unsigned int Instances = 0, SchemaLength = AccessSchema.length();
- Queue *I = Queues;
- for (; I != 0; I = I->Next) {
- // if the queue already exists, re-use it
- if (I->Name == FullQueueName)
- return FullQueueName;
- if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
- Instances++;
- }
- if (Debug) {
- clog << "Found " << Instances << " instances of " << U.Access << endl;
- }
- if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
- return U.Access;
- return FullQueueName;
- }
- /*}}}*/
- // Acquire::GetConfig - Fetch the configuration information /*{{{*/
- // ---------------------------------------------------------------------
- /* This locates the configuration structure for an access method. If
- a config structure cannot be found a Worker will be created to
- retrieve it */
- pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
- {
- // Search for an existing config
- MethodConfig *Conf;
- for (Conf = Configs; Conf != 0; Conf = Conf->Next)
- if (Conf->Access == Access)
- return Conf;
-
- // Create the new config class
- Conf = new MethodConfig;
- Conf->Access = Access;
- Conf->Next = Configs;
- Configs = Conf;
- // Create the worker to fetch the configuration
- Worker Work(Conf);
- if (Work.Start() == false)
- return 0;
- /* if a method uses DownloadLimit, we switch to SingleInstance mode */
- if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
- Conf->SingleInstance = true;
-
- return Conf;
- }
- /*}}}*/
- // Acquire::SetFds - Deal with readable FDs /*{{{*/
- // ---------------------------------------------------------------------
- /* Collect FDs that have activity monitors into the fd sets */
- void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
- {
- for (Worker *I = Workers; I != 0; I = I->NextAcquire)
- {
- if (I->InReady == true && I->InFd >= 0)
- {
- if (Fd < I->InFd)
- Fd = I->InFd;
- FD_SET(I->InFd,RSet);
- }
- if (I->OutReady == true && I->OutFd >= 0)
- {
- if (Fd < I->OutFd)
- Fd = I->OutFd;
- FD_SET(I->OutFd,WSet);
- }
- }
- }
- /*}}}*/
- // Acquire::RunFds - Deal with active FDs /*{{{*/
- // ---------------------------------------------------------------------
- /* Dispatch active FDs over to the proper workers. It is very important
- that a worker never be erased while this is running! The queue class
- should never erase a worker except during shutdown processing. */
- void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
- {
- for (Worker *I = Workers; I != 0; I = I->NextAcquire)
- {
- if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
- I->InFdReady();
- if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
- I->OutFdReady();
- }
- }
- /*}}}*/
- // Acquire::Run - Run the fetch sequence /*{{{*/
- // ---------------------------------------------------------------------
- /* This runs the queues. It manages a select loop for all of the
- Worker tasks. The workers interact with the queues and items to
- manage the actual fetch. */
- pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
- {
- Running = true;
-
- for (Queue *I = Queues; I != 0; I = I->Next)
- I->Startup();
-
- if (Log != 0)
- Log->Start();
-
- bool WasCancelled = false;
- // Run till all things have been acquired
- struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = PulseIntervall;
- while (ToFetch > 0)
- {
- fd_set RFds;
- fd_set WFds;
- int Highest = 0;
- FD_ZERO(&RFds);
- FD_ZERO(&WFds);
- SetFds(Highest,&RFds,&WFds);
-
- int Res;
- do
- {
- Res = select(Highest+1,&RFds,&WFds,0,&tv);
- }
- while (Res < 0 && errno == EINTR);
-
- if (Res < 0)
- {
- _error->Errno("select","Select has failed");
- break;
- }
-
- RunFds(&RFds,&WFds);
- if (_error->PendingError() == true)
- break;
-
- // Timeout, notify the log class
- if (Res == 0 || (Log != 0 && Log->Update == true))
- {
- tv.tv_usec = PulseIntervall;
- for (Worker *I = Workers; I != 0; I = I->NextAcquire)
- I->Pulse();
- if (Log != 0 && Log->Pulse(this) == false)
- {
- WasCancelled = true;
- break;
- }
- }
- }
- if (Log != 0)
- Log->Stop();
-
- // Shut down the acquire bits
- Running = false;
- for (Queue *I = Queues; I != 0; I = I->Next)
- I->Shutdown(false);
- // Shut down the items
- for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
- (*I)->Finished();
-
- if (_error->PendingError())
- return Failed;
- if (WasCancelled)
- return Cancelled;
- return Continue;
- }
- /*}}}*/
- // Acquire::Bump - Called when an item is dequeued /*{{{*/
- // ---------------------------------------------------------------------
- /* This routine bumps idle queues in hopes that they will be able to fetch
- the dequeued item */
- void pkgAcquire::Bump()
- {
- for (Queue *I = Queues; I != 0; I = I->Next)
- I->Bump();
- }
- /*}}}*/
- // Acquire::WorkerStep - Step to the next worker /*{{{*/
- // ---------------------------------------------------------------------
- /* Not inlined to avoid including acquire-worker.h */
- pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
- {
- return I->NextAcquire;
- };
- /*}}}*/
- // Acquire::Clean - Cleans a directory /*{{{*/
- // ---------------------------------------------------------------------
- /* This is a bit simplistic, it looks at every file in the dir and sees
- if it is part of the download set. */
- bool pkgAcquire::Clean(string Dir)
- {
- // non-existing directories are by definition clean…
- if (DirectoryExists(Dir) == false)
- return true;
- DIR *D = opendir(Dir.c_str());
- if (D == 0)
- return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
-
- string StartDir = SafeGetCWD();
- if (chdir(Dir.c_str()) != 0)
- {
- closedir(D);
- return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
- }
-
- for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
- {
- // Skip some files..
- if (strcmp(Dir->d_name,"lock") == 0 ||
- strcmp(Dir->d_name,"partial") == 0 ||
- strcmp(Dir->d_name,".") == 0 ||
- strcmp(Dir->d_name,"..") == 0)
- continue;
-
- // Look in the get list
- ItemCIterator I = Items.begin();
- for (; I != Items.end(); ++I)
- if (flNotDir((*I)->DestFile) == Dir->d_name)
- break;
-
- // Nothing found, nuke it
- if (I == Items.end())
- unlink(Dir->d_name);
- };
-
- closedir(D);
- if (chdir(StartDir.c_str()) != 0)
- return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
- return true;
- }
- /*}}}*/
- // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
- // ---------------------------------------------------------------------
- /* This is the total number of bytes needed */
- unsigned long long pkgAcquire::TotalNeeded()
- {
- unsigned long long Total = 0;
- for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
- Total += (*I)->FileSize;
- return Total;
- }
- /*}}}*/
- // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
- // ---------------------------------------------------------------------
- /* This is the number of bytes that is not local */
- unsigned long long pkgAcquire::FetchNeeded()
- {
- unsigned long long Total = 0;
- for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
- if ((*I)->Local == false)
- Total += (*I)->FileSize;
- return Total;
- }
- /*}}}*/
- // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
- // ---------------------------------------------------------------------
- /* This is the number of bytes already present in partial files of non-local items */
- unsigned long long pkgAcquire::PartialPresent()
- {
- unsigned long long Total = 0;
- for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
- if ((*I)->Local == false)
- Total += (*I)->PartialSize;
- return Total;
- }
- /*}}}*/
- // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::UriIterator pkgAcquire::UriBegin()
- {
- return UriIterator(Queues);
- }
- /*}}}*/
- // Acquire::UriEnd - End iterator for the uri list /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::UriIterator pkgAcquire::UriEnd()
- {
- return UriIterator(0);
- }
- /*}}}*/
- // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::MethodConfig::MethodConfig()
- {
- SingleInstance = false;
- Pipeline = false;
- SendConfig = false;
- LocalOnly = false;
- Removable = false;
- Next = 0;
- }
- /*}}}*/
- // Queue::Queue - Constructor /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
- Owner(Owner)
- {
- Items = 0;
- Next = 0;
- Workers = 0;
- MaxPipeDepth = 1;
- PipeDepth = 0;
- }
- /*}}}*/
- // Queue::~Queue - Destructor /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::Queue::~Queue()
- {
- Shutdown(true);
-
- while (Items != 0)
- {
- QItem *Jnk = Items;
- Items = Items->Next;
- delete Jnk;
- }
- }
- /*}}}*/
- // Queue::Enqueue - Queue an item to the queue /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
- {
- QItem **I = &Items;
- // move to the end of the queue and check for duplicates here
- for (; *I != 0; I = &(*I)->Next)
- if (Item.URI == (*I)->URI)
- {
- Item.Owner->Status = Item::StatDone;
- return false;
- }
- // Create a new item
- QItem *Itm = new QItem;
- *Itm = Item;
- Itm->Next = 0;
- *I = Itm;
-
- Item.Owner->QueueCounter++;
- if (Items->Next == 0)
- Cycle();
- return true;
- }
- /*}}}*/
- // Queue::Dequeue - Remove an item from the queue /*{{{*/
- // ---------------------------------------------------------------------
- /* We return true if we hit something */
- bool pkgAcquire::Queue::Dequeue(Item *Owner)
- {
- if (Owner->Status == pkgAcquire::Item::StatFetching)
- return _error->Error("Tried to dequeue a fetching object");
-
- bool Res = false;
-
- QItem **I = &Items;
- for (; *I != 0;)
- {
- if ((*I)->Owner == Owner)
- {
- QItem *Jnk= *I;
- *I = (*I)->Next;
- Owner->QueueCounter--;
- delete Jnk;
- Res = true;
- }
- else
- I = &(*I)->Next;
- }
-
- return Res;
- }
- /*}}}*/
- // Queue::Startup - Start the worker processes /*{{{*/
- // ---------------------------------------------------------------------
- /* It is possible for this to be called with a pre-existing set of
- workers. */
- bool pkgAcquire::Queue::Startup()
- {
- if (Workers == 0)
- {
- URI U(Name);
- pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
- if (Cnf == 0)
- return false;
-
- Workers = new Worker(this,Cnf,Owner->Log);
- Owner->Add(Workers);
- if (Workers->Start() == false)
- return false;
-
- /* When pipelining we commit 10 items. This needs to change when we
- added other source retry to have cycle maintain a pipeline depth
- on its own. */
- if (Cnf->Pipeline == true)
- MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
- else
- MaxPipeDepth = 1;
- }
-
- return Cycle();
- }
- /*}}}*/
- // Queue::Shutdown - Shutdown the worker processes /*{{{*/
- // ---------------------------------------------------------------------
- /* If final is true then all workers are eliminated, otherwise only workers
- that do not need cleanup are removed */
- bool pkgAcquire::Queue::Shutdown(bool Final)
- {
- // Delete all of the workers
- pkgAcquire::Worker **Cur = &Workers;
- while (*Cur != 0)
- {
- pkgAcquire::Worker *Jnk = *Cur;
- if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
- {
- *Cur = Jnk->NextQueue;
- Owner->Remove(Jnk);
- delete Jnk;
- }
- else
- Cur = &(*Cur)->NextQueue;
- }
-
- return true;
- }
- /*}}}*/
- // Queue::FindItem - Find a URI in the item list /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
- {
- for (QItem *I = Items; I != 0; I = I->Next)
- if (I->URI == URI && I->Worker == Owner)
- return I;
- return 0;
- }
- /*}}}*/
- // Queue::ItemDone - Item has been completed /*{{{*/
- // ---------------------------------------------------------------------
- /* The worker signals this which causes the item to be removed from the
- queue. If this is the last queue instance then it is removed from the
- main queue too.*/
- bool pkgAcquire::Queue::ItemDone(QItem *Itm)
- {
- PipeDepth--;
- if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
- Itm->Owner->Status = pkgAcquire::Item::StatDone;
-
- if (Itm->Owner->QueueCounter <= 1)
- Owner->Dequeue(Itm->Owner);
- else
- {
- Dequeue(Itm->Owner);
- Owner->Bump();
- }
-
- return Cycle();
- }
- /*}}}*/
- // Queue::Cycle - Queue new items into the method /*{{{*/
- // ---------------------------------------------------------------------
- /* This locates a new idle item and sends it to the worker. If pipelining
- is enabled then it keeps the pipe full. */
- bool pkgAcquire::Queue::Cycle()
- {
- if (Items == 0 || Workers == 0)
- return true;
- if (PipeDepth < 0)
- return _error->Error("Pipedepth failure");
-
- // Look for a queable item
- QItem *I = Items;
- while (PipeDepth < (signed)MaxPipeDepth)
- {
- for (; I != 0; I = I->Next)
- if (I->Owner->Status == pkgAcquire::Item::StatIdle)
- break;
-
- // Nothing to do, queue is idle.
- if (I == 0)
- return true;
-
- I->Worker = Workers;
- I->Owner->Status = pkgAcquire::Item::StatFetching;
- PipeDepth++;
- if (Workers->QueueItem(I) == false)
- return false;
- }
-
- return true;
- }
- /*}}}*/
- // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
- // ---------------------------------------------------------------------
- /* This is called when an item in multiple queues is dequeued */
- void pkgAcquire::Queue::Bump()
- {
- Cycle();
- }
- /*}}}*/
- // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
- // ---------------------------------------------------------------------
- /* */
- pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
- {
- Start();
- }
- /*}}}*/
- // AcquireStatus::Pulse - Called periodically /*{{{*/
- // ---------------------------------------------------------------------
- /* This computes some internal state variables for the derived classes to
- use. It generates the current downloaded bytes and total bytes to download
- as well as the current CPS estimate. */
- bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
- {
- TotalBytes = 0;
- CurrentBytes = 0;
- TotalItems = 0;
- CurrentItems = 0;
-
- // Compute the total number of bytes to fetch
- unsigned int Unknown = 0;
- unsigned int Count = 0;
- for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
- ++I, ++Count)
- {
- TotalItems++;
- if ((*I)->Status == pkgAcquire::Item::StatDone)
- ++CurrentItems;
-
- // Totally ignore local items
- if ((*I)->Local == true)
- continue;
- TotalBytes += (*I)->FileSize;
- if ((*I)->Complete == true)
- CurrentBytes += (*I)->FileSize;
- if ((*I)->FileSize == 0 && (*I)->Complete == false)
- ++Unknown;
- }
-
- // Compute the current completion
- unsigned long long ResumeSize = 0;
- for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
- I = Owner->WorkerStep(I))
- if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
- {
- CurrentBytes += I->CurrentSize;
- ResumeSize += I->ResumePoint;
-
- // Files with unknown size always have 100% completion
- if (I->CurrentItem->Owner->FileSize == 0 &&
- I->CurrentItem->Owner->Complete == false)
- TotalBytes += I->CurrentSize;
- }
-
- // Normalize the figures and account for unknown size downloads
- if (TotalBytes <= 0)
- TotalBytes = 1;
- if (Unknown == Count)
- TotalBytes = Unknown;
- // Wha?! Is not supposed to happen.
- if (CurrentBytes > TotalBytes)
- CurrentBytes = TotalBytes;
-
- // Compute the CPS
- struct timeval NewTime;
- gettimeofday(&NewTime,0);
- if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
- NewTime.tv_sec - Time.tv_sec > 6)
- {
- double Delta = NewTime.tv_sec - Time.tv_sec +
- (NewTime.tv_usec - Time.tv_usec)/1000000.0;
-
- // Compute the CPS value
- if (Delta < 0.01)
- CurrentCPS = 0;
- else
- CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
- LastBytes = CurrentBytes - ResumeSize;
- ElapsedTime = (unsigned long long)Delta;
- Time = NewTime;
- }
- int fd = _config->FindI("APT::Status-Fd",-1);
- if(fd > 0)
- {
- ostringstream status;
- char msg[200];
- long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
- unsigned long long ETA = 0;
- if(CurrentCPS > 0)
- ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
- // only show the ETA if it makes sense
- if (ETA > 0 && ETA < 172800 /* two days */ )
- snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
- else
- snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
-
- // build the status str
- status << "dlstatus:" << i
- << ":" << (CurrentBytes/float(TotalBytes)*100.0)
- << ":" << msg
- << endl;
- std::string const dlstatus = status.str();
- FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
- }
- return true;
- }
- /*}}}*/
- // AcquireStatus::Start - Called when the download is started /*{{{*/
- // ---------------------------------------------------------------------
- /* We just reset the counters */
- void pkgAcquireStatus::Start()
- {
- gettimeofday(&Time,0);
- gettimeofday(&StartTime,0);
- LastBytes = 0;
- CurrentCPS = 0;
- CurrentBytes = 0;
- TotalBytes = 0;
- FetchedBytes = 0;
- ElapsedTime = 0;
- TotalItems = 0;
- CurrentItems = 0;
- }
- /*}}}*/
- // AcquireStatus::Stop - Finished downloading /*{{{*/
- // ---------------------------------------------------------------------
- /* This accurately computes the elapsed time and the total overall CPS. */
- void pkgAcquireStatus::Stop()
- {
- // Compute the CPS and elapsed time
- struct timeval NewTime;
- gettimeofday(&NewTime,0);
-
- double Delta = NewTime.tv_sec - StartTime.tv_sec +
- (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
-
- // Compute the CPS value
- if (Delta < 0.01)
- CurrentCPS = 0;
- else
- CurrentCPS = FetchedBytes/Delta;
- LastBytes = CurrentBytes;
- ElapsedTime = (unsigned long long)Delta;
- }
- /*}}}*/
- // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
- // ---------------------------------------------------------------------
- /* This is used to get accurate final transfer rate reporting. */
- void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
- {
- FetchedBytes += Size - Resume;
- }
- /*}}}*/
|