regex for package names executed on Grp- not PkgIterator
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
1cd1c398 22#include <apt-pkg/fileutl.h>
8267fe24 23
b2e465d6 24#include <apti18n.h>
b4fc9b6f
AL
25
26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
0118833a
AL
33 /*}}}*/
34
b4fc9b6f
AL
35using namespace std;
36
0118833a
AL
37// Acquire::pkgAcquire - Constructor /*{{{*/
38// ---------------------------------------------------------------------
93bf083d 39/* We grab some runtime state from the configuration space */
1cd1c398
DK
40pkgAcquire::pkgAcquire() : Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
41 Debug(_config->FindB("Debug::pkgAcquire",false)),
42 Running(false), LockFD(-1)
0118833a 43{
1cd1c398 44 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
45 if (strcasecmp(Mode.c_str(),"host") == 0)
46 QueueMode = QueueHost;
47 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
48 QueueMode = QueueAccess;
49}
50pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : Queues(0), Workers(0),
51 Configs(0), Log(Progress), ToFetch(0),
52 Debug(_config->FindB("Debug::pkgAcquire",false)),
53 Running(false), LockFD(-1)
54{
55 string const Mode = _config->Find("Acquire::Queue-Mode","host");
56 if (strcasecmp(Mode.c_str(),"host") == 0)
57 QueueMode = QueueHost;
58 if (strcasecmp(Mode.c_str(),"access") == 0)
59 QueueMode = QueueAccess;
60 Setup(Progress, "");
61}
62 /*}}}*/
63// Acquire::Setup - Delayed Constructor /*{{{*/
64// ---------------------------------------------------------------------
65/* Do everything needed to be a complete Acquire object and report the
66 success (or failure) back so the user knows that something is wrong… */
67bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
68{
69 Log = Progress;
0a8a80e5 70
1cd1c398
DK
71 // check for existence and possibly create auxiliary directories
72 if (CheckDirectory(_config->FindDir("Dir::State"), _config->FindDir("Dir::State::lists") + "partial/") == false ||
73 CheckDirectory(_config->FindDir("Dir::Cache"), _config->FindDir("Dir::Cache::Archives") + "partial/") == false)
74 return false;
75
76 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
77 return true;
78
79 // Lock the directory this acquire object will work in
80 LockFD = GetLock(flCombine(Lock, "lock"));
81 if (LockFD == -1)
82 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
83
84 return true;
85}
86 /*}}}*/
87// Acquire::CheckDirectory - ensure that the given directory exists /*{{{*/
88// ---------------------------------------------------------------------
89/* a small wrapper around CreateDirectory to check if it exists and to
90 remove the trailing "/apt/" from the parent directory if needed */
91bool pkgAcquire::CheckDirectory(string const &Parent, string const &Path) const
92{
93 if (DirectoryExists(Path) == true)
94 return true;
95
96 size_t const len = Parent.size();
97 if (len > 5 && Parent.find("/apt/", len - 6, 5) != len - 5)
98 {
99 if (CreateDirectory(Parent.substr(0,len-5), Path) == true)
100 return true;
101 }
102 else if (CreateDirectory(Parent, Path) == true)
103 return true;
104
105 return _error->Errno("Acquire", _("Directory %s can't be created."), Path.c_str());
0118833a
AL
106}
107 /*}}}*/
108// Acquire::~pkgAcquire - Destructor /*{{{*/
109// ---------------------------------------------------------------------
93bf083d 110/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
111pkgAcquire::~pkgAcquire()
112{
459681d3 113 Shutdown();
1cd1c398
DK
114
115 if (LockFD != -1)
116 close(LockFD);
117
3b5421b4
AL
118 while (Configs != 0)
119 {
120 MethodConfig *Jnk = Configs;
121 Configs = Configs->Next;
122 delete Jnk;
123 }
281daf46
AL
124}
125 /*}}}*/
8e5fc8f5 126// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
127// ---------------------------------------------------------------------
128/* */
129void pkgAcquire::Shutdown()
130{
131 while (Items.size() != 0)
1b480911
AL
132 {
133 if (Items[0]->Status == Item::StatFetching)
134 Items[0]->Status = Item::StatError;
281daf46 135 delete Items[0];
1b480911 136 }
0a8a80e5
AL
137
138 while (Queues != 0)
139 {
140 Queue *Jnk = Queues;
141 Queues = Queues->Next;
142 delete Jnk;
143 }
0118833a
AL
144}
145 /*}}}*/
146// Acquire::Add - Add a new item /*{{{*/
147// ---------------------------------------------------------------------
93bf083d
AL
148/* This puts an item on the acquire list. This list is mainly for tracking
149 item status */
0118833a
AL
150void pkgAcquire::Add(Item *Itm)
151{
152 Items.push_back(Itm);
153}
154 /*}}}*/
155// Acquire::Remove - Remove a item /*{{{*/
156// ---------------------------------------------------------------------
93bf083d 157/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
158void pkgAcquire::Remove(Item *Itm)
159{
a3eaf954
AL
160 Dequeue(Itm);
161
753b3525 162 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
163 {
164 if (*I == Itm)
b4fc9b6f 165 {
0118833a 166 Items.erase(I);
b4fc9b6f
AL
167 I = Items.begin();
168 }
753b3525
AL
169 else
170 I++;
8267fe24 171 }
0118833a
AL
172}
173 /*}}}*/
0a8a80e5
AL
174// Acquire::Add - Add a worker /*{{{*/
175// ---------------------------------------------------------------------
93bf083d
AL
176/* A list of workers is kept so that the select loop can direct their FD
177 usage. */
0a8a80e5
AL
178void pkgAcquire::Add(Worker *Work)
179{
180 Work->NextAcquire = Workers;
181 Workers = Work;
182}
183 /*}}}*/
184// Acquire::Remove - Remove a worker /*{{{*/
185// ---------------------------------------------------------------------
93bf083d
AL
186/* A worker has died. This can not be done while the select loop is running
187 as it would require that RunFds could handling a changing list state and
188 it cant.. */
0a8a80e5
AL
189void pkgAcquire::Remove(Worker *Work)
190{
93bf083d
AL
191 if (Running == true)
192 abort();
193
0a8a80e5
AL
194 Worker **I = &Workers;
195 for (; *I != 0;)
196 {
197 if (*I == Work)
198 *I = (*I)->NextAcquire;
199 else
200 I = &(*I)->NextAcquire;
201 }
202}
203 /*}}}*/
0118833a
AL
204// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
205// ---------------------------------------------------------------------
93bf083d 206/* This is the entry point for an item. An item calls this function when
281daf46 207 it is constructed which creates a queue (based on the current queue
93bf083d
AL
208 mode) and puts the item in that queue. If the system is running then
209 the queue might be started. */
8267fe24 210void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 211{
0a8a80e5 212 // Determine which queue to put the item in
e331f6ed
AL
213 const MethodConfig *Config;
214 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
215 if (Name.empty() == true)
216 return;
217
218 // Find the queue structure
219 Queue *I = Queues;
220 for (; I != 0 && I->Name != Name; I = I->Next);
221 if (I == 0)
222 {
223 I = new Queue(Name,this);
224 I->Next = Queues;
225 Queues = I;
93bf083d
AL
226
227 if (Running == true)
228 I->Startup();
0a8a80e5 229 }
bfd22fc0 230
e331f6ed
AL
231 // See if this is a local only URI
232 if (Config->LocalOnly == true && Item.Owner->Complete == false)
233 Item.Owner->Local = true;
8267fe24 234 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
235
236 // Queue it into the named queue
c03462c6
MV
237 if(I->Enqueue(Item))
238 ToFetch++;
239
0a8a80e5
AL
240 // Some trace stuff
241 if (Debug == true)
242 {
8267fe24
AL
243 clog << "Fetching " << Item.URI << endl;
244 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 245 clog << " Queue is: " << Name << endl;
0a8a80e5 246 }
3b5421b4
AL
247}
248 /*}}}*/
0a8a80e5 249// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 250// ---------------------------------------------------------------------
93bf083d
AL
251/* This is called when an item is finished being fetched. It removes it
252 from all the queues */
0a8a80e5
AL
253void pkgAcquire::Dequeue(Item *Itm)
254{
255 Queue *I = Queues;
bfd22fc0 256 bool Res = false;
0a8a80e5 257 for (; I != 0; I = I->Next)
bfd22fc0 258 Res |= I->Dequeue(Itm);
93bf083d
AL
259
260 if (Debug == true)
261 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
262 if (Res == true)
263 ToFetch--;
0a8a80e5
AL
264}
265 /*}}}*/
266// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
267// ---------------------------------------------------------------------
268/* The string returned depends on the configuration settings and the
269 method parameters. Given something like http://foo.org/bar it can
270 return http://foo.org or http */
e331f6ed 271string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 272{
93bf083d
AL
273 URI U(Uri);
274
e331f6ed 275 Config = GetConfig(U.Access);
0a8a80e5
AL
276 if (Config == 0)
277 return string();
278
279 /* Single-Instance methods get exactly one queue per URI. This is
280 also used for the Access queue method */
281 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 282 return U.Access;
93bf083d
AL
283
284 return U.Access + ':' + U.Host;
0118833a
AL
285}
286 /*}}}*/
3b5421b4
AL
287// Acquire::GetConfig - Fetch the configuration information /*{{{*/
288// ---------------------------------------------------------------------
289/* This locates the configuration structure for an access method. If
290 a config structure cannot be found a Worker will be created to
291 retrieve it */
0a8a80e5 292pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
293{
294 // Search for an existing config
295 MethodConfig *Conf;
296 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
297 if (Conf->Access == Access)
298 return Conf;
299
300 // Create the new config class
301 Conf = new MethodConfig;
302 Conf->Access = Access;
303 Conf->Next = Configs;
304 Configs = Conf;
0118833a 305
3b5421b4
AL
306 // Create the worker to fetch the configuration
307 Worker Work(Conf);
308 if (Work.Start() == false)
309 return 0;
7c6e2dc7
MV
310
311 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 312 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
313 Conf->SingleInstance = true;
314
3b5421b4
AL
315 return Conf;
316}
317 /*}}}*/
0a8a80e5
AL
318// Acquire::SetFds - Deal with readable FDs /*{{{*/
319// ---------------------------------------------------------------------
320/* Collect FDs that have activity monitors into the fd sets */
321void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
322{
323 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
324 {
325 if (I->InReady == true && I->InFd >= 0)
326 {
327 if (Fd < I->InFd)
328 Fd = I->InFd;
329 FD_SET(I->InFd,RSet);
330 }
331 if (I->OutReady == true && I->OutFd >= 0)
332 {
333 if (Fd < I->OutFd)
334 Fd = I->OutFd;
335 FD_SET(I->OutFd,WSet);
336 }
337 }
338}
339 /*}}}*/
340// Acquire::RunFds - Deal with active FDs /*{{{*/
341// ---------------------------------------------------------------------
93bf083d
AL
342/* Dispatch active FDs over to the proper workers. It is very important
343 that a worker never be erased while this is running! The queue class
344 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
345void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
346{
347 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
348 {
349 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
350 I->InFdReady();
351 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
352 I->OutFdReady();
353 }
354}
355 /*}}}*/
356// Acquire::Run - Run the fetch sequence /*{{{*/
357// ---------------------------------------------------------------------
358/* This runs the queues. It manages a select loop for all of the
359 Worker tasks. The workers interact with the queues and items to
360 manage the actual fetch. */
1c5f7e5f 361pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 362{
8b89e57f
AL
363 Running = true;
364
0a8a80e5
AL
365 for (Queue *I = Queues; I != 0; I = I->Next)
366 I->Startup();
367
b98f2859
AL
368 if (Log != 0)
369 Log->Start();
370
024d1123
AL
371 bool WasCancelled = false;
372
0a8a80e5 373 // Run till all things have been acquired
8267fe24
AL
374 struct timeval tv;
375 tv.tv_sec = 0;
1c5f7e5f 376 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
377 while (ToFetch > 0)
378 {
379 fd_set RFds;
380 fd_set WFds;
381 int Highest = 0;
382 FD_ZERO(&RFds);
383 FD_ZERO(&WFds);
384 SetFds(Highest,&RFds,&WFds);
385
b0db36b1
AL
386 int Res;
387 do
388 {
389 Res = select(Highest+1,&RFds,&WFds,0,&tv);
390 }
391 while (Res < 0 && errno == EINTR);
392
8267fe24 393 if (Res < 0)
8b89e57f 394 {
8267fe24
AL
395 _error->Errno("select","Select has failed");
396 break;
8b89e57f 397 }
93bf083d 398
0a8a80e5 399 RunFds(&RFds,&WFds);
93bf083d
AL
400 if (_error->PendingError() == true)
401 break;
8267fe24
AL
402
403 // Timeout, notify the log class
404 if (Res == 0 || (Log != 0 && Log->Update == true))
405 {
1c5f7e5f 406 tv.tv_usec = PulseIntervall;
8267fe24
AL
407 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
408 I->Pulse();
024d1123
AL
409 if (Log != 0 && Log->Pulse(this) == false)
410 {
411 WasCancelled = true;
412 break;
413 }
8267fe24 414 }
0a8a80e5 415 }
be4401bf 416
b98f2859
AL
417 if (Log != 0)
418 Log->Stop();
419
be4401bf
AL
420 // Shut down the acquire bits
421 Running = false;
0a8a80e5 422 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 423 I->Shutdown(false);
0a8a80e5 424
ab559b35 425 // Shut down the items
b4fc9b6f 426 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 427 (*I)->Finished();
ab559b35 428
024d1123
AL
429 if (_error->PendingError())
430 return Failed;
431 if (WasCancelled)
432 return Cancelled;
433 return Continue;
93bf083d
AL
434}
435 /*}}}*/
be4401bf 436// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
437// ---------------------------------------------------------------------
438/* This routine bumps idle queues in hopes that they will be able to fetch
439 the dequeued item */
440void pkgAcquire::Bump()
441{
be4401bf
AL
442 for (Queue *I = Queues; I != 0; I = I->Next)
443 I->Bump();
0a8a80e5
AL
444}
445 /*}}}*/
8267fe24
AL
446// Acquire::WorkerStep - Step to the next worker /*{{{*/
447// ---------------------------------------------------------------------
448/* Not inlined to advoid including acquire-worker.h */
449pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
450{
451 return I->NextAcquire;
452};
453 /*}}}*/
a6568219 454// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
455// ---------------------------------------------------------------------
456/* This is a bit simplistic, it looks at every file in the dir and sees
457 if it is part of the download set. */
458bool pkgAcquire::Clean(string Dir)
459{
460 DIR *D = opendir(Dir.c_str());
461 if (D == 0)
b2e465d6 462 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
463
464 string StartDir = SafeGetCWD();
465 if (chdir(Dir.c_str()) != 0)
466 {
467 closedir(D);
b2e465d6 468 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
469 }
470
471 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
472 {
473 // Skip some files..
474 if (strcmp(Dir->d_name,"lock") == 0 ||
475 strcmp(Dir->d_name,"partial") == 0 ||
476 strcmp(Dir->d_name,".") == 0 ||
477 strcmp(Dir->d_name,"..") == 0)
478 continue;
479
480 // Look in the get list
b4fc9b6f 481 ItemCIterator I = Items.begin();
7a7fa5f0
AL
482 for (; I != Items.end(); I++)
483 if (flNotDir((*I)->DestFile) == Dir->d_name)
484 break;
485
486 // Nothing found, nuke it
487 if (I == Items.end())
488 unlink(Dir->d_name);
489 };
490
7a7fa5f0 491 closedir(D);
3c8cda8b
MV
492 if (chdir(StartDir.c_str()) != 0)
493 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
494 return true;
495}
496 /*}}}*/
a6568219
AL
497// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
498// ---------------------------------------------------------------------
499/* This is the total number of bytes needed */
b2e465d6 500double pkgAcquire::TotalNeeded()
a6568219 501{
b2e465d6 502 double Total = 0;
b4fc9b6f 503 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
504 Total += (*I)->FileSize;
505 return Total;
506}
507 /*}}}*/
508// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
509// ---------------------------------------------------------------------
510/* This is the number of bytes that is not local */
b2e465d6 511double pkgAcquire::FetchNeeded()
a6568219 512{
b2e465d6 513 double Total = 0;
b4fc9b6f 514 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
515 if ((*I)->Local == false)
516 Total += (*I)->FileSize;
517 return Total;
518}
519 /*}}}*/
6b1ff003
AL
520// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
521// ---------------------------------------------------------------------
522/* This is the number of bytes that is not local */
b2e465d6 523double pkgAcquire::PartialPresent()
6b1ff003 524{
b2e465d6 525 double Total = 0;
b4fc9b6f 526 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
527 if ((*I)->Local == false)
528 Total += (*I)->PartialSize;
529 return Total;
530}
92fcbfc1 531 /*}}}*/
8e5fc8f5 532// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
533// ---------------------------------------------------------------------
534/* */
535pkgAcquire::UriIterator pkgAcquire::UriBegin()
536{
537 return UriIterator(Queues);
538}
539 /*}}}*/
8e5fc8f5 540// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
541// ---------------------------------------------------------------------
542/* */
543pkgAcquire::UriIterator pkgAcquire::UriEnd()
544{
545 return UriIterator(0);
546}
547 /*}}}*/
e331f6ed
AL
548// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
549// ---------------------------------------------------------------------
550/* */
551pkgAcquire::MethodConfig::MethodConfig()
552{
553 SingleInstance = false;
e331f6ed
AL
554 Pipeline = false;
555 SendConfig = false;
556 LocalOnly = false;
459681d3 557 Removable = false;
e331f6ed
AL
558 Next = 0;
559}
560 /*}}}*/
0a8a80e5
AL
561// Queue::Queue - Constructor /*{{{*/
562// ---------------------------------------------------------------------
563/* */
564pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
565 Owner(Owner)
566{
567 Items = 0;
568 Next = 0;
569 Workers = 0;
b185acc2
AL
570 MaxPipeDepth = 1;
571 PipeDepth = 0;
0a8a80e5
AL
572}
573 /*}}}*/
574// Queue::~Queue - Destructor /*{{{*/
575// ---------------------------------------------------------------------
576/* */
577pkgAcquire::Queue::~Queue()
578{
8e5fc8f5 579 Shutdown(true);
0a8a80e5
AL
580
581 while (Items != 0)
582 {
583 QItem *Jnk = Items;
584 Items = Items->Next;
585 delete Jnk;
586 }
587}
588 /*}}}*/
589// Queue::Enqueue - Queue an item to the queue /*{{{*/
590// ---------------------------------------------------------------------
591/* */
c03462c6 592bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 593{
7a1b1f8b 594 QItem **I = &Items;
c03462c6
MV
595 // move to the end of the queue and check for duplicates here
596 for (; *I != 0; I = &(*I)->Next)
597 if (Item.URI == (*I)->URI)
598 {
599 Item.Owner->Status = Item::StatDone;
600 return false;
601 }
602
0a8a80e5 603 // Create a new item
7a1b1f8b
AL
604 QItem *Itm = new QItem;
605 *Itm = Item;
606 Itm->Next = 0;
607 *I = Itm;
0a8a80e5 608
8267fe24 609 Item.Owner->QueueCounter++;
93bf083d
AL
610 if (Items->Next == 0)
611 Cycle();
c03462c6 612 return true;
0a8a80e5
AL
613}
614 /*}}}*/
c88edf1d 615// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 616// ---------------------------------------------------------------------
b185acc2 617/* We return true if we hit something */
bfd22fc0 618bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 619{
b185acc2
AL
620 if (Owner->Status == pkgAcquire::Item::StatFetching)
621 return _error->Error("Tried to dequeue a fetching object");
622
bfd22fc0
AL
623 bool Res = false;
624
0a8a80e5
AL
625 QItem **I = &Items;
626 for (; *I != 0;)
627 {
628 if ((*I)->Owner == Owner)
629 {
630 QItem *Jnk= *I;
631 *I = (*I)->Next;
632 Owner->QueueCounter--;
633 delete Jnk;
bfd22fc0 634 Res = true;
0a8a80e5
AL
635 }
636 else
637 I = &(*I)->Next;
638 }
bfd22fc0
AL
639
640 return Res;
0a8a80e5
AL
641}
642 /*}}}*/
643// Queue::Startup - Start the worker processes /*{{{*/
644// ---------------------------------------------------------------------
8e5fc8f5
AL
645/* It is possible for this to be called with a pre-existing set of
646 workers. */
0a8a80e5
AL
647bool pkgAcquire::Queue::Startup()
648{
8e5fc8f5
AL
649 if (Workers == 0)
650 {
651 URI U(Name);
652 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
653 if (Cnf == 0)
654 return false;
655
656 Workers = new Worker(this,Cnf,Owner->Log);
657 Owner->Add(Workers);
658 if (Workers->Start() == false)
659 return false;
660
661 /* When pipelining we commit 10 items. This needs to change when we
662 added other source retry to have cycle maintain a pipeline depth
663 on its own. */
664 if (Cnf->Pipeline == true)
6ce72612 665 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
666 else
667 MaxPipeDepth = 1;
668 }
5cb5d8dc 669
93bf083d 670 return Cycle();
0a8a80e5
AL
671}
672 /*}}}*/
673// Queue::Shutdown - Shutdown the worker processes /*{{{*/
674// ---------------------------------------------------------------------
8e5fc8f5
AL
675/* If final is true then all workers are eliminated, otherwise only workers
676 that do not need cleanup are removed */
677bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
678{
679 // Delete all of the workers
8e5fc8f5
AL
680 pkgAcquire::Worker **Cur = &Workers;
681 while (*Cur != 0)
0a8a80e5 682 {
8e5fc8f5
AL
683 pkgAcquire::Worker *Jnk = *Cur;
684 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
685 {
686 *Cur = Jnk->NextQueue;
687 Owner->Remove(Jnk);
688 delete Jnk;
689 }
690 else
691 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
692 }
693
694 return true;
3b5421b4
AL
695}
696 /*}}}*/
7d8afa39 697// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
698// ---------------------------------------------------------------------
699/* */
700pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
701{
702 for (QItem *I = Items; I != 0; I = I->Next)
703 if (I->URI == URI && I->Worker == Owner)
704 return I;
705 return 0;
706}
707 /*}}}*/
708// Queue::ItemDone - Item has been completed /*{{{*/
709// ---------------------------------------------------------------------
710/* The worker signals this which causes the item to be removed from the
93bf083d
AL
711 queue. If this is the last queue instance then it is removed from the
712 main queue too.*/
c88edf1d
AL
713bool pkgAcquire::Queue::ItemDone(QItem *Itm)
714{
b185acc2 715 PipeDepth--;
db890fdb
AL
716 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
717 Itm->Owner->Status = pkgAcquire::Item::StatDone;
718
93bf083d
AL
719 if (Itm->Owner->QueueCounter <= 1)
720 Owner->Dequeue(Itm->Owner);
721 else
722 {
723 Dequeue(Itm->Owner);
724 Owner->Bump();
725 }
c88edf1d 726
93bf083d
AL
727 return Cycle();
728}
729 /*}}}*/
730// Queue::Cycle - Queue new items into the method /*{{{*/
731// ---------------------------------------------------------------------
b185acc2
AL
732/* This locates a new idle item and sends it to the worker. If pipelining
733 is enabled then it keeps the pipe full. */
93bf083d
AL
734bool pkgAcquire::Queue::Cycle()
735{
736 if (Items == 0 || Workers == 0)
c88edf1d
AL
737 return true;
738
e7432370
AL
739 if (PipeDepth < 0)
740 return _error->Error("Pipedepth failure");
741
93bf083d
AL
742 // Look for a queable item
743 QItem *I = Items;
e7432370 744 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
745 {
746 for (; I != 0; I = I->Next)
747 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
748 break;
749
750 // Nothing to do, queue is idle.
751 if (I == 0)
752 return true;
753
754 I->Worker = Workers;
755 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 756 PipeDepth++;
b185acc2
AL
757 if (Workers->QueueItem(I) == false)
758 return false;
759 }
93bf083d 760
b185acc2 761 return true;
c88edf1d
AL
762}
763 /*}}}*/
be4401bf
AL
764// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
765// ---------------------------------------------------------------------
b185acc2 766/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
767void pkgAcquire::Queue::Bump()
768{
b185acc2 769 Cycle();
be4401bf
AL
770}
771 /*}}}*/
b98f2859
AL
772// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
773// ---------------------------------------------------------------------
774/* */
c5ccf175 775pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
776{
777 Start();
778}
779 /*}}}*/
780// AcquireStatus::Pulse - Called periodically /*{{{*/
781// ---------------------------------------------------------------------
782/* This computes some internal state variables for the derived classes to
783 use. It generates the current downloaded bytes and total bytes to download
784 as well as the current CPS estimate. */
024d1123 785bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
786{
787 TotalBytes = 0;
788 CurrentBytes = 0;
d568ed2d
AL
789 TotalItems = 0;
790 CurrentItems = 0;
b98f2859
AL
791
792 // Compute the total number of bytes to fetch
793 unsigned int Unknown = 0;
794 unsigned int Count = 0;
b4fc9b6f 795 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
796 I++, Count++)
797 {
d568ed2d
AL
798 TotalItems++;
799 if ((*I)->Status == pkgAcquire::Item::StatDone)
800 CurrentItems++;
801
a6568219
AL
802 // Totally ignore local items
803 if ((*I)->Local == true)
804 continue;
b2e465d6 805
b98f2859
AL
806 TotalBytes += (*I)->FileSize;
807 if ((*I)->Complete == true)
808 CurrentBytes += (*I)->FileSize;
809 if ((*I)->FileSize == 0 && (*I)->Complete == false)
810 Unknown++;
811 }
812
813 // Compute the current completion
aa0e1101 814 unsigned long ResumeSize = 0;
b98f2859
AL
815 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
816 I = Owner->WorkerStep(I))
817 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
818 {
819 CurrentBytes += I->CurrentSize;
820 ResumeSize += I->ResumePoint;
821
822 // Files with unknown size always have 100% completion
823 if (I->CurrentItem->Owner->FileSize == 0 &&
824 I->CurrentItem->Owner->Complete == false)
825 TotalBytes += I->CurrentSize;
826 }
827
b98f2859
AL
828 // Normalize the figures and account for unknown size downloads
829 if (TotalBytes <= 0)
830 TotalBytes = 1;
831 if (Unknown == Count)
832 TotalBytes = Unknown;
18ef0a78
AL
833
834 // Wha?! Is not supposed to happen.
835 if (CurrentBytes > TotalBytes)
836 CurrentBytes = TotalBytes;
b98f2859
AL
837
838 // Compute the CPS
839 struct timeval NewTime;
840 gettimeofday(&NewTime,0);
2ec1674d 841 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
842 NewTime.tv_sec - Time.tv_sec > 6)
843 {
f17ac097
AL
844 double Delta = NewTime.tv_sec - Time.tv_sec +
845 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 846
b98f2859 847 // Compute the CPS value
f17ac097 848 if (Delta < 0.01)
e331f6ed
AL
849 CurrentCPS = 0;
850 else
aa0e1101
AL
851 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
852 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 853 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
854 Time = NewTime;
855 }
024d1123 856
75ef8f14
MV
857 int fd = _config->FindI("APT::Status-Fd",-1);
858 if(fd > 0)
859 {
860 ostringstream status;
861
862 char msg[200];
863 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
864 unsigned long ETA =
865 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
866
1e8b4c0f
MV
867 // only show the ETA if it makes sense
868 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 869 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 870 else
0c508b03 871 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
872
873
75ef8f14
MV
874
875 // build the status str
876 status << "dlstatus:" << i
877 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
878 << ":" << msg
879 << endl;
880 write(fd, status.str().c_str(), status.str().size());
881 }
882
024d1123 883 return true;
b98f2859
AL
884}
885 /*}}}*/
886// AcquireStatus::Start - Called when the download is started /*{{{*/
887// ---------------------------------------------------------------------
888/* We just reset the counters */
889void pkgAcquireStatus::Start()
890{
891 gettimeofday(&Time,0);
892 gettimeofday(&StartTime,0);
893 LastBytes = 0;
894 CurrentCPS = 0;
895 CurrentBytes = 0;
896 TotalBytes = 0;
897 FetchedBytes = 0;
898 ElapsedTime = 0;
d568ed2d
AL
899 TotalItems = 0;
900 CurrentItems = 0;
b98f2859
AL
901}
902 /*}}}*/
a6568219 903// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
904// ---------------------------------------------------------------------
905/* This accurately computes the elapsed time and the total overall CPS. */
906void pkgAcquireStatus::Stop()
907{
908 // Compute the CPS and elapsed time
909 struct timeval NewTime;
910 gettimeofday(&NewTime,0);
911
31a0531d
AL
912 double Delta = NewTime.tv_sec - StartTime.tv_sec +
913 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 914
b98f2859 915 // Compute the CPS value
31a0531d 916 if (Delta < 0.01)
e331f6ed
AL
917 CurrentCPS = 0;
918 else
31a0531d 919 CurrentCPS = FetchedBytes/Delta;
b98f2859 920 LastBytes = CurrentBytes;
31a0531d 921 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
922}
923 /*}}}*/
924// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
925// ---------------------------------------------------------------------
926/* This is used to get accurate final transfer rate reporting. */
927void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 928{
b98f2859
AL
929 FetchedBytes += Size - Resume;
930}
931 /*}}}*/