G++3 fixes from Randolph
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
b2e465d6 3// $Id: acquire.cc,v 1.47 2001/02/20 07:03:17 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6
AL
26#include <apti18n.h>
27
7a7fa5f0 28#include <dirent.h>
8267fe24 29#include <sys/time.h>
524f8105 30#include <errno.h>
5f3edc0f 31#include <sys/stat.h>
0118833a
AL
32 /*}}}*/
33
34// Acquire::pkgAcquire - Constructor /*{{{*/
35// ---------------------------------------------------------------------
93bf083d 36/* We grab some runtime state from the configuration space */
8267fe24 37pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
38{
39 Queues = 0;
40 Configs = 0;
0a8a80e5
AL
41 Workers = 0;
42 ToFetch = 0;
8b89e57f 43 Running = false;
0a8a80e5
AL
44
45 string Mode = _config->Find("Acquire::Queue-Mode","host");
46 if (strcasecmp(Mode.c_str(),"host") == 0)
47 QueueMode = QueueHost;
48 if (strcasecmp(Mode.c_str(),"access") == 0)
49 QueueMode = QueueAccess;
50
51 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 52
0c55d1d2 53 // This is really a stupid place for this
5f3edc0f
AL
54 struct stat St;
55 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
56 S_ISDIR(St.st_mode) == 0)
b2e465d6 57 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
58 _config->FindDir("Dir::State::lists").c_str());
59 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
60 S_ISDIR(St.st_mode) == 0)
b2e465d6 61 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 62 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
63}
64 /*}}}*/
65// Acquire::~pkgAcquire - Destructor /*{{{*/
66// ---------------------------------------------------------------------
93bf083d 67/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
68pkgAcquire::~pkgAcquire()
69{
459681d3
AL
70 Shutdown();
71
3b5421b4
AL
72 while (Configs != 0)
73 {
74 MethodConfig *Jnk = Configs;
75 Configs = Configs->Next;
76 delete Jnk;
77 }
281daf46
AL
78}
79 /*}}}*/
8e5fc8f5 80// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
81// ---------------------------------------------------------------------
82/* */
83void pkgAcquire::Shutdown()
84{
85 while (Items.size() != 0)
86 delete Items[0];
0a8a80e5
AL
87
88 while (Queues != 0)
89 {
90 Queue *Jnk = Queues;
91 Queues = Queues->Next;
92 delete Jnk;
93 }
0118833a
AL
94}
95 /*}}}*/
96// Acquire::Add - Add a new item /*{{{*/
97// ---------------------------------------------------------------------
93bf083d
AL
98/* This puts an item on the acquire list. This list is mainly for tracking
99 item status */
0118833a
AL
100void pkgAcquire::Add(Item *Itm)
101{
102 Items.push_back(Itm);
103}
104 /*}}}*/
105// Acquire::Remove - Remove a item /*{{{*/
106// ---------------------------------------------------------------------
93bf083d 107/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
108void pkgAcquire::Remove(Item *Itm)
109{
a3eaf954
AL
110 Dequeue(Itm);
111
0118833a
AL
112 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
113 {
114 if (*I == Itm)
115 Items.erase(I);
8267fe24 116 }
0118833a
AL
117}
118 /*}}}*/
0a8a80e5
AL
119// Acquire::Add - Add a worker /*{{{*/
120// ---------------------------------------------------------------------
93bf083d
AL
121/* A list of workers is kept so that the select loop can direct their FD
122 usage. */
0a8a80e5
AL
123void pkgAcquire::Add(Worker *Work)
124{
125 Work->NextAcquire = Workers;
126 Workers = Work;
127}
128 /*}}}*/
129// Acquire::Remove - Remove a worker /*{{{*/
130// ---------------------------------------------------------------------
93bf083d
AL
131/* A worker has died. This can not be done while the select loop is running
132 as it would require that RunFds could handling a changing list state and
133 it cant.. */
0a8a80e5
AL
134void pkgAcquire::Remove(Worker *Work)
135{
93bf083d
AL
136 if (Running == true)
137 abort();
138
0a8a80e5
AL
139 Worker **I = &Workers;
140 for (; *I != 0;)
141 {
142 if (*I == Work)
143 *I = (*I)->NextAcquire;
144 else
145 I = &(*I)->NextAcquire;
146 }
147}
148 /*}}}*/
0118833a
AL
149// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
150// ---------------------------------------------------------------------
93bf083d 151/* This is the entry point for an item. An item calls this function when
281daf46 152 it is constructed which creates a queue (based on the current queue
93bf083d
AL
153 mode) and puts the item in that queue. If the system is running then
154 the queue might be started. */
8267fe24 155void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 156{
0a8a80e5 157 // Determine which queue to put the item in
e331f6ed
AL
158 const MethodConfig *Config;
159 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
160 if (Name.empty() == true)
161 return;
162
163 // Find the queue structure
164 Queue *I = Queues;
165 for (; I != 0 && I->Name != Name; I = I->Next);
166 if (I == 0)
167 {
168 I = new Queue(Name,this);
169 I->Next = Queues;
170 Queues = I;
93bf083d
AL
171
172 if (Running == true)
173 I->Startup();
0a8a80e5 174 }
bfd22fc0 175
e331f6ed
AL
176 // See if this is a local only URI
177 if (Config->LocalOnly == true && Item.Owner->Complete == false)
178 Item.Owner->Local = true;
8267fe24 179 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
180
181 // Queue it into the named queue
8267fe24 182 I->Enqueue(Item);
0a8a80e5 183 ToFetch++;
93bf083d 184
0a8a80e5
AL
185 // Some trace stuff
186 if (Debug == true)
187 {
8267fe24
AL
188 clog << "Fetching " << Item.URI << endl;
189 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 190 clog << " Queue is: " << Name << endl;
0a8a80e5 191 }
3b5421b4
AL
192}
193 /*}}}*/
0a8a80e5 194// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 195// ---------------------------------------------------------------------
93bf083d
AL
196/* This is called when an item is finished being fetched. It removes it
197 from all the queues */
0a8a80e5
AL
198void pkgAcquire::Dequeue(Item *Itm)
199{
200 Queue *I = Queues;
bfd22fc0 201 bool Res = false;
0a8a80e5 202 for (; I != 0; I = I->Next)
bfd22fc0 203 Res |= I->Dequeue(Itm);
93bf083d
AL
204
205 if (Debug == true)
206 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
207 if (Res == true)
208 ToFetch--;
0a8a80e5
AL
209}
210 /*}}}*/
211// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
212// ---------------------------------------------------------------------
213/* The string returned depends on the configuration settings and the
214 method parameters. Given something like http://foo.org/bar it can
215 return http://foo.org or http */
e331f6ed 216string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 217{
93bf083d
AL
218 URI U(Uri);
219
e331f6ed 220 Config = GetConfig(U.Access);
0a8a80e5
AL
221 if (Config == 0)
222 return string();
223
224 /* Single-Instance methods get exactly one queue per URI. This is
225 also used for the Access queue method */
226 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 227 return U.Access;
93bf083d
AL
228
229 return U.Access + ':' + U.Host;
0118833a
AL
230}
231 /*}}}*/
3b5421b4
AL
232// Acquire::GetConfig - Fetch the configuration information /*{{{*/
233// ---------------------------------------------------------------------
234/* This locates the configuration structure for an access method. If
235 a config structure cannot be found a Worker will be created to
236 retrieve it */
0a8a80e5 237pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
238{
239 // Search for an existing config
240 MethodConfig *Conf;
241 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
242 if (Conf->Access == Access)
243 return Conf;
244
245 // Create the new config class
246 Conf = new MethodConfig;
247 Conf->Access = Access;
248 Conf->Next = Configs;
249 Configs = Conf;
0118833a 250
3b5421b4
AL
251 // Create the worker to fetch the configuration
252 Worker Work(Conf);
253 if (Work.Start() == false)
254 return 0;
255
256 return Conf;
257}
258 /*}}}*/
0a8a80e5
AL
259// Acquire::SetFds - Deal with readable FDs /*{{{*/
260// ---------------------------------------------------------------------
261/* Collect FDs that have activity monitors into the fd sets */
262void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
263{
264 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
265 {
266 if (I->InReady == true && I->InFd >= 0)
267 {
268 if (Fd < I->InFd)
269 Fd = I->InFd;
270 FD_SET(I->InFd,RSet);
271 }
272 if (I->OutReady == true && I->OutFd >= 0)
273 {
274 if (Fd < I->OutFd)
275 Fd = I->OutFd;
276 FD_SET(I->OutFd,WSet);
277 }
278 }
279}
280 /*}}}*/
281// Acquire::RunFds - Deal with active FDs /*{{{*/
282// ---------------------------------------------------------------------
93bf083d
AL
283/* Dispatch active FDs over to the proper workers. It is very important
284 that a worker never be erased while this is running! The queue class
285 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
286void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
287{
288 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
289 {
290 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
291 I->InFdReady();
292 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
293 I->OutFdReady();
294 }
295}
296 /*}}}*/
297// Acquire::Run - Run the fetch sequence /*{{{*/
298// ---------------------------------------------------------------------
299/* This runs the queues. It manages a select loop for all of the
300 Worker tasks. The workers interact with the queues and items to
301 manage the actual fetch. */
024d1123 302pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 303{
8b89e57f
AL
304 Running = true;
305
0a8a80e5
AL
306 for (Queue *I = Queues; I != 0; I = I->Next)
307 I->Startup();
308
b98f2859
AL
309 if (Log != 0)
310 Log->Start();
311
024d1123
AL
312 bool WasCancelled = false;
313
0a8a80e5 314 // Run till all things have been acquired
8267fe24
AL
315 struct timeval tv;
316 tv.tv_sec = 0;
317 tv.tv_usec = 500000;
0a8a80e5
AL
318 while (ToFetch > 0)
319 {
320 fd_set RFds;
321 fd_set WFds;
322 int Highest = 0;
323 FD_ZERO(&RFds);
324 FD_ZERO(&WFds);
325 SetFds(Highest,&RFds,&WFds);
326
b0db36b1
AL
327 int Res;
328 do
329 {
330 Res = select(Highest+1,&RFds,&WFds,0,&tv);
331 }
332 while (Res < 0 && errno == EINTR);
333
8267fe24 334 if (Res < 0)
8b89e57f 335 {
8267fe24
AL
336 _error->Errno("select","Select has failed");
337 break;
8b89e57f 338 }
93bf083d 339
0a8a80e5 340 RunFds(&RFds,&WFds);
93bf083d
AL
341 if (_error->PendingError() == true)
342 break;
8267fe24
AL
343
344 // Timeout, notify the log class
345 if (Res == 0 || (Log != 0 && Log->Update == true))
346 {
347 tv.tv_usec = 500000;
348 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
349 I->Pulse();
024d1123
AL
350 if (Log != 0 && Log->Pulse(this) == false)
351 {
352 WasCancelled = true;
353 break;
354 }
8267fe24 355 }
0a8a80e5 356 }
be4401bf 357
b98f2859
AL
358 if (Log != 0)
359 Log->Stop();
360
be4401bf
AL
361 // Shut down the acquire bits
362 Running = false;
0a8a80e5 363 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 364 I->Shutdown(false);
0a8a80e5 365
ab559b35
AL
366 // Shut down the items
367 for (Item **I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 368 (*I)->Finished();
ab559b35 369
024d1123
AL
370 if (_error->PendingError())
371 return Failed;
372 if (WasCancelled)
373 return Cancelled;
374 return Continue;
93bf083d
AL
375}
376 /*}}}*/
be4401bf 377// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
378// ---------------------------------------------------------------------
379/* This routine bumps idle queues in hopes that they will be able to fetch
380 the dequeued item */
381void pkgAcquire::Bump()
382{
be4401bf
AL
383 for (Queue *I = Queues; I != 0; I = I->Next)
384 I->Bump();
0a8a80e5
AL
385}
386 /*}}}*/
8267fe24
AL
387// Acquire::WorkerStep - Step to the next worker /*{{{*/
388// ---------------------------------------------------------------------
389/* Not inlined to advoid including acquire-worker.h */
390pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
391{
392 return I->NextAcquire;
393};
394 /*}}}*/
a6568219 395// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
396// ---------------------------------------------------------------------
397/* This is a bit simplistic, it looks at every file in the dir and sees
398 if it is part of the download set. */
399bool pkgAcquire::Clean(string Dir)
400{
401 DIR *D = opendir(Dir.c_str());
402 if (D == 0)
b2e465d6 403 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
404
405 string StartDir = SafeGetCWD();
406 if (chdir(Dir.c_str()) != 0)
407 {
408 closedir(D);
b2e465d6 409 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
410 }
411
412 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
413 {
414 // Skip some files..
415 if (strcmp(Dir->d_name,"lock") == 0 ||
416 strcmp(Dir->d_name,"partial") == 0 ||
417 strcmp(Dir->d_name,".") == 0 ||
418 strcmp(Dir->d_name,"..") == 0)
419 continue;
420
421 // Look in the get list
422 vector<Item *>::iterator I = Items.begin();
423 for (; I != Items.end(); I++)
424 if (flNotDir((*I)->DestFile) == Dir->d_name)
425 break;
426
427 // Nothing found, nuke it
428 if (I == Items.end())
429 unlink(Dir->d_name);
430 };
431
432 chdir(StartDir.c_str());
433 closedir(D);
434 return true;
435}
436 /*}}}*/
a6568219
AL
437// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
438// ---------------------------------------------------------------------
439/* This is the total number of bytes needed */
b2e465d6 440double pkgAcquire::TotalNeeded()
a6568219 441{
b2e465d6 442 double Total = 0;
a6568219
AL
443 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
444 Total += (*I)->FileSize;
445 return Total;
446}
447 /*}}}*/
448// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
449// ---------------------------------------------------------------------
450/* This is the number of bytes that is not local */
b2e465d6 451double pkgAcquire::FetchNeeded()
a6568219 452{
b2e465d6 453 double Total = 0;
a6568219
AL
454 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
455 if ((*I)->Local == false)
456 Total += (*I)->FileSize;
457 return Total;
458}
459 /*}}}*/
6b1ff003
AL
460// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
461// ---------------------------------------------------------------------
462/* This is the number of bytes that is not local */
b2e465d6 463double pkgAcquire::PartialPresent()
6b1ff003 464{
b2e465d6 465 double Total = 0;
6b1ff003
AL
466 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
467 if ((*I)->Local == false)
468 Total += (*I)->PartialSize;
469 return Total;
470}
471 /*}}}*/
8e5fc8f5 472// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
473// ---------------------------------------------------------------------
474/* */
475pkgAcquire::UriIterator pkgAcquire::UriBegin()
476{
477 return UriIterator(Queues);
478}
479 /*}}}*/
8e5fc8f5 480// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
481// ---------------------------------------------------------------------
482/* */
483pkgAcquire::UriIterator pkgAcquire::UriEnd()
484{
485 return UriIterator(0);
486}
487 /*}}}*/
0a8a80e5 488
e331f6ed
AL
489// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
490// ---------------------------------------------------------------------
491/* */
492pkgAcquire::MethodConfig::MethodConfig()
493{
494 SingleInstance = false;
e331f6ed
AL
495 Pipeline = false;
496 SendConfig = false;
497 LocalOnly = false;
459681d3 498 Removable = false;
e331f6ed
AL
499 Next = 0;
500}
501 /*}}}*/
502
0a8a80e5
AL
503// Queue::Queue - Constructor /*{{{*/
504// ---------------------------------------------------------------------
505/* */
506pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
507 Owner(Owner)
508{
509 Items = 0;
510 Next = 0;
511 Workers = 0;
b185acc2
AL
512 MaxPipeDepth = 1;
513 PipeDepth = 0;
0a8a80e5
AL
514}
515 /*}}}*/
516// Queue::~Queue - Destructor /*{{{*/
517// ---------------------------------------------------------------------
518/* */
519pkgAcquire::Queue::~Queue()
520{
8e5fc8f5 521 Shutdown(true);
0a8a80e5
AL
522
523 while (Items != 0)
524 {
525 QItem *Jnk = Items;
526 Items = Items->Next;
527 delete Jnk;
528 }
529}
530 /*}}}*/
531// Queue::Enqueue - Queue an item to the queue /*{{{*/
532// ---------------------------------------------------------------------
533/* */
8267fe24 534void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 535{
7a1b1f8b
AL
536 QItem **I = &Items;
537 for (; *I != 0; I = &(*I)->Next);
538
0a8a80e5 539 // Create a new item
7a1b1f8b
AL
540 QItem *Itm = new QItem;
541 *Itm = Item;
542 Itm->Next = 0;
543 *I = Itm;
0a8a80e5 544
8267fe24 545 Item.Owner->QueueCounter++;
93bf083d
AL
546 if (Items->Next == 0)
547 Cycle();
0a8a80e5
AL
548}
549 /*}}}*/
c88edf1d 550// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 551// ---------------------------------------------------------------------
b185acc2 552/* We return true if we hit something */
bfd22fc0 553bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 554{
b185acc2
AL
555 if (Owner->Status == pkgAcquire::Item::StatFetching)
556 return _error->Error("Tried to dequeue a fetching object");
557
bfd22fc0
AL
558 bool Res = false;
559
0a8a80e5
AL
560 QItem **I = &Items;
561 for (; *I != 0;)
562 {
563 if ((*I)->Owner == Owner)
564 {
565 QItem *Jnk= *I;
566 *I = (*I)->Next;
567 Owner->QueueCounter--;
568 delete Jnk;
bfd22fc0 569 Res = true;
0a8a80e5
AL
570 }
571 else
572 I = &(*I)->Next;
573 }
bfd22fc0
AL
574
575 return Res;
0a8a80e5
AL
576}
577 /*}}}*/
578// Queue::Startup - Start the worker processes /*{{{*/
579// ---------------------------------------------------------------------
8e5fc8f5
AL
580/* It is possible for this to be called with a pre-existing set of
581 workers. */
0a8a80e5
AL
582bool pkgAcquire::Queue::Startup()
583{
8e5fc8f5
AL
584 if (Workers == 0)
585 {
586 URI U(Name);
587 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
588 if (Cnf == 0)
589 return false;
590
591 Workers = new Worker(this,Cnf,Owner->Log);
592 Owner->Add(Workers);
593 if (Workers->Start() == false)
594 return false;
595
596 /* When pipelining we commit 10 items. This needs to change when we
597 added other source retry to have cycle maintain a pipeline depth
598 on its own. */
599 if (Cnf->Pipeline == true)
600 MaxPipeDepth = 10;
601 else
602 MaxPipeDepth = 1;
603 }
5cb5d8dc 604
93bf083d 605 return Cycle();
0a8a80e5
AL
606}
607 /*}}}*/
608// Queue::Shutdown - Shutdown the worker processes /*{{{*/
609// ---------------------------------------------------------------------
8e5fc8f5
AL
610/* If final is true then all workers are eliminated, otherwise only workers
611 that do not need cleanup are removed */
612bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
613{
614 // Delete all of the workers
8e5fc8f5
AL
615 pkgAcquire::Worker **Cur = &Workers;
616 while (*Cur != 0)
0a8a80e5 617 {
8e5fc8f5
AL
618 pkgAcquire::Worker *Jnk = *Cur;
619 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
620 {
621 *Cur = Jnk->NextQueue;
622 Owner->Remove(Jnk);
623 delete Jnk;
624 }
625 else
626 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
627 }
628
629 return true;
3b5421b4
AL
630}
631 /*}}}*/
7d8afa39 632// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
633// ---------------------------------------------------------------------
634/* */
635pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
636{
637 for (QItem *I = Items; I != 0; I = I->Next)
638 if (I->URI == URI && I->Worker == Owner)
639 return I;
640 return 0;
641}
642 /*}}}*/
643// Queue::ItemDone - Item has been completed /*{{{*/
644// ---------------------------------------------------------------------
645/* The worker signals this which causes the item to be removed from the
93bf083d
AL
646 queue. If this is the last queue instance then it is removed from the
647 main queue too.*/
c88edf1d
AL
648bool pkgAcquire::Queue::ItemDone(QItem *Itm)
649{
b185acc2 650 PipeDepth--;
db890fdb
AL
651 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
652 Itm->Owner->Status = pkgAcquire::Item::StatDone;
653
93bf083d
AL
654 if (Itm->Owner->QueueCounter <= 1)
655 Owner->Dequeue(Itm->Owner);
656 else
657 {
658 Dequeue(Itm->Owner);
659 Owner->Bump();
660 }
c88edf1d 661
93bf083d
AL
662 return Cycle();
663}
664 /*}}}*/
665// Queue::Cycle - Queue new items into the method /*{{{*/
666// ---------------------------------------------------------------------
b185acc2
AL
667/* This locates a new idle item and sends it to the worker. If pipelining
668 is enabled then it keeps the pipe full. */
93bf083d
AL
669bool pkgAcquire::Queue::Cycle()
670{
671 if (Items == 0 || Workers == 0)
c88edf1d
AL
672 return true;
673
e7432370
AL
674 if (PipeDepth < 0)
675 return _error->Error("Pipedepth failure");
676
93bf083d
AL
677 // Look for a queable item
678 QItem *I = Items;
e7432370 679 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
680 {
681 for (; I != 0; I = I->Next)
682 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
683 break;
684
685 // Nothing to do, queue is idle.
686 if (I == 0)
687 return true;
688
689 I->Worker = Workers;
690 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 691 PipeDepth++;
b185acc2
AL
692 if (Workers->QueueItem(I) == false)
693 return false;
694 }
93bf083d 695
b185acc2 696 return true;
c88edf1d
AL
697}
698 /*}}}*/
be4401bf
AL
699// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
700// ---------------------------------------------------------------------
b185acc2 701/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
702void pkgAcquire::Queue::Bump()
703{
b185acc2 704 Cycle();
be4401bf
AL
705}
706 /*}}}*/
b98f2859
AL
707
708// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
709// ---------------------------------------------------------------------
710/* */
c5ccf175 711pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
712{
713 Start();
714}
715 /*}}}*/
716// AcquireStatus::Pulse - Called periodically /*{{{*/
717// ---------------------------------------------------------------------
718/* This computes some internal state variables for the derived classes to
719 use. It generates the current downloaded bytes and total bytes to download
720 as well as the current CPS estimate. */
024d1123 721bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
722{
723 TotalBytes = 0;
724 CurrentBytes = 0;
d568ed2d
AL
725 TotalItems = 0;
726 CurrentItems = 0;
b98f2859
AL
727
728 // Compute the total number of bytes to fetch
729 unsigned int Unknown = 0;
730 unsigned int Count = 0;
731 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
732 I++, Count++)
733 {
d568ed2d
AL
734 TotalItems++;
735 if ((*I)->Status == pkgAcquire::Item::StatDone)
736 CurrentItems++;
737
a6568219
AL
738 // Totally ignore local items
739 if ((*I)->Local == true)
740 continue;
b2e465d6 741
b98f2859
AL
742 TotalBytes += (*I)->FileSize;
743 if ((*I)->Complete == true)
744 CurrentBytes += (*I)->FileSize;
745 if ((*I)->FileSize == 0 && (*I)->Complete == false)
746 Unknown++;
747 }
748
749 // Compute the current completion
aa0e1101 750 unsigned long ResumeSize = 0;
b98f2859
AL
751 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
752 I = Owner->WorkerStep(I))
753 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
754 {
755 CurrentBytes += I->CurrentSize;
756 ResumeSize += I->ResumePoint;
757
758 // Files with unknown size always have 100% completion
759 if (I->CurrentItem->Owner->FileSize == 0 &&
760 I->CurrentItem->Owner->Complete == false)
761 TotalBytes += I->CurrentSize;
762 }
763
b98f2859
AL
764 // Normalize the figures and account for unknown size downloads
765 if (TotalBytes <= 0)
766 TotalBytes = 1;
767 if (Unknown == Count)
768 TotalBytes = Unknown;
18ef0a78
AL
769
770 // Wha?! Is not supposed to happen.
771 if (CurrentBytes > TotalBytes)
772 CurrentBytes = TotalBytes;
b98f2859
AL
773
774 // Compute the CPS
775 struct timeval NewTime;
776 gettimeofday(&NewTime,0);
777 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
778 NewTime.tv_sec - Time.tv_sec > 6)
779 {
f17ac097
AL
780 double Delta = NewTime.tv_sec - Time.tv_sec +
781 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 782
b98f2859 783 // Compute the CPS value
f17ac097 784 if (Delta < 0.01)
e331f6ed
AL
785 CurrentCPS = 0;
786 else
aa0e1101
AL
787 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
788 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 789 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
790 Time = NewTime;
791 }
024d1123
AL
792
793 return true;
b98f2859
AL
794}
795 /*}}}*/
796// AcquireStatus::Start - Called when the download is started /*{{{*/
797// ---------------------------------------------------------------------
798/* We just reset the counters */
799void pkgAcquireStatus::Start()
800{
801 gettimeofday(&Time,0);
802 gettimeofday(&StartTime,0);
803 LastBytes = 0;
804 CurrentCPS = 0;
805 CurrentBytes = 0;
806 TotalBytes = 0;
807 FetchedBytes = 0;
808 ElapsedTime = 0;
d568ed2d
AL
809 TotalItems = 0;
810 CurrentItems = 0;
b98f2859
AL
811}
812 /*}}}*/
a6568219 813// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
814// ---------------------------------------------------------------------
815/* This accurately computes the elapsed time and the total overall CPS. */
816void pkgAcquireStatus::Stop()
817{
818 // Compute the CPS and elapsed time
819 struct timeval NewTime;
820 gettimeofday(&NewTime,0);
821
31a0531d
AL
822 double Delta = NewTime.tv_sec - StartTime.tv_sec +
823 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 824
b98f2859 825 // Compute the CPS value
31a0531d 826 if (Delta < 0.01)
e331f6ed
AL
827 CurrentCPS = 0;
828 else
31a0531d 829 CurrentCPS = FetchedBytes/Delta;
b98f2859 830 LastBytes = CurrentBytes;
31a0531d 831 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
832}
833 /*}}}*/
834// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
835// ---------------------------------------------------------------------
836/* This is used to get accurate final transfer rate reporting. */
837void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 838{
b98f2859
AL
839 FetchedBytes += Size - Resume;
840}
841 /*}}}*/