- backport forgotten Valid-Until patch from the obsolete experimental
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <apt-pkg/acquire.h>
17 #include <apt-pkg/acquire-item.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/configuration.h>
20 #include <apt-pkg/error.h>
21 #include <apt-pkg/strutl.h>
22 #include <apt-pkg/fileutl.h>
23
24 #include <apti18n.h>
25
26 #include <iostream>
27 #include <sstream>
28 #include <stdio.h>
29
30 #include <dirent.h>
31 #include <sys/time.h>
32 #include <errno.h>
33 /*}}}*/
34
35 using namespace std;
36
37 // Acquire::pkgAcquire - Constructor /*{{{*/
38 // ---------------------------------------------------------------------
39 /* We grab some runtime state from the configuration space */
40 pkgAcquire::pkgAcquire() : Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
41 Debug(_config->FindB("Debug::pkgAcquire",false)),
42 Running(false), LockFD(-1)
43 {
44 string const Mode = _config->Find("Acquire::Queue-Mode","host");
45 if (strcasecmp(Mode.c_str(),"host") == 0)
46 QueueMode = QueueHost;
47 if (strcasecmp(Mode.c_str(),"access") == 0)
48 QueueMode = QueueAccess;
49 }
50 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : Queues(0), Workers(0),
51 Configs(0), Log(Progress), ToFetch(0),
52 Debug(_config->FindB("Debug::pkgAcquire",false)),
53 Running(false), LockFD(-1)
54 {
55 string const Mode = _config->Find("Acquire::Queue-Mode","host");
56 if (strcasecmp(Mode.c_str(),"host") == 0)
57 QueueMode = QueueHost;
58 if (strcasecmp(Mode.c_str(),"access") == 0)
59 QueueMode = QueueAccess;
60 Setup(Progress, "");
61 }
62 /*}}}*/
63 // Acquire::Setup - Delayed Constructor /*{{{*/
64 // ---------------------------------------------------------------------
65 /* Do everything needed to be a complete Acquire object and report the
66 success (or failure) back so the user knows that something is wrong… */
67 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
68 {
69 Log = Progress;
70
71 // check for existence and possibly create auxiliary directories
72 string const listDir = _config->FindDir("Dir::State::lists");
73 string const partialListDir = listDir + "partial/";
74 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
75 string const partialArchivesDir = archivesDir + "partial/";
76
77 if (CheckDirectory(_config->FindDir("Dir::State"), partialListDir) == false &&
78 CheckDirectory(listDir, partialListDir) == false)
79 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
80
81 if (CheckDirectory(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
82 CheckDirectory(archivesDir, partialArchivesDir) == false)
83 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
84
85 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
86 return true;
87
88 // Lock the directory this acquire object will work in
89 LockFD = GetLock(flCombine(Lock, "lock"));
90 if (LockFD == -1)
91 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
92
93 return true;
94 }
95 /*}}}*/
96 // Acquire::CheckDirectory - ensure that the given directory exists /*{{{*/
97 // ---------------------------------------------------------------------
98 /* a small wrapper around CreateDirectory to check if it exists and to
99 remove the trailing "/apt/" from the parent directory if needed */
100 bool pkgAcquire::CheckDirectory(string const &Parent, string const &Path) const
101 {
102 if (DirectoryExists(Path) == true)
103 return true;
104
105 size_t const len = Parent.size();
106 if (len > 5 && Parent.find("/apt/", len - 6, 5) == len - 5)
107 {
108 if (CreateDirectory(Parent.substr(0,len-5), Path) == true)
109 return true;
110 }
111 else if (CreateDirectory(Parent, Path) == true)
112 return true;
113
114 return false;
115 }
116 /*}}}*/
117 // Acquire::~pkgAcquire - Destructor /*{{{*/
118 // ---------------------------------------------------------------------
119 /* Free our memory, clean up the queues (destroy the workers) */
120 pkgAcquire::~pkgAcquire()
121 {
122 Shutdown();
123
124 if (LockFD != -1)
125 close(LockFD);
126
127 while (Configs != 0)
128 {
129 MethodConfig *Jnk = Configs;
130 Configs = Configs->Next;
131 delete Jnk;
132 }
133 }
134 /*}}}*/
135 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
136 // ---------------------------------------------------------------------
137 /* */
138 void pkgAcquire::Shutdown()
139 {
140 while (Items.size() != 0)
141 {
142 if (Items[0]->Status == Item::StatFetching)
143 Items[0]->Status = Item::StatError;
144 delete Items[0];
145 }
146
147 while (Queues != 0)
148 {
149 Queue *Jnk = Queues;
150 Queues = Queues->Next;
151 delete Jnk;
152 }
153 }
154 /*}}}*/
155 // Acquire::Add - Add a new item /*{{{*/
156 // ---------------------------------------------------------------------
157 /* This puts an item on the acquire list. This list is mainly for tracking
158 item status */
159 void pkgAcquire::Add(Item *Itm)
160 {
161 Items.push_back(Itm);
162 }
163 /*}}}*/
164 // Acquire::Remove - Remove a item /*{{{*/
165 // ---------------------------------------------------------------------
166 /* Remove an item from the acquire list. This is usually not used.. */
167 void pkgAcquire::Remove(Item *Itm)
168 {
169 Dequeue(Itm);
170
171 for (ItemIterator I = Items.begin(); I != Items.end();)
172 {
173 if (*I == Itm)
174 {
175 Items.erase(I);
176 I = Items.begin();
177 }
178 else
179 I++;
180 }
181 }
182 /*}}}*/
183 // Acquire::Add - Add a worker /*{{{*/
184 // ---------------------------------------------------------------------
185 /* A list of workers is kept so that the select loop can direct their FD
186 usage. */
187 void pkgAcquire::Add(Worker *Work)
188 {
189 Work->NextAcquire = Workers;
190 Workers = Work;
191 }
192 /*}}}*/
193 // Acquire::Remove - Remove a worker /*{{{*/
194 // ---------------------------------------------------------------------
195 /* A worker has died. This can not be done while the select loop is running
196 as it would require that RunFds could handling a changing list state and
197 it cant.. */
198 void pkgAcquire::Remove(Worker *Work)
199 {
200 if (Running == true)
201 abort();
202
203 Worker **I = &Workers;
204 for (; *I != 0;)
205 {
206 if (*I == Work)
207 *I = (*I)->NextAcquire;
208 else
209 I = &(*I)->NextAcquire;
210 }
211 }
212 /*}}}*/
213 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
214 // ---------------------------------------------------------------------
215 /* This is the entry point for an item. An item calls this function when
216 it is constructed which creates a queue (based on the current queue
217 mode) and puts the item in that queue. If the system is running then
218 the queue might be started. */
219 void pkgAcquire::Enqueue(ItemDesc &Item)
220 {
221 // Determine which queue to put the item in
222 const MethodConfig *Config;
223 string Name = QueueName(Item.URI,Config);
224 if (Name.empty() == true)
225 return;
226
227 // Find the queue structure
228 Queue *I = Queues;
229 for (; I != 0 && I->Name != Name; I = I->Next);
230 if (I == 0)
231 {
232 I = new Queue(Name,this);
233 I->Next = Queues;
234 Queues = I;
235
236 if (Running == true)
237 I->Startup();
238 }
239
240 // See if this is a local only URI
241 if (Config->LocalOnly == true && Item.Owner->Complete == false)
242 Item.Owner->Local = true;
243 Item.Owner->Status = Item::StatIdle;
244
245 // Queue it into the named queue
246 if(I->Enqueue(Item))
247 ToFetch++;
248
249 // Some trace stuff
250 if (Debug == true)
251 {
252 clog << "Fetching " << Item.URI << endl;
253 clog << " to " << Item.Owner->DestFile << endl;
254 clog << " Queue is: " << Name << endl;
255 }
256 }
257 /*}}}*/
258 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
259 // ---------------------------------------------------------------------
260 /* This is called when an item is finished being fetched. It removes it
261 from all the queues */
262 void pkgAcquire::Dequeue(Item *Itm)
263 {
264 Queue *I = Queues;
265 bool Res = false;
266 for (; I != 0; I = I->Next)
267 Res |= I->Dequeue(Itm);
268
269 if (Debug == true)
270 clog << "Dequeuing " << Itm->DestFile << endl;
271 if (Res == true)
272 ToFetch--;
273 }
274 /*}}}*/
275 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
276 // ---------------------------------------------------------------------
277 /* The string returned depends on the configuration settings and the
278 method parameters. Given something like http://foo.org/bar it can
279 return http://foo.org or http */
280 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
281 {
282 URI U(Uri);
283
284 Config = GetConfig(U.Access);
285 if (Config == 0)
286 return string();
287
288 /* Single-Instance methods get exactly one queue per URI. This is
289 also used for the Access queue method */
290 if (Config->SingleInstance == true || QueueMode == QueueAccess)
291 return U.Access;
292
293 return U.Access + ':' + U.Host;
294 }
295 /*}}}*/
296 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
297 // ---------------------------------------------------------------------
298 /* This locates the configuration structure for an access method. If
299 a config structure cannot be found a Worker will be created to
300 retrieve it */
301 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
302 {
303 // Search for an existing config
304 MethodConfig *Conf;
305 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
306 if (Conf->Access == Access)
307 return Conf;
308
309 // Create the new config class
310 Conf = new MethodConfig;
311 Conf->Access = Access;
312 Conf->Next = Configs;
313 Configs = Conf;
314
315 // Create the worker to fetch the configuration
316 Worker Work(Conf);
317 if (Work.Start() == false)
318 return 0;
319
320 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
321 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
322 Conf->SingleInstance = true;
323
324 return Conf;
325 }
326 /*}}}*/
327 // Acquire::SetFds - Deal with readable FDs /*{{{*/
328 // ---------------------------------------------------------------------
329 /* Collect FDs that have activity monitors into the fd sets */
330 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
331 {
332 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
333 {
334 if (I->InReady == true && I->InFd >= 0)
335 {
336 if (Fd < I->InFd)
337 Fd = I->InFd;
338 FD_SET(I->InFd,RSet);
339 }
340 if (I->OutReady == true && I->OutFd >= 0)
341 {
342 if (Fd < I->OutFd)
343 Fd = I->OutFd;
344 FD_SET(I->OutFd,WSet);
345 }
346 }
347 }
348 /*}}}*/
349 // Acquire::RunFds - Deal with active FDs /*{{{*/
350 // ---------------------------------------------------------------------
351 /* Dispatch active FDs over to the proper workers. It is very important
352 that a worker never be erased while this is running! The queue class
353 should never erase a worker except during shutdown processing. */
354 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
355 {
356 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
357 {
358 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
359 I->InFdReady();
360 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
361 I->OutFdReady();
362 }
363 }
364 /*}}}*/
365 // Acquire::Run - Run the fetch sequence /*{{{*/
366 // ---------------------------------------------------------------------
367 /* This runs the queues. It manages a select loop for all of the
368 Worker tasks. The workers interact with the queues and items to
369 manage the actual fetch. */
370 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
371 {
372 Running = true;
373
374 for (Queue *I = Queues; I != 0; I = I->Next)
375 I->Startup();
376
377 if (Log != 0)
378 Log->Start();
379
380 bool WasCancelled = false;
381
382 // Run till all things have been acquired
383 struct timeval tv;
384 tv.tv_sec = 0;
385 tv.tv_usec = PulseIntervall;
386 while (ToFetch > 0)
387 {
388 fd_set RFds;
389 fd_set WFds;
390 int Highest = 0;
391 FD_ZERO(&RFds);
392 FD_ZERO(&WFds);
393 SetFds(Highest,&RFds,&WFds);
394
395 int Res;
396 do
397 {
398 Res = select(Highest+1,&RFds,&WFds,0,&tv);
399 }
400 while (Res < 0 && errno == EINTR);
401
402 if (Res < 0)
403 {
404 _error->Errno("select","Select has failed");
405 break;
406 }
407
408 RunFds(&RFds,&WFds);
409 if (_error->PendingError() == true)
410 break;
411
412 // Timeout, notify the log class
413 if (Res == 0 || (Log != 0 && Log->Update == true))
414 {
415 tv.tv_usec = PulseIntervall;
416 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
417 I->Pulse();
418 if (Log != 0 && Log->Pulse(this) == false)
419 {
420 WasCancelled = true;
421 break;
422 }
423 }
424 }
425
426 if (Log != 0)
427 Log->Stop();
428
429 // Shut down the acquire bits
430 Running = false;
431 for (Queue *I = Queues; I != 0; I = I->Next)
432 I->Shutdown(false);
433
434 // Shut down the items
435 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
436 (*I)->Finished();
437
438 if (_error->PendingError())
439 return Failed;
440 if (WasCancelled)
441 return Cancelled;
442 return Continue;
443 }
444 /*}}}*/
445 // Acquire::Bump - Called when an item is dequeued /*{{{*/
446 // ---------------------------------------------------------------------
447 /* This routine bumps idle queues in hopes that they will be able to fetch
448 the dequeued item */
449 void pkgAcquire::Bump()
450 {
451 for (Queue *I = Queues; I != 0; I = I->Next)
452 I->Bump();
453 }
454 /*}}}*/
455 // Acquire::WorkerStep - Step to the next worker /*{{{*/
456 // ---------------------------------------------------------------------
457 /* Not inlined to advoid including acquire-worker.h */
458 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
459 {
460 return I->NextAcquire;
461 };
462 /*}}}*/
463 // Acquire::Clean - Cleans a directory /*{{{*/
464 // ---------------------------------------------------------------------
465 /* This is a bit simplistic, it looks at every file in the dir and sees
466 if it is part of the download set. */
467 bool pkgAcquire::Clean(string Dir)
468 {
469 DIR *D = opendir(Dir.c_str());
470 if (D == 0)
471 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
472
473 string StartDir = SafeGetCWD();
474 if (chdir(Dir.c_str()) != 0)
475 {
476 closedir(D);
477 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
478 }
479
480 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
481 {
482 // Skip some files..
483 if (strcmp(Dir->d_name,"lock") == 0 ||
484 strcmp(Dir->d_name,"partial") == 0 ||
485 strcmp(Dir->d_name,".") == 0 ||
486 strcmp(Dir->d_name,"..") == 0)
487 continue;
488
489 // Look in the get list
490 ItemCIterator I = Items.begin();
491 for (; I != Items.end(); I++)
492 if (flNotDir((*I)->DestFile) == Dir->d_name)
493 break;
494
495 // Nothing found, nuke it
496 if (I == Items.end())
497 unlink(Dir->d_name);
498 };
499
500 closedir(D);
501 if (chdir(StartDir.c_str()) != 0)
502 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
503 return true;
504 }
505 /*}}}*/
506 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
507 // ---------------------------------------------------------------------
508 /* This is the total number of bytes needed */
509 unsigned long long pkgAcquire::TotalNeeded()
510 {
511 unsigned long long Total = 0;
512 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
513 Total += (*I)->FileSize;
514 return Total;
515 }
516 /*}}}*/
517 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
518 // ---------------------------------------------------------------------
519 /* This is the number of bytes that is not local */
520 unsigned long long pkgAcquire::FetchNeeded()
521 {
522 unsigned long long Total = 0;
523 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
524 if ((*I)->Local == false)
525 Total += (*I)->FileSize;
526 return Total;
527 }
528 /*}}}*/
529 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
530 // ---------------------------------------------------------------------
531 /* This is the number of bytes that is not local */
532 unsigned long long pkgAcquire::PartialPresent()
533 {
534 unsigned long long Total = 0;
535 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
536 if ((*I)->Local == false)
537 Total += (*I)->PartialSize;
538 return Total;
539 }
540 /*}}}*/
541 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
542 // ---------------------------------------------------------------------
543 /* */
544 pkgAcquire::UriIterator pkgAcquire::UriBegin()
545 {
546 return UriIterator(Queues);
547 }
548 /*}}}*/
549 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
550 // ---------------------------------------------------------------------
551 /* */
552 pkgAcquire::UriIterator pkgAcquire::UriEnd()
553 {
554 return UriIterator(0);
555 }
556 /*}}}*/
557 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
558 // ---------------------------------------------------------------------
559 /* */
560 pkgAcquire::MethodConfig::MethodConfig()
561 {
562 SingleInstance = false;
563 Pipeline = false;
564 SendConfig = false;
565 LocalOnly = false;
566 Removable = false;
567 Next = 0;
568 }
569 /*}}}*/
570 // Queue::Queue - Constructor /*{{{*/
571 // ---------------------------------------------------------------------
572 /* */
573 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
574 Owner(Owner)
575 {
576 Items = 0;
577 Next = 0;
578 Workers = 0;
579 MaxPipeDepth = 1;
580 PipeDepth = 0;
581 }
582 /*}}}*/
583 // Queue::~Queue - Destructor /*{{{*/
584 // ---------------------------------------------------------------------
585 /* */
586 pkgAcquire::Queue::~Queue()
587 {
588 Shutdown(true);
589
590 while (Items != 0)
591 {
592 QItem *Jnk = Items;
593 Items = Items->Next;
594 delete Jnk;
595 }
596 }
597 /*}}}*/
598 // Queue::Enqueue - Queue an item to the queue /*{{{*/
599 // ---------------------------------------------------------------------
600 /* */
601 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
602 {
603 QItem **I = &Items;
604 // move to the end of the queue and check for duplicates here
605 for (; *I != 0; I = &(*I)->Next)
606 if (Item.URI == (*I)->URI)
607 {
608 Item.Owner->Status = Item::StatDone;
609 return false;
610 }
611
612 // Create a new item
613 QItem *Itm = new QItem;
614 *Itm = Item;
615 Itm->Next = 0;
616 *I = Itm;
617
618 Item.Owner->QueueCounter++;
619 if (Items->Next == 0)
620 Cycle();
621 return true;
622 }
623 /*}}}*/
624 // Queue::Dequeue - Remove an item from the queue /*{{{*/
625 // ---------------------------------------------------------------------
626 /* We return true if we hit something */
627 bool pkgAcquire::Queue::Dequeue(Item *Owner)
628 {
629 if (Owner->Status == pkgAcquire::Item::StatFetching)
630 return _error->Error("Tried to dequeue a fetching object");
631
632 bool Res = false;
633
634 QItem **I = &Items;
635 for (; *I != 0;)
636 {
637 if ((*I)->Owner == Owner)
638 {
639 QItem *Jnk= *I;
640 *I = (*I)->Next;
641 Owner->QueueCounter--;
642 delete Jnk;
643 Res = true;
644 }
645 else
646 I = &(*I)->Next;
647 }
648
649 return Res;
650 }
651 /*}}}*/
652 // Queue::Startup - Start the worker processes /*{{{*/
653 // ---------------------------------------------------------------------
654 /* It is possible for this to be called with a pre-existing set of
655 workers. */
656 bool pkgAcquire::Queue::Startup()
657 {
658 if (Workers == 0)
659 {
660 URI U(Name);
661 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
662 if (Cnf == 0)
663 return false;
664
665 Workers = new Worker(this,Cnf,Owner->Log);
666 Owner->Add(Workers);
667 if (Workers->Start() == false)
668 return false;
669
670 /* When pipelining we commit 10 items. This needs to change when we
671 added other source retry to have cycle maintain a pipeline depth
672 on its own. */
673 if (Cnf->Pipeline == true)
674 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
675 else
676 MaxPipeDepth = 1;
677 }
678
679 return Cycle();
680 }
681 /*}}}*/
682 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
683 // ---------------------------------------------------------------------
684 /* If final is true then all workers are eliminated, otherwise only workers
685 that do not need cleanup are removed */
686 bool pkgAcquire::Queue::Shutdown(bool Final)
687 {
688 // Delete all of the workers
689 pkgAcquire::Worker **Cur = &Workers;
690 while (*Cur != 0)
691 {
692 pkgAcquire::Worker *Jnk = *Cur;
693 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
694 {
695 *Cur = Jnk->NextQueue;
696 Owner->Remove(Jnk);
697 delete Jnk;
698 }
699 else
700 Cur = &(*Cur)->NextQueue;
701 }
702
703 return true;
704 }
705 /*}}}*/
706 // Queue::FindItem - Find a URI in the item list /*{{{*/
707 // ---------------------------------------------------------------------
708 /* */
709 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
710 {
711 for (QItem *I = Items; I != 0; I = I->Next)
712 if (I->URI == URI && I->Worker == Owner)
713 return I;
714 return 0;
715 }
716 /*}}}*/
717 // Queue::ItemDone - Item has been completed /*{{{*/
718 // ---------------------------------------------------------------------
719 /* The worker signals this which causes the item to be removed from the
720 queue. If this is the last queue instance then it is removed from the
721 main queue too.*/
722 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
723 {
724 PipeDepth--;
725 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
726 Itm->Owner->Status = pkgAcquire::Item::StatDone;
727
728 if (Itm->Owner->QueueCounter <= 1)
729 Owner->Dequeue(Itm->Owner);
730 else
731 {
732 Dequeue(Itm->Owner);
733 Owner->Bump();
734 }
735
736 return Cycle();
737 }
738 /*}}}*/
739 // Queue::Cycle - Queue new items into the method /*{{{*/
740 // ---------------------------------------------------------------------
741 /* This locates a new idle item and sends it to the worker. If pipelining
742 is enabled then it keeps the pipe full. */
743 bool pkgAcquire::Queue::Cycle()
744 {
745 if (Items == 0 || Workers == 0)
746 return true;
747
748 if (PipeDepth < 0)
749 return _error->Error("Pipedepth failure");
750
751 // Look for a queable item
752 QItem *I = Items;
753 while (PipeDepth < (signed)MaxPipeDepth)
754 {
755 for (; I != 0; I = I->Next)
756 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
757 break;
758
759 // Nothing to do, queue is idle.
760 if (I == 0)
761 return true;
762
763 I->Worker = Workers;
764 I->Owner->Status = pkgAcquire::Item::StatFetching;
765 PipeDepth++;
766 if (Workers->QueueItem(I) == false)
767 return false;
768 }
769
770 return true;
771 }
772 /*}}}*/
773 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
774 // ---------------------------------------------------------------------
775 /* This is called when an item in multiple queues is dequeued */
776 void pkgAcquire::Queue::Bump()
777 {
778 Cycle();
779 }
780 /*}}}*/
781 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
782 // ---------------------------------------------------------------------
783 /* */
784 pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
785 {
786 Start();
787 }
788 /*}}}*/
789 // AcquireStatus::Pulse - Called periodically /*{{{*/
790 // ---------------------------------------------------------------------
791 /* This computes some internal state variables for the derived classes to
792 use. It generates the current downloaded bytes and total bytes to download
793 as well as the current CPS estimate. */
794 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
795 {
796 TotalBytes = 0;
797 CurrentBytes = 0;
798 TotalItems = 0;
799 CurrentItems = 0;
800
801 // Compute the total number of bytes to fetch
802 unsigned int Unknown = 0;
803 unsigned int Count = 0;
804 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
805 I++, Count++)
806 {
807 TotalItems++;
808 if ((*I)->Status == pkgAcquire::Item::StatDone)
809 CurrentItems++;
810
811 // Totally ignore local items
812 if ((*I)->Local == true)
813 continue;
814
815 TotalBytes += (*I)->FileSize;
816 if ((*I)->Complete == true)
817 CurrentBytes += (*I)->FileSize;
818 if ((*I)->FileSize == 0 && (*I)->Complete == false)
819 Unknown++;
820 }
821
822 // Compute the current completion
823 unsigned long ResumeSize = 0;
824 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
825 I = Owner->WorkerStep(I))
826 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
827 {
828 CurrentBytes += I->CurrentSize;
829 ResumeSize += I->ResumePoint;
830
831 // Files with unknown size always have 100% completion
832 if (I->CurrentItem->Owner->FileSize == 0 &&
833 I->CurrentItem->Owner->Complete == false)
834 TotalBytes += I->CurrentSize;
835 }
836
837 // Normalize the figures and account for unknown size downloads
838 if (TotalBytes <= 0)
839 TotalBytes = 1;
840 if (Unknown == Count)
841 TotalBytes = Unknown;
842
843 // Wha?! Is not supposed to happen.
844 if (CurrentBytes > TotalBytes)
845 CurrentBytes = TotalBytes;
846
847 // Compute the CPS
848 struct timeval NewTime;
849 gettimeofday(&NewTime,0);
850 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
851 NewTime.tv_sec - Time.tv_sec > 6)
852 {
853 double Delta = NewTime.tv_sec - Time.tv_sec +
854 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
855
856 // Compute the CPS value
857 if (Delta < 0.01)
858 CurrentCPS = 0;
859 else
860 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
861 LastBytes = CurrentBytes - ResumeSize;
862 ElapsedTime = (unsigned long)Delta;
863 Time = NewTime;
864 }
865
866 int fd = _config->FindI("APT::Status-Fd",-1);
867 if(fd > 0)
868 {
869 ostringstream status;
870
871 char msg[200];
872 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
873 unsigned long ETA =
874 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
875
876 // only show the ETA if it makes sense
877 if (ETA > 0 && ETA < 172800 /* two days */ )
878 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
879 else
880 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
881
882
883
884 // build the status str
885 status << "dlstatus:" << i
886 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
887 << ":" << msg
888 << endl;
889 write(fd, status.str().c_str(), status.str().size());
890 }
891
892 return true;
893 }
894 /*}}}*/
895 // AcquireStatus::Start - Called when the download is started /*{{{*/
896 // ---------------------------------------------------------------------
897 /* We just reset the counters */
898 void pkgAcquireStatus::Start()
899 {
900 gettimeofday(&Time,0);
901 gettimeofday(&StartTime,0);
902 LastBytes = 0;
903 CurrentCPS = 0;
904 CurrentBytes = 0;
905 TotalBytes = 0;
906 FetchedBytes = 0;
907 ElapsedTime = 0;
908 TotalItems = 0;
909 CurrentItems = 0;
910 }
911 /*}}}*/
912 // AcquireStatus::Stop - Finished downloading /*{{{*/
913 // ---------------------------------------------------------------------
914 /* This accurately computes the elapsed time and the total overall CPS. */
915 void pkgAcquireStatus::Stop()
916 {
917 // Compute the CPS and elapsed time
918 struct timeval NewTime;
919 gettimeofday(&NewTime,0);
920
921 double Delta = NewTime.tv_sec - StartTime.tv_sec +
922 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
923
924 // Compute the CPS value
925 if (Delta < 0.01)
926 CurrentCPS = 0;
927 else
928 CurrentCPS = FetchedBytes/Delta;
929 LastBytes = CurrentBytes;
930 ElapsedTime = (unsigned int)Delta;
931 }
932 /*}}}*/
933 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
934 // ---------------------------------------------------------------------
935 /* This is used to get accurate final transfer rate reporting. */
936 void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
937 {
938 FetchedBytes += Size - Resume;
939 }
940 /*}}}*/