Fixed --no-download
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
a3eaf954 3// $Id: acquire.cc,v 1.44 1999/12/09 05:22:33 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
7a7fa5f0 26#include <dirent.h>
8267fe24 27#include <sys/time.h>
524f8105 28#include <errno.h>
5f3edc0f 29#include <sys/stat.h>
0118833a
AL
30 /*}}}*/
31
32// Acquire::pkgAcquire - Constructor /*{{{*/
33// ---------------------------------------------------------------------
93bf083d 34/* We grab some runtime state from the configuration space */
8267fe24 35pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
36{
37 Queues = 0;
38 Configs = 0;
0a8a80e5
AL
39 Workers = 0;
40 ToFetch = 0;
8b89e57f 41 Running = false;
0a8a80e5
AL
42
43 string Mode = _config->Find("Acquire::Queue-Mode","host");
44 if (strcasecmp(Mode.c_str(),"host") == 0)
45 QueueMode = QueueHost;
46 if (strcasecmp(Mode.c_str(),"access") == 0)
47 QueueMode = QueueAccess;
48
49 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 50
0c55d1d2 51 // This is really a stupid place for this
5f3edc0f
AL
52 struct stat St;
53 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
54 S_ISDIR(St.st_mode) == 0)
0c55d1d2 55 _error->Error("Lists directory %spartial is missing.",
5f3edc0f
AL
56 _config->FindDir("Dir::State::lists").c_str());
57 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
0c55d1d2 59 _error->Error("Archive directory %spartial is missing.",
5f3edc0f 60 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
61}
62 /*}}}*/
63// Acquire::~pkgAcquire - Destructor /*{{{*/
64// ---------------------------------------------------------------------
93bf083d 65/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
66pkgAcquire::~pkgAcquire()
67{
3b5421b4
AL
68 while (Configs != 0)
69 {
70 MethodConfig *Jnk = Configs;
71 Configs = Configs->Next;
72 delete Jnk;
73 }
281daf46
AL
74
75 Shutdown();
76}
77 /*}}}*/
8e5fc8f5 78// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
79// ---------------------------------------------------------------------
80/* */
81void pkgAcquire::Shutdown()
82{
83 while (Items.size() != 0)
84 delete Items[0];
0a8a80e5
AL
85
86 while (Queues != 0)
87 {
88 Queue *Jnk = Queues;
89 Queues = Queues->Next;
90 delete Jnk;
91 }
0118833a
AL
92}
93 /*}}}*/
94// Acquire::Add - Add a new item /*{{{*/
95// ---------------------------------------------------------------------
93bf083d
AL
96/* This puts an item on the acquire list. This list is mainly for tracking
97 item status */
0118833a
AL
98void pkgAcquire::Add(Item *Itm)
99{
100 Items.push_back(Itm);
101}
102 /*}}}*/
103// Acquire::Remove - Remove a item /*{{{*/
104// ---------------------------------------------------------------------
93bf083d 105/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
106void pkgAcquire::Remove(Item *Itm)
107{
a3eaf954
AL
108 Dequeue(Itm);
109
0118833a
AL
110 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
111 {
112 if (*I == Itm)
113 Items.erase(I);
8267fe24 114 }
0118833a
AL
115}
116 /*}}}*/
0a8a80e5
AL
117// Acquire::Add - Add a worker /*{{{*/
118// ---------------------------------------------------------------------
93bf083d
AL
119/* A list of workers is kept so that the select loop can direct their FD
120 usage. */
0a8a80e5
AL
121void pkgAcquire::Add(Worker *Work)
122{
123 Work->NextAcquire = Workers;
124 Workers = Work;
125}
126 /*}}}*/
127// Acquire::Remove - Remove a worker /*{{{*/
128// ---------------------------------------------------------------------
93bf083d
AL
129/* A worker has died. This can not be done while the select loop is running
130 as it would require that RunFds could handling a changing list state and
131 it cant.. */
0a8a80e5
AL
132void pkgAcquire::Remove(Worker *Work)
133{
93bf083d
AL
134 if (Running == true)
135 abort();
136
0a8a80e5
AL
137 Worker **I = &Workers;
138 for (; *I != 0;)
139 {
140 if (*I == Work)
141 *I = (*I)->NextAcquire;
142 else
143 I = &(*I)->NextAcquire;
144 }
145}
146 /*}}}*/
0118833a
AL
147// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
148// ---------------------------------------------------------------------
93bf083d 149/* This is the entry point for an item. An item calls this function when
281daf46 150 it is constructed which creates a queue (based on the current queue
93bf083d
AL
151 mode) and puts the item in that queue. If the system is running then
152 the queue might be started. */
8267fe24 153void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 154{
0a8a80e5 155 // Determine which queue to put the item in
e331f6ed
AL
156 const MethodConfig *Config;
157 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
158 if (Name.empty() == true)
159 return;
160
161 // Find the queue structure
162 Queue *I = Queues;
163 for (; I != 0 && I->Name != Name; I = I->Next);
164 if (I == 0)
165 {
166 I = new Queue(Name,this);
167 I->Next = Queues;
168 Queues = I;
93bf083d
AL
169
170 if (Running == true)
171 I->Startup();
0a8a80e5 172 }
bfd22fc0 173
e331f6ed
AL
174 // See if this is a local only URI
175 if (Config->LocalOnly == true && Item.Owner->Complete == false)
176 Item.Owner->Local = true;
8267fe24 177 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
178
179 // Queue it into the named queue
8267fe24 180 I->Enqueue(Item);
0a8a80e5 181 ToFetch++;
93bf083d 182
0a8a80e5
AL
183 // Some trace stuff
184 if (Debug == true)
185 {
8267fe24
AL
186 clog << "Fetching " << Item.URI << endl;
187 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 188 clog << " Queue is: " << Name << endl;
0a8a80e5 189 }
3b5421b4
AL
190}
191 /*}}}*/
0a8a80e5 192// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 193// ---------------------------------------------------------------------
93bf083d
AL
194/* This is called when an item is finished being fetched. It removes it
195 from all the queues */
0a8a80e5
AL
196void pkgAcquire::Dequeue(Item *Itm)
197{
198 Queue *I = Queues;
bfd22fc0 199 bool Res = false;
0a8a80e5 200 for (; I != 0; I = I->Next)
bfd22fc0 201 Res |= I->Dequeue(Itm);
93bf083d
AL
202
203 if (Debug == true)
204 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
205 if (Res == true)
206 ToFetch--;
0a8a80e5
AL
207}
208 /*}}}*/
209// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
210// ---------------------------------------------------------------------
211/* The string returned depends on the configuration settings and the
212 method parameters. Given something like http://foo.org/bar it can
213 return http://foo.org or http */
e331f6ed 214string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 215{
93bf083d
AL
216 URI U(Uri);
217
e331f6ed 218 Config = GetConfig(U.Access);
0a8a80e5
AL
219 if (Config == 0)
220 return string();
221
222 /* Single-Instance methods get exactly one queue per URI. This is
223 also used for the Access queue method */
224 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 225 return U.Access;
93bf083d
AL
226
227 return U.Access + ':' + U.Host;
0118833a
AL
228}
229 /*}}}*/
3b5421b4
AL
230// Acquire::GetConfig - Fetch the configuration information /*{{{*/
231// ---------------------------------------------------------------------
232/* This locates the configuration structure for an access method. If
233 a config structure cannot be found a Worker will be created to
234 retrieve it */
0a8a80e5 235pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
236{
237 // Search for an existing config
238 MethodConfig *Conf;
239 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
240 if (Conf->Access == Access)
241 return Conf;
242
243 // Create the new config class
244 Conf = new MethodConfig;
245 Conf->Access = Access;
246 Conf->Next = Configs;
247 Configs = Conf;
0118833a 248
3b5421b4
AL
249 // Create the worker to fetch the configuration
250 Worker Work(Conf);
251 if (Work.Start() == false)
252 return 0;
253
254 return Conf;
255}
256 /*}}}*/
0a8a80e5
AL
257// Acquire::SetFds - Deal with readable FDs /*{{{*/
258// ---------------------------------------------------------------------
259/* Collect FDs that have activity monitors into the fd sets */
260void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
261{
262 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
263 {
264 if (I->InReady == true && I->InFd >= 0)
265 {
266 if (Fd < I->InFd)
267 Fd = I->InFd;
268 FD_SET(I->InFd,RSet);
269 }
270 if (I->OutReady == true && I->OutFd >= 0)
271 {
272 if (Fd < I->OutFd)
273 Fd = I->OutFd;
274 FD_SET(I->OutFd,WSet);
275 }
276 }
277}
278 /*}}}*/
279// Acquire::RunFds - Deal with active FDs /*{{{*/
280// ---------------------------------------------------------------------
93bf083d
AL
281/* Dispatch active FDs over to the proper workers. It is very important
282 that a worker never be erased while this is running! The queue class
283 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
284void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
285{
286 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
287 {
288 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
289 I->InFdReady();
290 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
291 I->OutFdReady();
292 }
293}
294 /*}}}*/
295// Acquire::Run - Run the fetch sequence /*{{{*/
296// ---------------------------------------------------------------------
297/* This runs the queues. It manages a select loop for all of the
298 Worker tasks. The workers interact with the queues and items to
299 manage the actual fetch. */
024d1123 300pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 301{
8b89e57f
AL
302 Running = true;
303
0a8a80e5
AL
304 for (Queue *I = Queues; I != 0; I = I->Next)
305 I->Startup();
306
b98f2859
AL
307 if (Log != 0)
308 Log->Start();
309
024d1123
AL
310 bool WasCancelled = false;
311
0a8a80e5 312 // Run till all things have been acquired
8267fe24
AL
313 struct timeval tv;
314 tv.tv_sec = 0;
315 tv.tv_usec = 500000;
0a8a80e5
AL
316 while (ToFetch > 0)
317 {
318 fd_set RFds;
319 fd_set WFds;
320 int Highest = 0;
321 FD_ZERO(&RFds);
322 FD_ZERO(&WFds);
323 SetFds(Highest,&RFds,&WFds);
324
b0db36b1
AL
325 int Res;
326 do
327 {
328 Res = select(Highest+1,&RFds,&WFds,0,&tv);
329 }
330 while (Res < 0 && errno == EINTR);
331
8267fe24 332 if (Res < 0)
8b89e57f 333 {
8267fe24
AL
334 _error->Errno("select","Select has failed");
335 break;
8b89e57f 336 }
93bf083d 337
0a8a80e5 338 RunFds(&RFds,&WFds);
93bf083d
AL
339 if (_error->PendingError() == true)
340 break;
8267fe24
AL
341
342 // Timeout, notify the log class
343 if (Res == 0 || (Log != 0 && Log->Update == true))
344 {
345 tv.tv_usec = 500000;
346 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
347 I->Pulse();
024d1123
AL
348 if (Log != 0 && Log->Pulse(this) == false)
349 {
350 WasCancelled = true;
351 break;
352 }
8267fe24 353 }
0a8a80e5 354 }
be4401bf 355
b98f2859
AL
356 if (Log != 0)
357 Log->Stop();
358
be4401bf
AL
359 // Shut down the acquire bits
360 Running = false;
0a8a80e5 361 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 362 I->Shutdown(false);
0a8a80e5 363
ab559b35
AL
364 // Shut down the items
365 for (Item **I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 366 (*I)->Finished();
ab559b35 367
024d1123
AL
368 if (_error->PendingError())
369 return Failed;
370 if (WasCancelled)
371 return Cancelled;
372 return Continue;
93bf083d
AL
373}
374 /*}}}*/
be4401bf 375// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
376// ---------------------------------------------------------------------
377/* This routine bumps idle queues in hopes that they will be able to fetch
378 the dequeued item */
379void pkgAcquire::Bump()
380{
be4401bf
AL
381 for (Queue *I = Queues; I != 0; I = I->Next)
382 I->Bump();
0a8a80e5
AL
383}
384 /*}}}*/
8267fe24
AL
385// Acquire::WorkerStep - Step to the next worker /*{{{*/
386// ---------------------------------------------------------------------
387/* Not inlined to advoid including acquire-worker.h */
388pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
389{
390 return I->NextAcquire;
391};
392 /*}}}*/
a6568219 393// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
394// ---------------------------------------------------------------------
395/* This is a bit simplistic, it looks at every file in the dir and sees
396 if it is part of the download set. */
397bool pkgAcquire::Clean(string Dir)
398{
399 DIR *D = opendir(Dir.c_str());
400 if (D == 0)
401 return _error->Errno("opendir","Unable to read %s",Dir.c_str());
402
403 string StartDir = SafeGetCWD();
404 if (chdir(Dir.c_str()) != 0)
405 {
406 closedir(D);
407 return _error->Errno("chdir","Unable to change to ",Dir.c_str());
408 }
409
410 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
411 {
412 // Skip some files..
413 if (strcmp(Dir->d_name,"lock") == 0 ||
414 strcmp(Dir->d_name,"partial") == 0 ||
415 strcmp(Dir->d_name,".") == 0 ||
416 strcmp(Dir->d_name,"..") == 0)
417 continue;
418
419 // Look in the get list
420 vector<Item *>::iterator I = Items.begin();
421 for (; I != Items.end(); I++)
422 if (flNotDir((*I)->DestFile) == Dir->d_name)
423 break;
424
425 // Nothing found, nuke it
426 if (I == Items.end())
427 unlink(Dir->d_name);
428 };
429
430 chdir(StartDir.c_str());
431 closedir(D);
432 return true;
433}
434 /*}}}*/
a6568219
AL
435// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
436// ---------------------------------------------------------------------
437/* This is the total number of bytes needed */
438unsigned long pkgAcquire::TotalNeeded()
439{
440 unsigned long Total = 0;
441 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
442 Total += (*I)->FileSize;
443 return Total;
444}
445 /*}}}*/
446// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
447// ---------------------------------------------------------------------
448/* This is the number of bytes that is not local */
449unsigned long pkgAcquire::FetchNeeded()
450{
451 unsigned long Total = 0;
452 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
453 if ((*I)->Local == false)
454 Total += (*I)->FileSize;
455 return Total;
456}
457 /*}}}*/
6b1ff003
AL
458// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
459// ---------------------------------------------------------------------
460/* This is the number of bytes that is not local */
461unsigned long pkgAcquire::PartialPresent()
462{
463 unsigned long Total = 0;
464 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
465 if ((*I)->Local == false)
466 Total += (*I)->PartialSize;
467 return Total;
468}
469 /*}}}*/
8e5fc8f5 470// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
471// ---------------------------------------------------------------------
472/* */
473pkgAcquire::UriIterator pkgAcquire::UriBegin()
474{
475 return UriIterator(Queues);
476}
477 /*}}}*/
8e5fc8f5 478// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
479// ---------------------------------------------------------------------
480/* */
481pkgAcquire::UriIterator pkgAcquire::UriEnd()
482{
483 return UriIterator(0);
484}
485 /*}}}*/
0a8a80e5 486
e331f6ed
AL
487// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
488// ---------------------------------------------------------------------
489/* */
490pkgAcquire::MethodConfig::MethodConfig()
491{
492 SingleInstance = false;
e331f6ed
AL
493 Pipeline = false;
494 SendConfig = false;
495 LocalOnly = false;
496 Next = 0;
497}
498 /*}}}*/
499
0a8a80e5
AL
500// Queue::Queue - Constructor /*{{{*/
501// ---------------------------------------------------------------------
502/* */
503pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
504 Owner(Owner)
505{
506 Items = 0;
507 Next = 0;
508 Workers = 0;
b185acc2
AL
509 MaxPipeDepth = 1;
510 PipeDepth = 0;
0a8a80e5
AL
511}
512 /*}}}*/
513// Queue::~Queue - Destructor /*{{{*/
514// ---------------------------------------------------------------------
515/* */
516pkgAcquire::Queue::~Queue()
517{
8e5fc8f5 518 Shutdown(true);
0a8a80e5
AL
519
520 while (Items != 0)
521 {
522 QItem *Jnk = Items;
523 Items = Items->Next;
524 delete Jnk;
525 }
526}
527 /*}}}*/
528// Queue::Enqueue - Queue an item to the queue /*{{{*/
529// ---------------------------------------------------------------------
530/* */
8267fe24 531void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 532{
7a1b1f8b
AL
533 QItem **I = &Items;
534 for (; *I != 0; I = &(*I)->Next);
535
0a8a80e5 536 // Create a new item
7a1b1f8b
AL
537 QItem *Itm = new QItem;
538 *Itm = Item;
539 Itm->Next = 0;
540 *I = Itm;
0a8a80e5 541
8267fe24 542 Item.Owner->QueueCounter++;
93bf083d
AL
543 if (Items->Next == 0)
544 Cycle();
0a8a80e5
AL
545}
546 /*}}}*/
c88edf1d 547// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 548// ---------------------------------------------------------------------
b185acc2 549/* We return true if we hit something */
bfd22fc0 550bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 551{
b185acc2
AL
552 if (Owner->Status == pkgAcquire::Item::StatFetching)
553 return _error->Error("Tried to dequeue a fetching object");
554
bfd22fc0
AL
555 bool Res = false;
556
0a8a80e5
AL
557 QItem **I = &Items;
558 for (; *I != 0;)
559 {
560 if ((*I)->Owner == Owner)
561 {
562 QItem *Jnk= *I;
563 *I = (*I)->Next;
564 Owner->QueueCounter--;
565 delete Jnk;
bfd22fc0 566 Res = true;
0a8a80e5
AL
567 }
568 else
569 I = &(*I)->Next;
570 }
bfd22fc0
AL
571
572 return Res;
0a8a80e5
AL
573}
574 /*}}}*/
575// Queue::Startup - Start the worker processes /*{{{*/
576// ---------------------------------------------------------------------
8e5fc8f5
AL
577/* It is possible for this to be called with a pre-existing set of
578 workers. */
0a8a80e5
AL
579bool pkgAcquire::Queue::Startup()
580{
8e5fc8f5
AL
581 if (Workers == 0)
582 {
583 URI U(Name);
584 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
585 if (Cnf == 0)
586 return false;
587
588 Workers = new Worker(this,Cnf,Owner->Log);
589 Owner->Add(Workers);
590 if (Workers->Start() == false)
591 return false;
592
593 /* When pipelining we commit 10 items. This needs to change when we
594 added other source retry to have cycle maintain a pipeline depth
595 on its own. */
596 if (Cnf->Pipeline == true)
597 MaxPipeDepth = 10;
598 else
599 MaxPipeDepth = 1;
600 }
5cb5d8dc 601
93bf083d 602 return Cycle();
0a8a80e5
AL
603}
604 /*}}}*/
605// Queue::Shutdown - Shutdown the worker processes /*{{{*/
606// ---------------------------------------------------------------------
8e5fc8f5
AL
607/* If final is true then all workers are eliminated, otherwise only workers
608 that do not need cleanup are removed */
609bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
610{
611 // Delete all of the workers
8e5fc8f5
AL
612 pkgAcquire::Worker **Cur = &Workers;
613 while (*Cur != 0)
0a8a80e5 614 {
8e5fc8f5
AL
615 pkgAcquire::Worker *Jnk = *Cur;
616 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
617 {
618 *Cur = Jnk->NextQueue;
619 Owner->Remove(Jnk);
620 delete Jnk;
621 }
622 else
623 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
624 }
625
626 return true;
3b5421b4
AL
627}
628 /*}}}*/
7d8afa39 629// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
630// ---------------------------------------------------------------------
631/* */
632pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
633{
634 for (QItem *I = Items; I != 0; I = I->Next)
635 if (I->URI == URI && I->Worker == Owner)
636 return I;
637 return 0;
638}
639 /*}}}*/
640// Queue::ItemDone - Item has been completed /*{{{*/
641// ---------------------------------------------------------------------
642/* The worker signals this which causes the item to be removed from the
93bf083d
AL
643 queue. If this is the last queue instance then it is removed from the
644 main queue too.*/
c88edf1d
AL
645bool pkgAcquire::Queue::ItemDone(QItem *Itm)
646{
b185acc2 647 PipeDepth--;
db890fdb
AL
648 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
649 Itm->Owner->Status = pkgAcquire::Item::StatDone;
650
93bf083d
AL
651 if (Itm->Owner->QueueCounter <= 1)
652 Owner->Dequeue(Itm->Owner);
653 else
654 {
655 Dequeue(Itm->Owner);
656 Owner->Bump();
657 }
c88edf1d 658
93bf083d
AL
659 return Cycle();
660}
661 /*}}}*/
662// Queue::Cycle - Queue new items into the method /*{{{*/
663// ---------------------------------------------------------------------
b185acc2
AL
664/* This locates a new idle item and sends it to the worker. If pipelining
665 is enabled then it keeps the pipe full. */
93bf083d
AL
666bool pkgAcquire::Queue::Cycle()
667{
668 if (Items == 0 || Workers == 0)
c88edf1d
AL
669 return true;
670
e7432370
AL
671 if (PipeDepth < 0)
672 return _error->Error("Pipedepth failure");
673
93bf083d
AL
674 // Look for a queable item
675 QItem *I = Items;
e7432370 676 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
677 {
678 for (; I != 0; I = I->Next)
679 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
680 break;
681
682 // Nothing to do, queue is idle.
683 if (I == 0)
684 return true;
685
686 I->Worker = Workers;
687 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 688 PipeDepth++;
b185acc2
AL
689 if (Workers->QueueItem(I) == false)
690 return false;
691 }
93bf083d 692
b185acc2 693 return true;
c88edf1d
AL
694}
695 /*}}}*/
be4401bf
AL
696// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
697// ---------------------------------------------------------------------
b185acc2 698/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
699void pkgAcquire::Queue::Bump()
700{
b185acc2 701 Cycle();
be4401bf
AL
702}
703 /*}}}*/
b98f2859
AL
704
705// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
706// ---------------------------------------------------------------------
707/* */
708pkgAcquireStatus::pkgAcquireStatus()
709{
710 Start();
711}
712 /*}}}*/
713// AcquireStatus::Pulse - Called periodically /*{{{*/
714// ---------------------------------------------------------------------
715/* This computes some internal state variables for the derived classes to
716 use. It generates the current downloaded bytes and total bytes to download
717 as well as the current CPS estimate. */
024d1123 718bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
719{
720 TotalBytes = 0;
721 CurrentBytes = 0;
d568ed2d
AL
722 TotalItems = 0;
723 CurrentItems = 0;
b98f2859
AL
724
725 // Compute the total number of bytes to fetch
726 unsigned int Unknown = 0;
727 unsigned int Count = 0;
728 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
729 I++, Count++)
730 {
d568ed2d
AL
731 TotalItems++;
732 if ((*I)->Status == pkgAcquire::Item::StatDone)
733 CurrentItems++;
734
a6568219
AL
735 // Totally ignore local items
736 if ((*I)->Local == true)
737 continue;
738
b98f2859
AL
739 TotalBytes += (*I)->FileSize;
740 if ((*I)->Complete == true)
741 CurrentBytes += (*I)->FileSize;
742 if ((*I)->FileSize == 0 && (*I)->Complete == false)
743 Unknown++;
744 }
745
746 // Compute the current completion
aa0e1101 747 unsigned long ResumeSize = 0;
b98f2859
AL
748 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
749 I = Owner->WorkerStep(I))
750 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
751 {
752 CurrentBytes += I->CurrentSize;
753 ResumeSize += I->ResumePoint;
754
755 // Files with unknown size always have 100% completion
756 if (I->CurrentItem->Owner->FileSize == 0 &&
757 I->CurrentItem->Owner->Complete == false)
758 TotalBytes += I->CurrentSize;
759 }
760
b98f2859
AL
761 // Normalize the figures and account for unknown size downloads
762 if (TotalBytes <= 0)
763 TotalBytes = 1;
764 if (Unknown == Count)
765 TotalBytes = Unknown;
18ef0a78
AL
766
767 // Wha?! Is not supposed to happen.
768 if (CurrentBytes > TotalBytes)
769 CurrentBytes = TotalBytes;
b98f2859
AL
770
771 // Compute the CPS
772 struct timeval NewTime;
773 gettimeofday(&NewTime,0);
774 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
775 NewTime.tv_sec - Time.tv_sec > 6)
776 {
f17ac097
AL
777 double Delta = NewTime.tv_sec - Time.tv_sec +
778 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 779
b98f2859 780 // Compute the CPS value
f17ac097 781 if (Delta < 0.01)
e331f6ed
AL
782 CurrentCPS = 0;
783 else
aa0e1101
AL
784 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
785 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 786 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
787 Time = NewTime;
788 }
024d1123
AL
789
790 return true;
b98f2859
AL
791}
792 /*}}}*/
793// AcquireStatus::Start - Called when the download is started /*{{{*/
794// ---------------------------------------------------------------------
795/* We just reset the counters */
796void pkgAcquireStatus::Start()
797{
798 gettimeofday(&Time,0);
799 gettimeofday(&StartTime,0);
800 LastBytes = 0;
801 CurrentCPS = 0;
802 CurrentBytes = 0;
803 TotalBytes = 0;
804 FetchedBytes = 0;
805 ElapsedTime = 0;
d568ed2d
AL
806 TotalItems = 0;
807 CurrentItems = 0;
b98f2859
AL
808}
809 /*}}}*/
a6568219 810// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
811// ---------------------------------------------------------------------
812/* This accurately computes the elapsed time and the total overall CPS. */
813void pkgAcquireStatus::Stop()
814{
815 // Compute the CPS and elapsed time
816 struct timeval NewTime;
817 gettimeofday(&NewTime,0);
818
31a0531d
AL
819 double Delta = NewTime.tv_sec - StartTime.tv_sec +
820 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 821
b98f2859 822 // Compute the CPS value
31a0531d 823 if (Delta < 0.01)
e331f6ed
AL
824 CurrentCPS = 0;
825 else
31a0531d 826 CurrentCPS = FetchedBytes/Delta;
b98f2859 827 LastBytes = CurrentBytes;
31a0531d 828 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
829}
830 /*}}}*/
831// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
832// ---------------------------------------------------------------------
833/* This is used to get accurate final transfer rate reporting. */
834void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 835{
b98f2859
AL
836 FetchedBytes += Size - Resume;
837}
838 /*}}}*/