G++3 fixes from Randolph
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
b4fc9b6f 3// $Id: acquire.cc,v 1.48 2001/05/22 04:17:18 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
b2e465d6 29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
5f3edc0f 33#include <sys/stat.h>
0118833a
AL
34 /*}}}*/
35
b4fc9b6f
AL
36using namespace std;
37
0118833a
AL
38// Acquire::pkgAcquire - Constructor /*{{{*/
39// ---------------------------------------------------------------------
93bf083d 40/* We grab some runtime state from the configuration space */
8267fe24 41pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
42{
43 Queues = 0;
44 Configs = 0;
0a8a80e5
AL
45 Workers = 0;
46 ToFetch = 0;
8b89e57f 47 Running = false;
0a8a80e5
AL
48
49 string Mode = _config->Find("Acquire::Queue-Mode","host");
50 if (strcasecmp(Mode.c_str(),"host") == 0)
51 QueueMode = QueueHost;
52 if (strcasecmp(Mode.c_str(),"access") == 0)
53 QueueMode = QueueAccess;
54
55 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 56
0c55d1d2 57 // This is really a stupid place for this
5f3edc0f
AL
58 struct stat St;
59 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
60 S_ISDIR(St.st_mode) == 0)
b2e465d6 61 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
62 _config->FindDir("Dir::State::lists").c_str());
63 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
64 S_ISDIR(St.st_mode) == 0)
b2e465d6 65 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 66 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
67}
68 /*}}}*/
69// Acquire::~pkgAcquire - Destructor /*{{{*/
70// ---------------------------------------------------------------------
93bf083d 71/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
72pkgAcquire::~pkgAcquire()
73{
459681d3
AL
74 Shutdown();
75
3b5421b4
AL
76 while (Configs != 0)
77 {
78 MethodConfig *Jnk = Configs;
79 Configs = Configs->Next;
80 delete Jnk;
81 }
281daf46
AL
82}
83 /*}}}*/
8e5fc8f5 84// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
85// ---------------------------------------------------------------------
86/* */
87void pkgAcquire::Shutdown()
88{
89 while (Items.size() != 0)
90 delete Items[0];
0a8a80e5
AL
91
92 while (Queues != 0)
93 {
94 Queue *Jnk = Queues;
95 Queues = Queues->Next;
96 delete Jnk;
97 }
0118833a
AL
98}
99 /*}}}*/
100// Acquire::Add - Add a new item /*{{{*/
101// ---------------------------------------------------------------------
93bf083d
AL
102/* This puts an item on the acquire list. This list is mainly for tracking
103 item status */
0118833a
AL
104void pkgAcquire::Add(Item *Itm)
105{
106 Items.push_back(Itm);
107}
108 /*}}}*/
109// Acquire::Remove - Remove a item /*{{{*/
110// ---------------------------------------------------------------------
93bf083d 111/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
112void pkgAcquire::Remove(Item *Itm)
113{
a3eaf954
AL
114 Dequeue(Itm);
115
b4fc9b6f 116 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
0118833a
AL
117 {
118 if (*I == Itm)
b4fc9b6f 119 {
0118833a 120 Items.erase(I);
b4fc9b6f
AL
121 I = Items.begin();
122 }
8267fe24 123 }
0118833a
AL
124}
125 /*}}}*/
0a8a80e5
AL
126// Acquire::Add - Add a worker /*{{{*/
127// ---------------------------------------------------------------------
93bf083d
AL
128/* A list of workers is kept so that the select loop can direct their FD
129 usage. */
0a8a80e5
AL
130void pkgAcquire::Add(Worker *Work)
131{
132 Work->NextAcquire = Workers;
133 Workers = Work;
134}
135 /*}}}*/
136// Acquire::Remove - Remove a worker /*{{{*/
137// ---------------------------------------------------------------------
93bf083d
AL
138/* A worker has died. This can not be done while the select loop is running
139 as it would require that RunFds could handling a changing list state and
140 it cant.. */
0a8a80e5
AL
141void pkgAcquire::Remove(Worker *Work)
142{
93bf083d
AL
143 if (Running == true)
144 abort();
145
0a8a80e5
AL
146 Worker **I = &Workers;
147 for (; *I != 0;)
148 {
149 if (*I == Work)
150 *I = (*I)->NextAcquire;
151 else
152 I = &(*I)->NextAcquire;
153 }
154}
155 /*}}}*/
0118833a
AL
156// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
157// ---------------------------------------------------------------------
93bf083d 158/* This is the entry point for an item. An item calls this function when
281daf46 159 it is constructed which creates a queue (based on the current queue
93bf083d
AL
160 mode) and puts the item in that queue. If the system is running then
161 the queue might be started. */
8267fe24 162void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 163{
0a8a80e5 164 // Determine which queue to put the item in
e331f6ed
AL
165 const MethodConfig *Config;
166 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
167 if (Name.empty() == true)
168 return;
169
170 // Find the queue structure
171 Queue *I = Queues;
172 for (; I != 0 && I->Name != Name; I = I->Next);
173 if (I == 0)
174 {
175 I = new Queue(Name,this);
176 I->Next = Queues;
177 Queues = I;
93bf083d
AL
178
179 if (Running == true)
180 I->Startup();
0a8a80e5 181 }
bfd22fc0 182
e331f6ed
AL
183 // See if this is a local only URI
184 if (Config->LocalOnly == true && Item.Owner->Complete == false)
185 Item.Owner->Local = true;
8267fe24 186 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
187
188 // Queue it into the named queue
8267fe24 189 I->Enqueue(Item);
0a8a80e5 190 ToFetch++;
93bf083d 191
0a8a80e5
AL
192 // Some trace stuff
193 if (Debug == true)
194 {
8267fe24
AL
195 clog << "Fetching " << Item.URI << endl;
196 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 197 clog << " Queue is: " << Name << endl;
0a8a80e5 198 }
3b5421b4
AL
199}
200 /*}}}*/
0a8a80e5 201// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 202// ---------------------------------------------------------------------
93bf083d
AL
203/* This is called when an item is finished being fetched. It removes it
204 from all the queues */
0a8a80e5
AL
205void pkgAcquire::Dequeue(Item *Itm)
206{
207 Queue *I = Queues;
bfd22fc0 208 bool Res = false;
0a8a80e5 209 for (; I != 0; I = I->Next)
bfd22fc0 210 Res |= I->Dequeue(Itm);
93bf083d
AL
211
212 if (Debug == true)
213 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
214 if (Res == true)
215 ToFetch--;
0a8a80e5
AL
216}
217 /*}}}*/
218// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
219// ---------------------------------------------------------------------
220/* The string returned depends on the configuration settings and the
221 method parameters. Given something like http://foo.org/bar it can
222 return http://foo.org or http */
e331f6ed 223string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 224{
93bf083d
AL
225 URI U(Uri);
226
e331f6ed 227 Config = GetConfig(U.Access);
0a8a80e5
AL
228 if (Config == 0)
229 return string();
230
231 /* Single-Instance methods get exactly one queue per URI. This is
232 also used for the Access queue method */
233 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 234 return U.Access;
93bf083d
AL
235
236 return U.Access + ':' + U.Host;
0118833a
AL
237}
238 /*}}}*/
3b5421b4
AL
239// Acquire::GetConfig - Fetch the configuration information /*{{{*/
240// ---------------------------------------------------------------------
241/* This locates the configuration structure for an access method. If
242 a config structure cannot be found a Worker will be created to
243 retrieve it */
0a8a80e5 244pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
245{
246 // Search for an existing config
247 MethodConfig *Conf;
248 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
249 if (Conf->Access == Access)
250 return Conf;
251
252 // Create the new config class
253 Conf = new MethodConfig;
254 Conf->Access = Access;
255 Conf->Next = Configs;
256 Configs = Conf;
0118833a 257
3b5421b4
AL
258 // Create the worker to fetch the configuration
259 Worker Work(Conf);
260 if (Work.Start() == false)
261 return 0;
262
263 return Conf;
264}
265 /*}}}*/
0a8a80e5
AL
266// Acquire::SetFds - Deal with readable FDs /*{{{*/
267// ---------------------------------------------------------------------
268/* Collect FDs that have activity monitors into the fd sets */
269void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
270{
271 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
272 {
273 if (I->InReady == true && I->InFd >= 0)
274 {
275 if (Fd < I->InFd)
276 Fd = I->InFd;
277 FD_SET(I->InFd,RSet);
278 }
279 if (I->OutReady == true && I->OutFd >= 0)
280 {
281 if (Fd < I->OutFd)
282 Fd = I->OutFd;
283 FD_SET(I->OutFd,WSet);
284 }
285 }
286}
287 /*}}}*/
288// Acquire::RunFds - Deal with active FDs /*{{{*/
289// ---------------------------------------------------------------------
93bf083d
AL
290/* Dispatch active FDs over to the proper workers. It is very important
291 that a worker never be erased while this is running! The queue class
292 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
293void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
294{
295 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
296 {
297 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
298 I->InFdReady();
299 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
300 I->OutFdReady();
301 }
302}
303 /*}}}*/
304// Acquire::Run - Run the fetch sequence /*{{{*/
305// ---------------------------------------------------------------------
306/* This runs the queues. It manages a select loop for all of the
307 Worker tasks. The workers interact with the queues and items to
308 manage the actual fetch. */
024d1123 309pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 310{
8b89e57f
AL
311 Running = true;
312
0a8a80e5
AL
313 for (Queue *I = Queues; I != 0; I = I->Next)
314 I->Startup();
315
b98f2859
AL
316 if (Log != 0)
317 Log->Start();
318
024d1123
AL
319 bool WasCancelled = false;
320
0a8a80e5 321 // Run till all things have been acquired
8267fe24
AL
322 struct timeval tv;
323 tv.tv_sec = 0;
324 tv.tv_usec = 500000;
0a8a80e5
AL
325 while (ToFetch > 0)
326 {
327 fd_set RFds;
328 fd_set WFds;
329 int Highest = 0;
330 FD_ZERO(&RFds);
331 FD_ZERO(&WFds);
332 SetFds(Highest,&RFds,&WFds);
333
b0db36b1
AL
334 int Res;
335 do
336 {
337 Res = select(Highest+1,&RFds,&WFds,0,&tv);
338 }
339 while (Res < 0 && errno == EINTR);
340
8267fe24 341 if (Res < 0)
8b89e57f 342 {
8267fe24
AL
343 _error->Errno("select","Select has failed");
344 break;
8b89e57f 345 }
93bf083d 346
0a8a80e5 347 RunFds(&RFds,&WFds);
93bf083d
AL
348 if (_error->PendingError() == true)
349 break;
8267fe24
AL
350
351 // Timeout, notify the log class
352 if (Res == 0 || (Log != 0 && Log->Update == true))
353 {
354 tv.tv_usec = 500000;
355 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
356 I->Pulse();
024d1123
AL
357 if (Log != 0 && Log->Pulse(this) == false)
358 {
359 WasCancelled = true;
360 break;
361 }
8267fe24 362 }
0a8a80e5 363 }
be4401bf 364
b98f2859
AL
365 if (Log != 0)
366 Log->Stop();
367
be4401bf
AL
368 // Shut down the acquire bits
369 Running = false;
0a8a80e5 370 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 371 I->Shutdown(false);
0a8a80e5 372
ab559b35 373 // Shut down the items
b4fc9b6f 374 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 375 (*I)->Finished();
ab559b35 376
024d1123
AL
377 if (_error->PendingError())
378 return Failed;
379 if (WasCancelled)
380 return Cancelled;
381 return Continue;
93bf083d
AL
382}
383 /*}}}*/
be4401bf 384// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
385// ---------------------------------------------------------------------
386/* This routine bumps idle queues in hopes that they will be able to fetch
387 the dequeued item */
388void pkgAcquire::Bump()
389{
be4401bf
AL
390 for (Queue *I = Queues; I != 0; I = I->Next)
391 I->Bump();
0a8a80e5
AL
392}
393 /*}}}*/
8267fe24
AL
394// Acquire::WorkerStep - Step to the next worker /*{{{*/
395// ---------------------------------------------------------------------
396/* Not inlined to advoid including acquire-worker.h */
397pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
398{
399 return I->NextAcquire;
400};
401 /*}}}*/
a6568219 402// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
403// ---------------------------------------------------------------------
404/* This is a bit simplistic, it looks at every file in the dir and sees
405 if it is part of the download set. */
406bool pkgAcquire::Clean(string Dir)
407{
408 DIR *D = opendir(Dir.c_str());
409 if (D == 0)
b2e465d6 410 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
411
412 string StartDir = SafeGetCWD();
413 if (chdir(Dir.c_str()) != 0)
414 {
415 closedir(D);
b2e465d6 416 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
417 }
418
419 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
420 {
421 // Skip some files..
422 if (strcmp(Dir->d_name,"lock") == 0 ||
423 strcmp(Dir->d_name,"partial") == 0 ||
424 strcmp(Dir->d_name,".") == 0 ||
425 strcmp(Dir->d_name,"..") == 0)
426 continue;
427
428 // Look in the get list
b4fc9b6f 429 ItemCIterator I = Items.begin();
7a7fa5f0
AL
430 for (; I != Items.end(); I++)
431 if (flNotDir((*I)->DestFile) == Dir->d_name)
432 break;
433
434 // Nothing found, nuke it
435 if (I == Items.end())
436 unlink(Dir->d_name);
437 };
438
439 chdir(StartDir.c_str());
440 closedir(D);
441 return true;
442}
443 /*}}}*/
a6568219
AL
444// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
445// ---------------------------------------------------------------------
446/* This is the total number of bytes needed */
b2e465d6 447double pkgAcquire::TotalNeeded()
a6568219 448{
b2e465d6 449 double Total = 0;
b4fc9b6f 450 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
451 Total += (*I)->FileSize;
452 return Total;
453}
454 /*}}}*/
455// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
456// ---------------------------------------------------------------------
457/* This is the number of bytes that is not local */
b2e465d6 458double pkgAcquire::FetchNeeded()
a6568219 459{
b2e465d6 460 double Total = 0;
b4fc9b6f 461 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
462 if ((*I)->Local == false)
463 Total += (*I)->FileSize;
464 return Total;
465}
466 /*}}}*/
6b1ff003
AL
467// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
468// ---------------------------------------------------------------------
469/* This is the number of bytes that is not local */
b2e465d6 470double pkgAcquire::PartialPresent()
6b1ff003 471{
b2e465d6 472 double Total = 0;
b4fc9b6f 473 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
474 if ((*I)->Local == false)
475 Total += (*I)->PartialSize;
476 return Total;
477}
478 /*}}}*/
8e5fc8f5 479// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
480// ---------------------------------------------------------------------
481/* */
482pkgAcquire::UriIterator pkgAcquire::UriBegin()
483{
484 return UriIterator(Queues);
485}
486 /*}}}*/
8e5fc8f5 487// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
488// ---------------------------------------------------------------------
489/* */
490pkgAcquire::UriIterator pkgAcquire::UriEnd()
491{
492 return UriIterator(0);
493}
494 /*}}}*/
0a8a80e5 495
e331f6ed
AL
496// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
497// ---------------------------------------------------------------------
498/* */
499pkgAcquire::MethodConfig::MethodConfig()
500{
501 SingleInstance = false;
e331f6ed
AL
502 Pipeline = false;
503 SendConfig = false;
504 LocalOnly = false;
459681d3 505 Removable = false;
e331f6ed
AL
506 Next = 0;
507}
508 /*}}}*/
509
0a8a80e5
AL
510// Queue::Queue - Constructor /*{{{*/
511// ---------------------------------------------------------------------
512/* */
513pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
514 Owner(Owner)
515{
516 Items = 0;
517 Next = 0;
518 Workers = 0;
b185acc2
AL
519 MaxPipeDepth = 1;
520 PipeDepth = 0;
0a8a80e5
AL
521}
522 /*}}}*/
523// Queue::~Queue - Destructor /*{{{*/
524// ---------------------------------------------------------------------
525/* */
526pkgAcquire::Queue::~Queue()
527{
8e5fc8f5 528 Shutdown(true);
0a8a80e5
AL
529
530 while (Items != 0)
531 {
532 QItem *Jnk = Items;
533 Items = Items->Next;
534 delete Jnk;
535 }
536}
537 /*}}}*/
538// Queue::Enqueue - Queue an item to the queue /*{{{*/
539// ---------------------------------------------------------------------
540/* */
8267fe24 541void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 542{
7a1b1f8b
AL
543 QItem **I = &Items;
544 for (; *I != 0; I = &(*I)->Next);
545
0a8a80e5 546 // Create a new item
7a1b1f8b
AL
547 QItem *Itm = new QItem;
548 *Itm = Item;
549 Itm->Next = 0;
550 *I = Itm;
0a8a80e5 551
8267fe24 552 Item.Owner->QueueCounter++;
93bf083d
AL
553 if (Items->Next == 0)
554 Cycle();
0a8a80e5
AL
555}
556 /*}}}*/
c88edf1d 557// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 558// ---------------------------------------------------------------------
b185acc2 559/* We return true if we hit something */
bfd22fc0 560bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 561{
b185acc2
AL
562 if (Owner->Status == pkgAcquire::Item::StatFetching)
563 return _error->Error("Tried to dequeue a fetching object");
564
bfd22fc0
AL
565 bool Res = false;
566
0a8a80e5
AL
567 QItem **I = &Items;
568 for (; *I != 0;)
569 {
570 if ((*I)->Owner == Owner)
571 {
572 QItem *Jnk= *I;
573 *I = (*I)->Next;
574 Owner->QueueCounter--;
575 delete Jnk;
bfd22fc0 576 Res = true;
0a8a80e5
AL
577 }
578 else
579 I = &(*I)->Next;
580 }
bfd22fc0
AL
581
582 return Res;
0a8a80e5
AL
583}
584 /*}}}*/
585// Queue::Startup - Start the worker processes /*{{{*/
586// ---------------------------------------------------------------------
8e5fc8f5
AL
587/* It is possible for this to be called with a pre-existing set of
588 workers. */
0a8a80e5
AL
589bool pkgAcquire::Queue::Startup()
590{
8e5fc8f5
AL
591 if (Workers == 0)
592 {
593 URI U(Name);
594 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
595 if (Cnf == 0)
596 return false;
597
598 Workers = new Worker(this,Cnf,Owner->Log);
599 Owner->Add(Workers);
600 if (Workers->Start() == false)
601 return false;
602
603 /* When pipelining we commit 10 items. This needs to change when we
604 added other source retry to have cycle maintain a pipeline depth
605 on its own. */
606 if (Cnf->Pipeline == true)
607 MaxPipeDepth = 10;
608 else
609 MaxPipeDepth = 1;
610 }
5cb5d8dc 611
93bf083d 612 return Cycle();
0a8a80e5
AL
613}
614 /*}}}*/
615// Queue::Shutdown - Shutdown the worker processes /*{{{*/
616// ---------------------------------------------------------------------
8e5fc8f5
AL
617/* If final is true then all workers are eliminated, otherwise only workers
618 that do not need cleanup are removed */
619bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
620{
621 // Delete all of the workers
8e5fc8f5
AL
622 pkgAcquire::Worker **Cur = &Workers;
623 while (*Cur != 0)
0a8a80e5 624 {
8e5fc8f5
AL
625 pkgAcquire::Worker *Jnk = *Cur;
626 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
627 {
628 *Cur = Jnk->NextQueue;
629 Owner->Remove(Jnk);
630 delete Jnk;
631 }
632 else
633 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
634 }
635
636 return true;
3b5421b4
AL
637}
638 /*}}}*/
7d8afa39 639// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
640// ---------------------------------------------------------------------
641/* */
642pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
643{
644 for (QItem *I = Items; I != 0; I = I->Next)
645 if (I->URI == URI && I->Worker == Owner)
646 return I;
647 return 0;
648}
649 /*}}}*/
650// Queue::ItemDone - Item has been completed /*{{{*/
651// ---------------------------------------------------------------------
652/* The worker signals this which causes the item to be removed from the
93bf083d
AL
653 queue. If this is the last queue instance then it is removed from the
654 main queue too.*/
c88edf1d
AL
655bool pkgAcquire::Queue::ItemDone(QItem *Itm)
656{
b185acc2 657 PipeDepth--;
db890fdb
AL
658 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
659 Itm->Owner->Status = pkgAcquire::Item::StatDone;
660
93bf083d
AL
661 if (Itm->Owner->QueueCounter <= 1)
662 Owner->Dequeue(Itm->Owner);
663 else
664 {
665 Dequeue(Itm->Owner);
666 Owner->Bump();
667 }
c88edf1d 668
93bf083d
AL
669 return Cycle();
670}
671 /*}}}*/
672// Queue::Cycle - Queue new items into the method /*{{{*/
673// ---------------------------------------------------------------------
b185acc2
AL
674/* This locates a new idle item and sends it to the worker. If pipelining
675 is enabled then it keeps the pipe full. */
93bf083d
AL
676bool pkgAcquire::Queue::Cycle()
677{
678 if (Items == 0 || Workers == 0)
c88edf1d
AL
679 return true;
680
e7432370
AL
681 if (PipeDepth < 0)
682 return _error->Error("Pipedepth failure");
683
93bf083d
AL
684 // Look for a queable item
685 QItem *I = Items;
e7432370 686 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
687 {
688 for (; I != 0; I = I->Next)
689 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
690 break;
691
692 // Nothing to do, queue is idle.
693 if (I == 0)
694 return true;
695
696 I->Worker = Workers;
697 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 698 PipeDepth++;
b185acc2
AL
699 if (Workers->QueueItem(I) == false)
700 return false;
701 }
93bf083d 702
b185acc2 703 return true;
c88edf1d
AL
704}
705 /*}}}*/
be4401bf
AL
706// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
707// ---------------------------------------------------------------------
b185acc2 708/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
709void pkgAcquire::Queue::Bump()
710{
b185acc2 711 Cycle();
be4401bf
AL
712}
713 /*}}}*/
b98f2859
AL
714
715// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
716// ---------------------------------------------------------------------
717/* */
c5ccf175 718pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
719{
720 Start();
721}
722 /*}}}*/
723// AcquireStatus::Pulse - Called periodically /*{{{*/
724// ---------------------------------------------------------------------
725/* This computes some internal state variables for the derived classes to
726 use. It generates the current downloaded bytes and total bytes to download
727 as well as the current CPS estimate. */
024d1123 728bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
729{
730 TotalBytes = 0;
731 CurrentBytes = 0;
d568ed2d
AL
732 TotalItems = 0;
733 CurrentItems = 0;
b98f2859
AL
734
735 // Compute the total number of bytes to fetch
736 unsigned int Unknown = 0;
737 unsigned int Count = 0;
b4fc9b6f 738 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
739 I++, Count++)
740 {
d568ed2d
AL
741 TotalItems++;
742 if ((*I)->Status == pkgAcquire::Item::StatDone)
743 CurrentItems++;
744
a6568219
AL
745 // Totally ignore local items
746 if ((*I)->Local == true)
747 continue;
b2e465d6 748
b98f2859
AL
749 TotalBytes += (*I)->FileSize;
750 if ((*I)->Complete == true)
751 CurrentBytes += (*I)->FileSize;
752 if ((*I)->FileSize == 0 && (*I)->Complete == false)
753 Unknown++;
754 }
755
756 // Compute the current completion
aa0e1101 757 unsigned long ResumeSize = 0;
b98f2859
AL
758 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
759 I = Owner->WorkerStep(I))
760 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
761 {
762 CurrentBytes += I->CurrentSize;
763 ResumeSize += I->ResumePoint;
764
765 // Files with unknown size always have 100% completion
766 if (I->CurrentItem->Owner->FileSize == 0 &&
767 I->CurrentItem->Owner->Complete == false)
768 TotalBytes += I->CurrentSize;
769 }
770
b98f2859
AL
771 // Normalize the figures and account for unknown size downloads
772 if (TotalBytes <= 0)
773 TotalBytes = 1;
774 if (Unknown == Count)
775 TotalBytes = Unknown;
18ef0a78
AL
776
777 // Wha?! Is not supposed to happen.
778 if (CurrentBytes > TotalBytes)
779 CurrentBytes = TotalBytes;
b98f2859
AL
780
781 // Compute the CPS
782 struct timeval NewTime;
783 gettimeofday(&NewTime,0);
784 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
785 NewTime.tv_sec - Time.tv_sec > 6)
786 {
f17ac097
AL
787 double Delta = NewTime.tv_sec - Time.tv_sec +
788 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 789
b98f2859 790 // Compute the CPS value
f17ac097 791 if (Delta < 0.01)
e331f6ed
AL
792 CurrentCPS = 0;
793 else
aa0e1101
AL
794 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
795 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 796 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
797 Time = NewTime;
798 }
024d1123
AL
799
800 return true;
b98f2859
AL
801}
802 /*}}}*/
803// AcquireStatus::Start - Called when the download is started /*{{{*/
804// ---------------------------------------------------------------------
805/* We just reset the counters */
806void pkgAcquireStatus::Start()
807{
808 gettimeofday(&Time,0);
809 gettimeofday(&StartTime,0);
810 LastBytes = 0;
811 CurrentCPS = 0;
812 CurrentBytes = 0;
813 TotalBytes = 0;
814 FetchedBytes = 0;
815 ElapsedTime = 0;
d568ed2d
AL
816 TotalItems = 0;
817 CurrentItems = 0;
b98f2859
AL
818}
819 /*}}}*/
a6568219 820// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
821// ---------------------------------------------------------------------
822/* This accurately computes the elapsed time and the total overall CPS. */
823void pkgAcquireStatus::Stop()
824{
825 // Compute the CPS and elapsed time
826 struct timeval NewTime;
827 gettimeofday(&NewTime,0);
828
31a0531d
AL
829 double Delta = NewTime.tv_sec - StartTime.tv_sec +
830 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 831
b98f2859 832 // Compute the CPS value
31a0531d 833 if (Delta < 0.01)
e331f6ed
AL
834 CurrentCPS = 0;
835 else
31a0531d 836 CurrentCPS = FetchedBytes/Delta;
b98f2859 837 LastBytes = CurrentBytes;
31a0531d 838 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
839}
840 /*}}}*/
841// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
842// ---------------------------------------------------------------------
843/* This is used to get accurate final transfer rate reporting. */
844void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 845{
b98f2859
AL
846 FetchedBytes += Size - Resume;
847}
848 /*}}}*/