merged lp:~mvo/apt/mvo
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <iostream>
27 #include <sstream>
28 #include <stdio.h>
29
30 #include <dirent.h>
31 #include <sys/time.h>
32 #include <errno.h>
33
34 #include <apti18n.h>
35 /*}}}*/
36
37 using namespace std;
38
39 // Acquire::pkgAcquire - Constructor /*{{{*/
40 // ---------------------------------------------------------------------
41 /* We grab some runtime state from the configuration space */
42 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
43 Debug(_config->FindB("Debug::pkgAcquire",false)),
44 Running(false)
45 {
46 string const Mode = _config->Find("Acquire::Queue-Mode","host");
47 if (strcasecmp(Mode.c_str(),"host") == 0)
48 QueueMode = QueueHost;
49 if (strcasecmp(Mode.c_str(),"access") == 0)
50 QueueMode = QueueAccess;
51 }
52 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
53 Configs(0), Log(Progress), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
55 Running(false)
56 {
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 Setup(Progress, "");
63 }
64 /*}}}*/
65 // Acquire::Setup - Delayed Constructor /*{{{*/
66 // ---------------------------------------------------------------------
67 /* Do everything needed to be a complete Acquire object and report the
68 success (or failure) back so the user knows that something is wrong… */
69 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
70 {
71 Log = Progress;
72
73 // check for existence and possibly create auxiliary directories
74 string const listDir = _config->FindDir("Dir::State::lists");
75 string const partialListDir = listDir + "partial/";
76 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
77 string const partialArchivesDir = archivesDir + "partial/";
78
79 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
80 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
81 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
82
83 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
84 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
85 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
86
87 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
88 return true;
89
90 // Lock the directory this acquire object will work in
91 LockFD = GetLock(flCombine(Lock, "lock"));
92 if (LockFD == -1)
93 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
94
95 return true;
96 }
97 /*}}}*/
98 // Acquire::~pkgAcquire - Destructor /*{{{*/
99 // ---------------------------------------------------------------------
100 /* Free our memory, clean up the queues (destroy the workers) */
101 pkgAcquire::~pkgAcquire()
102 {
103 Shutdown();
104
105 if (LockFD != -1)
106 close(LockFD);
107
108 while (Configs != 0)
109 {
110 MethodConfig *Jnk = Configs;
111 Configs = Configs->Next;
112 delete Jnk;
113 }
114 }
115 /*}}}*/
116 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
117 // ---------------------------------------------------------------------
118 /* */
119 void pkgAcquire::Shutdown()
120 {
121 while (Items.empty() == false)
122 {
123 if (Items[0]->Status == Item::StatFetching)
124 Items[0]->Status = Item::StatError;
125 delete Items[0];
126 }
127
128 while (Queues != 0)
129 {
130 Queue *Jnk = Queues;
131 Queues = Queues->Next;
132 delete Jnk;
133 }
134 }
135 /*}}}*/
136 // Acquire::Add - Add a new item /*{{{*/
137 // ---------------------------------------------------------------------
138 /* This puts an item on the acquire list. This list is mainly for tracking
139 item status */
140 void pkgAcquire::Add(Item *Itm)
141 {
142 Items.push_back(Itm);
143 }
144 /*}}}*/
145 // Acquire::Remove - Remove a item /*{{{*/
146 // ---------------------------------------------------------------------
147 /* Remove an item from the acquire list. This is usually not used.. */
148 void pkgAcquire::Remove(Item *Itm)
149 {
150 Dequeue(Itm);
151
152 for (ItemIterator I = Items.begin(); I != Items.end();)
153 {
154 if (*I == Itm)
155 {
156 Items.erase(I);
157 I = Items.begin();
158 }
159 else
160 ++I;
161 }
162 }
163 /*}}}*/
164 // Acquire::Add - Add a worker /*{{{*/
165 // ---------------------------------------------------------------------
166 /* A list of workers is kept so that the select loop can direct their FD
167 usage. */
168 void pkgAcquire::Add(Worker *Work)
169 {
170 Work->NextAcquire = Workers;
171 Workers = Work;
172 }
173 /*}}}*/
174 // Acquire::Remove - Remove a worker /*{{{*/
175 // ---------------------------------------------------------------------
176 /* A worker has died. This can not be done while the select loop is running
177 as it would require that RunFds could handling a changing list state and
178 it cant.. */
179 void pkgAcquire::Remove(Worker *Work)
180 {
181 if (Running == true)
182 abort();
183
184 Worker **I = &Workers;
185 for (; *I != 0;)
186 {
187 if (*I == Work)
188 *I = (*I)->NextAcquire;
189 else
190 I = &(*I)->NextAcquire;
191 }
192 }
193 /*}}}*/
194 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
195 // ---------------------------------------------------------------------
196 /* This is the entry point for an item. An item calls this function when
197 it is constructed which creates a queue (based on the current queue
198 mode) and puts the item in that queue. If the system is running then
199 the queue might be started. */
200 void pkgAcquire::Enqueue(ItemDesc &Item)
201 {
202 // Determine which queue to put the item in
203 const MethodConfig *Config;
204 string Name = QueueName(Item.URI,Config);
205 if (Name.empty() == true)
206 return;
207
208 // Find the queue structure
209 Queue *I = Queues;
210 for (; I != 0 && I->Name != Name; I = I->Next);
211 if (I == 0)
212 {
213 I = new Queue(Name,this);
214 I->Next = Queues;
215 Queues = I;
216
217 if (Running == true)
218 I->Startup();
219 }
220
221 // See if this is a local only URI
222 if (Config->LocalOnly == true && Item.Owner->Complete == false)
223 Item.Owner->Local = true;
224 Item.Owner->Status = Item::StatIdle;
225
226 // Queue it into the named queue
227 if(I->Enqueue(Item))
228 ToFetch++;
229
230 // Some trace stuff
231 if (Debug == true)
232 {
233 clog << "Fetching " << Item.URI << endl;
234 clog << " to " << Item.Owner->DestFile << endl;
235 clog << " Queue is: " << Name << endl;
236 }
237 }
238 /*}}}*/
239 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
240 // ---------------------------------------------------------------------
241 /* This is called when an item is finished being fetched. It removes it
242 from all the queues */
243 void pkgAcquire::Dequeue(Item *Itm)
244 {
245 Queue *I = Queues;
246 bool Res = false;
247 if (Debug == true)
248 clog << "Dequeuing " << Itm->DestFile << endl;
249
250 for (; I != 0; I = I->Next)
251 {
252 if (I->Dequeue(Itm))
253 {
254 Res = true;
255 if (Debug == true)
256 clog << "Dequeued from " << I->Name << endl;
257 }
258 }
259
260 if (Res == true)
261 ToFetch--;
262 }
263 /*}}}*/
264 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
265 // ---------------------------------------------------------------------
266 /* The string returned depends on the configuration settings and the
267 method parameters. Given something like http://foo.org/bar it can
268 return http://foo.org or http */
269 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
270 {
271 URI U(Uri);
272
273 Config = GetConfig(U.Access);
274 if (Config == 0)
275 return string();
276
277 /* Single-Instance methods get exactly one queue per URI. This is
278 also used for the Access queue method */
279 if (Config->SingleInstance == true || QueueMode == QueueAccess)
280 return U.Access;
281
282 string AccessSchema = U.Access + ':',
283 FullQueueName = AccessSchema + U.Host;
284 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
285
286 Queue *I = Queues;
287 for (; I != 0; I = I->Next) {
288 // if the queue already exists, re-use it
289 if (I->Name == FullQueueName)
290 return FullQueueName;
291
292 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
293 Instances++;
294 }
295
296 if (Debug) {
297 clog << "Found " << Instances << " instances of " << U.Access << endl;
298 }
299
300 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
301 return U.Access;
302
303 return FullQueueName;
304 }
305 /*}}}*/
306 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
307 // ---------------------------------------------------------------------
308 /* This locates the configuration structure for an access method. If
309 a config structure cannot be found a Worker will be created to
310 retrieve it */
311 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
312 {
313 // Search for an existing config
314 MethodConfig *Conf;
315 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
316 if (Conf->Access == Access)
317 return Conf;
318
319 // Create the new config class
320 Conf = new MethodConfig;
321 Conf->Access = Access;
322 Conf->Next = Configs;
323 Configs = Conf;
324
325 // Create the worker to fetch the configuration
326 Worker Work(Conf);
327 if (Work.Start() == false)
328 return 0;
329
330 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
331 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
332 Conf->SingleInstance = true;
333
334 return Conf;
335 }
336 /*}}}*/
337 // Acquire::SetFds - Deal with readable FDs /*{{{*/
338 // ---------------------------------------------------------------------
339 /* Collect FDs that have activity monitors into the fd sets */
340 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
341 {
342 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
343 {
344 if (I->InReady == true && I->InFd >= 0)
345 {
346 if (Fd < I->InFd)
347 Fd = I->InFd;
348 FD_SET(I->InFd,RSet);
349 }
350 if (I->OutReady == true && I->OutFd >= 0)
351 {
352 if (Fd < I->OutFd)
353 Fd = I->OutFd;
354 FD_SET(I->OutFd,WSet);
355 }
356 }
357 }
358 /*}}}*/
359 // Acquire::RunFds - Deal with active FDs /*{{{*/
360 // ---------------------------------------------------------------------
361 /* Dispatch active FDs over to the proper workers. It is very important
362 that a worker never be erased while this is running! The queue class
363 should never erase a worker except during shutdown processing. */
364 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
365 {
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 {
368 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
369 I->InFdReady();
370 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
371 I->OutFdReady();
372 }
373 }
374 /*}}}*/
375 // Acquire::Run - Run the fetch sequence /*{{{*/
376 // ---------------------------------------------------------------------
377 /* This runs the queues. It manages a select loop for all of the
378 Worker tasks. The workers interact with the queues and items to
379 manage the actual fetch. */
380 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
381 {
382 Running = true;
383
384 for (Queue *I = Queues; I != 0; I = I->Next)
385 I->Startup();
386
387 if (Log != 0)
388 Log->Start();
389
390 bool WasCancelled = false;
391
392 // Run till all things have been acquired
393 struct timeval tv;
394 tv.tv_sec = 0;
395 tv.tv_usec = PulseIntervall;
396 while (ToFetch > 0)
397 {
398 fd_set RFds;
399 fd_set WFds;
400 int Highest = 0;
401 FD_ZERO(&RFds);
402 FD_ZERO(&WFds);
403 SetFds(Highest,&RFds,&WFds);
404
405 int Res;
406 do
407 {
408 Res = select(Highest+1,&RFds,&WFds,0,&tv);
409 }
410 while (Res < 0 && errno == EINTR);
411
412 if (Res < 0)
413 {
414 _error->Errno("select","Select has failed");
415 break;
416 }
417
418 RunFds(&RFds,&WFds);
419 if (_error->PendingError() == true)
420 break;
421
422 // Timeout, notify the log class
423 if (Res == 0 || (Log != 0 && Log->Update == true))
424 {
425 tv.tv_usec = PulseIntervall;
426 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
427 I->Pulse();
428 if (Log != 0 && Log->Pulse(this) == false)
429 {
430 WasCancelled = true;
431 break;
432 }
433 }
434 }
435
436 if (Log != 0)
437 Log->Stop();
438
439 // Shut down the acquire bits
440 Running = false;
441 for (Queue *I = Queues; I != 0; I = I->Next)
442 I->Shutdown(false);
443
444 // Shut down the items
445 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
446 (*I)->Finished();
447
448 if (_error->PendingError())
449 return Failed;
450 if (WasCancelled)
451 return Cancelled;
452 return Continue;
453 }
454 /*}}}*/
455 // Acquire::Bump - Called when an item is dequeued /*{{{*/
456 // ---------------------------------------------------------------------
457 /* This routine bumps idle queues in hopes that they will be able to fetch
458 the dequeued item */
459 void pkgAcquire::Bump()
460 {
461 for (Queue *I = Queues; I != 0; I = I->Next)
462 I->Bump();
463 }
464 /*}}}*/
465 // Acquire::WorkerStep - Step to the next worker /*{{{*/
466 // ---------------------------------------------------------------------
467 /* Not inlined to advoid including acquire-worker.h */
468 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
469 {
470 return I->NextAcquire;
471 };
472 /*}}}*/
473 // Acquire::Clean - Cleans a directory /*{{{*/
474 // ---------------------------------------------------------------------
475 /* This is a bit simplistic, it looks at every file in the dir and sees
476 if it is part of the download set. */
477 bool pkgAcquire::Clean(string Dir)
478 {
479 // non-existing directories are by definition clean…
480 if (DirectoryExists(Dir) == false)
481 return true;
482
483 DIR *D = opendir(Dir.c_str());
484 if (D == 0)
485 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
486
487 string StartDir = SafeGetCWD();
488 if (chdir(Dir.c_str()) != 0)
489 {
490 closedir(D);
491 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
492 }
493
494 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
495 {
496 // Skip some files..
497 if (strcmp(Dir->d_name,"lock") == 0 ||
498 strcmp(Dir->d_name,"partial") == 0 ||
499 strcmp(Dir->d_name,".") == 0 ||
500 strcmp(Dir->d_name,"..") == 0)
501 continue;
502
503 // Look in the get list
504 ItemCIterator I = Items.begin();
505 for (; I != Items.end(); ++I)
506 if (flNotDir((*I)->DestFile) == Dir->d_name)
507 break;
508
509 // Nothing found, nuke it
510 if (I == Items.end())
511 unlink(Dir->d_name);
512 };
513
514 closedir(D);
515 if (chdir(StartDir.c_str()) != 0)
516 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
517 return true;
518 }
519 /*}}}*/
520 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
521 // ---------------------------------------------------------------------
522 /* This is the total number of bytes needed */
523 unsigned long long pkgAcquire::TotalNeeded()
524 {
525 unsigned long long Total = 0;
526 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
527 Total += (*I)->FileSize;
528 return Total;
529 }
530 /*}}}*/
531 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
532 // ---------------------------------------------------------------------
533 /* This is the number of bytes that is not local */
534 unsigned long long pkgAcquire::FetchNeeded()
535 {
536 unsigned long long Total = 0;
537 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
538 if ((*I)->Local == false)
539 Total += (*I)->FileSize;
540 return Total;
541 }
542 /*}}}*/
543 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
544 // ---------------------------------------------------------------------
545 /* This is the number of bytes that is not local */
546 unsigned long long pkgAcquire::PartialPresent()
547 {
548 unsigned long long Total = 0;
549 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
550 if ((*I)->Local == false)
551 Total += (*I)->PartialSize;
552 return Total;
553 }
554 /*}}}*/
555 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
556 // ---------------------------------------------------------------------
557 /* */
558 pkgAcquire::UriIterator pkgAcquire::UriBegin()
559 {
560 return UriIterator(Queues);
561 }
562 /*}}}*/
563 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
564 // ---------------------------------------------------------------------
565 /* */
566 pkgAcquire::UriIterator pkgAcquire::UriEnd()
567 {
568 return UriIterator(0);
569 }
570 /*}}}*/
571 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
572 // ---------------------------------------------------------------------
573 /* */
574 pkgAcquire::MethodConfig::MethodConfig()
575 {
576 SingleInstance = false;
577 Pipeline = false;
578 SendConfig = false;
579 LocalOnly = false;
580 Removable = false;
581 Next = 0;
582 }
583 /*}}}*/
584 // Queue::Queue - Constructor /*{{{*/
585 // ---------------------------------------------------------------------
586 /* */
587 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
588 Owner(Owner)
589 {
590 Items = 0;
591 Next = 0;
592 Workers = 0;
593 MaxPipeDepth = 1;
594 PipeDepth = 0;
595 }
596 /*}}}*/
597 // Queue::~Queue - Destructor /*{{{*/
598 // ---------------------------------------------------------------------
599 /* */
600 pkgAcquire::Queue::~Queue()
601 {
602 Shutdown(true);
603
604 while (Items != 0)
605 {
606 QItem *Jnk = Items;
607 Items = Items->Next;
608 delete Jnk;
609 }
610 }
611 /*}}}*/
612 // Queue::Enqueue - Queue an item to the queue /*{{{*/
613 // ---------------------------------------------------------------------
614 /* */
615 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
616 {
617 QItem **I = &Items;
618 // move to the end of the queue and check for duplicates here
619 for (; *I != 0; I = &(*I)->Next)
620 if (Item.URI == (*I)->URI)
621 {
622 Item.Owner->Status = Item::StatDone;
623 return false;
624 }
625
626 // Create a new item
627 QItem *Itm = new QItem;
628 *Itm = Item;
629 Itm->Next = 0;
630 *I = Itm;
631
632 Item.Owner->QueueCounter++;
633 if (Items->Next == 0)
634 Cycle();
635 return true;
636 }
637 /*}}}*/
638 // Queue::Dequeue - Remove an item from the queue /*{{{*/
639 // ---------------------------------------------------------------------
640 /* We return true if we hit something */
641 bool pkgAcquire::Queue::Dequeue(Item *Owner)
642 {
643 if (Owner->Status == pkgAcquire::Item::StatFetching)
644 return _error->Error("Tried to dequeue a fetching object");
645
646 bool Res = false;
647
648 QItem **I = &Items;
649 for (; *I != 0;)
650 {
651 if ((*I)->Owner == Owner)
652 {
653 QItem *Jnk= *I;
654 *I = (*I)->Next;
655 Owner->QueueCounter--;
656 delete Jnk;
657 Res = true;
658 }
659 else
660 I = &(*I)->Next;
661 }
662
663 return Res;
664 }
665 /*}}}*/
666 // Queue::Startup - Start the worker processes /*{{{*/
667 // ---------------------------------------------------------------------
668 /* It is possible for this to be called with a pre-existing set of
669 workers. */
670 bool pkgAcquire::Queue::Startup()
671 {
672 if (Workers == 0)
673 {
674 URI U(Name);
675 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
676 if (Cnf == 0)
677 return false;
678
679 Workers = new Worker(this,Cnf,Owner->Log);
680 Owner->Add(Workers);
681 if (Workers->Start() == false)
682 return false;
683
684 /* When pipelining we commit 10 items. This needs to change when we
685 added other source retry to have cycle maintain a pipeline depth
686 on its own. */
687 if (Cnf->Pipeline == true)
688 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
689 else
690 MaxPipeDepth = 1;
691 }
692
693 return Cycle();
694 }
695 /*}}}*/
696 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
697 // ---------------------------------------------------------------------
698 /* If final is true then all workers are eliminated, otherwise only workers
699 that do not need cleanup are removed */
700 bool pkgAcquire::Queue::Shutdown(bool Final)
701 {
702 // Delete all of the workers
703 pkgAcquire::Worker **Cur = &Workers;
704 while (*Cur != 0)
705 {
706 pkgAcquire::Worker *Jnk = *Cur;
707 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
708 {
709 *Cur = Jnk->NextQueue;
710 Owner->Remove(Jnk);
711 delete Jnk;
712 }
713 else
714 Cur = &(*Cur)->NextQueue;
715 }
716
717 return true;
718 }
719 /*}}}*/
720 // Queue::FindItem - Find a URI in the item list /*{{{*/
721 // ---------------------------------------------------------------------
722 /* */
723 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
724 {
725 for (QItem *I = Items; I != 0; I = I->Next)
726 if (I->URI == URI && I->Worker == Owner)
727 return I;
728 return 0;
729 }
730 /*}}}*/
731 // Queue::ItemDone - Item has been completed /*{{{*/
732 // ---------------------------------------------------------------------
733 /* The worker signals this which causes the item to be removed from the
734 queue. If this is the last queue instance then it is removed from the
735 main queue too.*/
736 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
737 {
738 PipeDepth--;
739 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
740 Itm->Owner->Status = pkgAcquire::Item::StatDone;
741
742 if (Itm->Owner->QueueCounter <= 1)
743 Owner->Dequeue(Itm->Owner);
744 else
745 {
746 Dequeue(Itm->Owner);
747 Owner->Bump();
748 }
749
750 return Cycle();
751 }
752 /*}}}*/
753 // Queue::Cycle - Queue new items into the method /*{{{*/
754 // ---------------------------------------------------------------------
755 /* This locates a new idle item and sends it to the worker. If pipelining
756 is enabled then it keeps the pipe full. */
757 bool pkgAcquire::Queue::Cycle()
758 {
759 if (Items == 0 || Workers == 0)
760 return true;
761
762 if (PipeDepth < 0)
763 return _error->Error("Pipedepth failure");
764
765 // Look for a queable item
766 QItem *I = Items;
767 while (PipeDepth < (signed)MaxPipeDepth)
768 {
769 for (; I != 0; I = I->Next)
770 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
771 break;
772
773 // Nothing to do, queue is idle.
774 if (I == 0)
775 return true;
776
777 I->Worker = Workers;
778 I->Owner->Status = pkgAcquire::Item::StatFetching;
779 PipeDepth++;
780 if (Workers->QueueItem(I) == false)
781 return false;
782 }
783
784 return true;
785 }
786 /*}}}*/
787 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
788 // ---------------------------------------------------------------------
789 /* This is called when an item in multiple queues is dequeued */
790 void pkgAcquire::Queue::Bump()
791 {
792 Cycle();
793 }
794 /*}}}*/
795 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
796 // ---------------------------------------------------------------------
797 /* */
798 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
799 {
800 Start();
801 }
802 /*}}}*/
803 // AcquireStatus::Pulse - Called periodically /*{{{*/
804 // ---------------------------------------------------------------------
805 /* This computes some internal state variables for the derived classes to
806 use. It generates the current downloaded bytes and total bytes to download
807 as well as the current CPS estimate. */
808 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
809 {
810 TotalBytes = 0;
811 CurrentBytes = 0;
812 TotalItems = 0;
813 CurrentItems = 0;
814
815 // Compute the total number of bytes to fetch
816 unsigned int Unknown = 0;
817 unsigned int Count = 0;
818 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
819 ++I, ++Count)
820 {
821 TotalItems++;
822 if ((*I)->Status == pkgAcquire::Item::StatDone)
823 ++CurrentItems;
824
825 // Totally ignore local items
826 if ((*I)->Local == true)
827 continue;
828
829 TotalBytes += (*I)->FileSize;
830 if ((*I)->Complete == true)
831 CurrentBytes += (*I)->FileSize;
832 if ((*I)->FileSize == 0 && (*I)->Complete == false)
833 ++Unknown;
834 }
835
836 // Compute the current completion
837 unsigned long long ResumeSize = 0;
838 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
839 I = Owner->WorkerStep(I))
840 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
841 {
842 CurrentBytes += I->CurrentSize;
843 ResumeSize += I->ResumePoint;
844
845 // Files with unknown size always have 100% completion
846 if (I->CurrentItem->Owner->FileSize == 0 &&
847 I->CurrentItem->Owner->Complete == false)
848 TotalBytes += I->CurrentSize;
849 }
850
851 // Normalize the figures and account for unknown size downloads
852 if (TotalBytes <= 0)
853 TotalBytes = 1;
854 if (Unknown == Count)
855 TotalBytes = Unknown;
856
857 // Wha?! Is not supposed to happen.
858 if (CurrentBytes > TotalBytes)
859 CurrentBytes = TotalBytes;
860
861 // Compute the CPS
862 struct timeval NewTime;
863 gettimeofday(&NewTime,0);
864 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
865 NewTime.tv_sec - Time.tv_sec > 6)
866 {
867 double Delta = NewTime.tv_sec - Time.tv_sec +
868 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
869
870 // Compute the CPS value
871 if (Delta < 0.01)
872 CurrentCPS = 0;
873 else
874 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
875 LastBytes = CurrentBytes - ResumeSize;
876 ElapsedTime = (unsigned long long)Delta;
877 Time = NewTime;
878 }
879
880 int fd = _config->FindI("APT::Status-Fd",-1);
881 if(fd > 0)
882 {
883 ostringstream status;
884
885 char msg[200];
886 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
887 unsigned long long ETA = 0;
888 if(CurrentCPS > 0)
889 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
890
891 // only show the ETA if it makes sense
892 if (ETA > 0 && ETA < 172800 /* two days */ )
893 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
894 else
895 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
896
897
898
899 // build the status str
900 status << "dlstatus:" << i
901 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
902 << ":" << msg
903 << endl;
904
905 std::string const dlstatus = status.str();
906 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
907 }
908
909 return true;
910 }
911 /*}}}*/
912 // AcquireStatus::Start - Called when the download is started /*{{{*/
913 // ---------------------------------------------------------------------
914 /* We just reset the counters */
915 void pkgAcquireStatus::Start()
916 {
917 gettimeofday(&Time,0);
918 gettimeofday(&StartTime,0);
919 LastBytes = 0;
920 CurrentCPS = 0;
921 CurrentBytes = 0;
922 TotalBytes = 0;
923 FetchedBytes = 0;
924 ElapsedTime = 0;
925 TotalItems = 0;
926 CurrentItems = 0;
927 }
928 /*}}}*/
929 // AcquireStatus::Stop - Finished downloading /*{{{*/
930 // ---------------------------------------------------------------------
931 /* This accurately computes the elapsed time and the total overall CPS. */
932 void pkgAcquireStatus::Stop()
933 {
934 // Compute the CPS and elapsed time
935 struct timeval NewTime;
936 gettimeofday(&NewTime,0);
937
938 double Delta = NewTime.tv_sec - StartTime.tv_sec +
939 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
940
941 // Compute the CPS value
942 if (Delta < 0.01)
943 CurrentCPS = 0;
944 else
945 CurrentCPS = FetchedBytes/Delta;
946 LastBytes = CurrentBytes;
947 ElapsedTime = (unsigned long long)Delta;
948 }
949 /*}}}*/
950 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
951 // ---------------------------------------------------------------------
952 /* This is used to get accurate final transfer rate reporting. */
953 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
954 {
955 FetchedBytes += Size - Resume;
956 }
957 /*}}}*/