* apt-pkg/acquire*.cc:
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
b4fc9b6f 26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
ea542140
DK
33
34#include <apti18n.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
5efbd596 42pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 43 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 44 Running(false)
0118833a 45{
1cd1c398 46 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
47 if (strcasecmp(Mode.c_str(),"host") == 0)
48 QueueMode = QueueHost;
49 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
50 QueueMode = QueueAccess;
51}
5efbd596 52pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
53 Configs(0), Log(Progress), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 55 Running(false)
1cd1c398
DK
56{
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 Setup(Progress, "");
63}
64 /*}}}*/
65// Acquire::Setup - Delayed Constructor /*{{{*/
66// ---------------------------------------------------------------------
67/* Do everything needed to be a complete Acquire object and report the
68 success (or failure) back so the user knows that something is wrong… */
69bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
70{
71 Log = Progress;
0a8a80e5 72
1cd1c398 73 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
74 string const listDir = _config->FindDir("Dir::State::lists");
75 string const partialListDir = listDir + "partial/";
76 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
77 string const partialArchivesDir = archivesDir + "partial/";
78
7753e468
DK
79 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
80 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
81 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
82
7753e468
DK
83 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
84 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 85 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
86
87 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
88 return true;
89
90 // Lock the directory this acquire object will work in
91 LockFD = GetLock(flCombine(Lock, "lock"));
92 if (LockFD == -1)
93 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
94
95 return true;
96}
97 /*}}}*/
0118833a
AL
98// Acquire::~pkgAcquire - Destructor /*{{{*/
99// ---------------------------------------------------------------------
93bf083d 100/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
101pkgAcquire::~pkgAcquire()
102{
459681d3 103 Shutdown();
1cd1c398
DK
104
105 if (LockFD != -1)
106 close(LockFD);
107
3b5421b4
AL
108 while (Configs != 0)
109 {
110 MethodConfig *Jnk = Configs;
111 Configs = Configs->Next;
112 delete Jnk;
113 }
281daf46
AL
114}
115 /*}}}*/
8e5fc8f5 116// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
117// ---------------------------------------------------------------------
118/* */
119void pkgAcquire::Shutdown()
120{
f7f0d6c7 121 while (Items.empty() == false)
1b480911
AL
122 {
123 if (Items[0]->Status == Item::StatFetching)
124 Items[0]->Status = Item::StatError;
281daf46 125 delete Items[0];
1b480911 126 }
0a8a80e5
AL
127
128 while (Queues != 0)
129 {
130 Queue *Jnk = Queues;
131 Queues = Queues->Next;
132 delete Jnk;
133 }
0118833a
AL
134}
135 /*}}}*/
136// Acquire::Add - Add a new item /*{{{*/
137// ---------------------------------------------------------------------
93bf083d
AL
138/* This puts an item on the acquire list. This list is mainly for tracking
139 item status */
0118833a
AL
140void pkgAcquire::Add(Item *Itm)
141{
142 Items.push_back(Itm);
143}
144 /*}}}*/
145// Acquire::Remove - Remove a item /*{{{*/
146// ---------------------------------------------------------------------
93bf083d 147/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
148void pkgAcquire::Remove(Item *Itm)
149{
a3eaf954
AL
150 Dequeue(Itm);
151
753b3525 152 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
153 {
154 if (*I == Itm)
b4fc9b6f 155 {
0118833a 156 Items.erase(I);
b4fc9b6f
AL
157 I = Items.begin();
158 }
753b3525 159 else
f7f0d6c7 160 ++I;
8267fe24 161 }
0118833a
AL
162}
163 /*}}}*/
0a8a80e5
AL
164// Acquire::Add - Add a worker /*{{{*/
165// ---------------------------------------------------------------------
93bf083d
AL
166/* A list of workers is kept so that the select loop can direct their FD
167 usage. */
0a8a80e5
AL
168void pkgAcquire::Add(Worker *Work)
169{
170 Work->NextAcquire = Workers;
171 Workers = Work;
172}
173 /*}}}*/
174// Acquire::Remove - Remove a worker /*{{{*/
175// ---------------------------------------------------------------------
93bf083d
AL
176/* A worker has died. This cannot be done while the select loop is running
177 as it would require that RunFds could handle a changing list state, and
178 it can't. */
0a8a80e5
AL
179void pkgAcquire::Remove(Worker *Work)
180{
93bf083d
AL
181 if (Running == true)
182 abort();
183
0a8a80e5
AL
184 Worker **I = &Workers;
185 for (; *I != 0;)
186 {
187 if (*I == Work)
188 *I = (*I)->NextAcquire;
189 else
190 I = &(*I)->NextAcquire;
191 }
192}
193 /*}}}*/
0118833a
AL
194// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
195// ---------------------------------------------------------------------
93bf083d 196/* This is the entry point for an item. An item calls this function when
281daf46 197 it is constructed which creates a queue (based on the current queue
93bf083d
AL
198 mode) and puts the item in that queue. If the system is running then
199 the queue might be started. */
8267fe24 200void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 201{
0a8a80e5 202 // Determine which queue to put the item in
e331f6ed
AL
203 const MethodConfig *Config;
204 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
205 if (Name.empty() == true)
206 return;
207
208 // Find the queue structure
209 Queue *I = Queues;
210 for (; I != 0 && I->Name != Name; I = I->Next);
211 if (I == 0)
212 {
213 I = new Queue(Name,this);
214 I->Next = Queues;
215 Queues = I;
93bf083d
AL
216
217 if (Running == true)
218 I->Startup();
0a8a80e5 219 }
bfd22fc0 220
e331f6ed
AL
221 // See if this is a local only URI
222 if (Config->LocalOnly == true && Item.Owner->Complete == false)
223 Item.Owner->Local = true;
8267fe24 224 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
225
226 // Queue it into the named queue
c03462c6
MV
227 if(I->Enqueue(Item))
228 ToFetch++;
229
0a8a80e5
AL
230 // Some trace stuff
231 if (Debug == true)
232 {
8267fe24
AL
233 clog << "Fetching " << Item.URI << endl;
234 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 235 clog << " Queue is: " << Name << endl;
0a8a80e5 236 }
3b5421b4
AL
237}
238 /*}}}*/
0a8a80e5 239// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 240// ---------------------------------------------------------------------
93bf083d
AL
241/* This is called when an item is finished being fetched. It removes it
242 from all the queues */
0a8a80e5
AL
243void pkgAcquire::Dequeue(Item *Itm)
244{
245 Queue *I = Queues;
bfd22fc0 246 bool Res = false;
93bf083d
AL
247 if (Debug == true)
248 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
249
250 for (; I != 0; I = I->Next)
251 {
252 if (I->Dequeue(Itm))
253 {
254 Res = true;
255 if (Debug == true)
256 clog << "Dequeued from " << I->Name << endl;
257 }
258 }
259
bfd22fc0
AL
260 if (Res == true)
261 ToFetch--;
0a8a80e5
AL
262}
263 /*}}}*/
264// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
265// ---------------------------------------------------------------------
266/* The string returned depends on the configuration settings and the
267 method parameters. Given something like http://foo.org/bar it can
268 return http://foo.org or http */
e331f6ed 269string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 270{
93bf083d
AL
271 URI U(Uri);
272
e331f6ed 273 Config = GetConfig(U.Access);
0a8a80e5
AL
274 if (Config == 0)
275 return string();
276
277 /* Single-Instance methods get exactly one queue per URI. This is
278 also used for the Access queue method */
279 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
280 return U.Access;
281
282 string AccessSchema = U.Access + ':',
283 FullQueueName = AccessSchema + U.Host;
284 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
285
286 Queue *I = Queues;
287 for (; I != 0; I = I->Next) {
288 // if the queue already exists, re-use it
289 if (I->Name == FullQueueName)
290 return FullQueueName;
291
292 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
293 Instances++;
294 }
295
296 if (Debug) {
297 clog << "Found " << Instances << " instances of " << U.Access << endl;
298 }
299
300 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
301 return U.Access;
93bf083d 302
5674f6b3 303 return FullQueueName;
0118833a
AL
304}
305 /*}}}*/
3b5421b4
AL
306// Acquire::GetConfig - Fetch the configuration information /*{{{*/
307// ---------------------------------------------------------------------
308/* This locates the configuration structure for an access method. If
309 a config structure cannot be found a Worker will be created to
310 retrieve it */
0a8a80e5 311pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
312{
313 // Search for an existing config
314 MethodConfig *Conf;
315 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
316 if (Conf->Access == Access)
317 return Conf;
318
319 // Create the new config class
320 Conf = new MethodConfig;
321 Conf->Access = Access;
322 Conf->Next = Configs;
323 Configs = Conf;
0118833a 324
3b5421b4
AL
325 // Create the worker to fetch the configuration
326 Worker Work(Conf);
327 if (Work.Start() == false)
328 return 0;
7c6e2dc7
MV
329
330 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 331 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
332 Conf->SingleInstance = true;
333
3b5421b4
AL
334 return Conf;
335}
336 /*}}}*/
0a8a80e5
AL
337// Acquire::SetFds - Deal with readable FDs /*{{{*/
338// ---------------------------------------------------------------------
339/* Collect FDs that have activity monitors into the fd sets */
340void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
341{
342 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
343 {
344 if (I->InReady == true && I->InFd >= 0)
345 {
346 if (Fd < I->InFd)
347 Fd = I->InFd;
348 FD_SET(I->InFd,RSet);
349 }
350 if (I->OutReady == true && I->OutFd >= 0)
351 {
352 if (Fd < I->OutFd)
353 Fd = I->OutFd;
354 FD_SET(I->OutFd,WSet);
355 }
356 }
357}
358 /*}}}*/
359// Acquire::RunFds - Deal with active FDs /*{{{*/
360// ---------------------------------------------------------------------
93bf083d
AL
361/* Dispatch active FDs over to the proper workers. It is very important
362 that a worker never be erased while this is running! The queue class
363 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
364void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
365{
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 {
368 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
369 I->InFdReady();
370 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
371 I->OutFdReady();
372 }
373}
374 /*}}}*/
375// Acquire::Run - Run the fetch sequence /*{{{*/
376// ---------------------------------------------------------------------
377/* This runs the queues. It manages a select loop for all of the
378 Worker tasks. The workers interact with the queues and items to
379 manage the actual fetch. */
1c5f7e5f 380pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 381{
8b89e57f
AL
382 Running = true;
383
0a8a80e5
AL
384 for (Queue *I = Queues; I != 0; I = I->Next)
385 I->Startup();
386
b98f2859
AL
387 if (Log != 0)
388 Log->Start();
389
024d1123
AL
390 bool WasCancelled = false;
391
0a8a80e5 392 // Run till all things have been acquired
8267fe24
AL
393 struct timeval tv;
394 tv.tv_sec = 0;
1c5f7e5f 395 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
396 while (ToFetch > 0)
397 {
398 fd_set RFds;
399 fd_set WFds;
400 int Highest = 0;
401 FD_ZERO(&RFds);
402 FD_ZERO(&WFds);
403 SetFds(Highest,&RFds,&WFds);
404
b0db36b1
AL
405 int Res;
406 do
407 {
408 Res = select(Highest+1,&RFds,&WFds,0,&tv);
409 }
410 while (Res < 0 && errno == EINTR);
411
8267fe24 412 if (Res < 0)
8b89e57f 413 {
8267fe24
AL
414 _error->Errno("select","Select has failed");
415 break;
8b89e57f 416 }
93bf083d 417
0a8a80e5 418 RunFds(&RFds,&WFds);
93bf083d
AL
419 if (_error->PendingError() == true)
420 break;
8267fe24
AL
421
422 // Timeout, notify the log class
423 if (Res == 0 || (Log != 0 && Log->Update == true))
424 {
1c5f7e5f 425 tv.tv_usec = PulseIntervall;
8267fe24
AL
426 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
427 I->Pulse();
024d1123
AL
428 if (Log != 0 && Log->Pulse(this) == false)
429 {
430 WasCancelled = true;
431 break;
432 }
8267fe24 433 }
0a8a80e5 434 }
be4401bf 435
b98f2859
AL
436 if (Log != 0)
437 Log->Stop();
438
be4401bf
AL
439 // Shut down the acquire bits
440 Running = false;
0a8a80e5 441 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 442 I->Shutdown(false);
0a8a80e5 443
ab559b35 444 // Shut down the items
f7f0d6c7 445 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 446 (*I)->Finished();
ab559b35 447
024d1123
AL
448 if (_error->PendingError())
449 return Failed;
450 if (WasCancelled)
451 return Cancelled;
452 return Continue;
93bf083d
AL
453}
454 /*}}}*/
be4401bf 455// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
456// ---------------------------------------------------------------------
457/* This routine bumps idle queues in hopes that they will be able to fetch
458 the dequeued item */
459void pkgAcquire::Bump()
460{
be4401bf
AL
461 for (Queue *I = Queues; I != 0; I = I->Next)
462 I->Bump();
0a8a80e5
AL
463}
464 /*}}}*/
8267fe24
AL
465// Acquire::WorkerStep - Step to the next worker /*{{{*/
466// ---------------------------------------------------------------------
467/* Not inlined to avoid including acquire-worker.h */
468pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
469{
470 return I->NextAcquire;
471};
472 /*}}}*/
a6568219 473// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
474// ---------------------------------------------------------------------
475/* This is a bit simplistic, it looks at every file in the dir and sees
476 if it is part of the download set. */
477bool pkgAcquire::Clean(string Dir)
478{
95b5f6c1
DK
479 // non-existing directories are by definition clean…
480 if (DirectoryExists(Dir) == false)
481 return true;
482
7a7fa5f0
AL
483 DIR *D = opendir(Dir.c_str());
484 if (D == 0)
b2e465d6 485 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
486
487 string StartDir = SafeGetCWD();
488 if (chdir(Dir.c_str()) != 0)
489 {
490 closedir(D);
b2e465d6 491 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
492 }
493
494 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
495 {
496 // Skip some files..
497 if (strcmp(Dir->d_name,"lock") == 0 ||
498 strcmp(Dir->d_name,"partial") == 0 ||
499 strcmp(Dir->d_name,".") == 0 ||
500 strcmp(Dir->d_name,"..") == 0)
501 continue;
502
503 // Look in the get list
b4fc9b6f 504 ItemCIterator I = Items.begin();
f7f0d6c7 505 for (; I != Items.end(); ++I)
7a7fa5f0
AL
506 if (flNotDir((*I)->DestFile) == Dir->d_name)
507 break;
508
509 // Nothing found, nuke it
510 if (I == Items.end())
511 unlink(Dir->d_name);
512 };
513
7a7fa5f0 514 closedir(D);
3c8cda8b
MV
515 if (chdir(StartDir.c_str()) != 0)
516 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
517 return true;
518}
519 /*}}}*/
a6568219
AL
520// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
521// ---------------------------------------------------------------------
522/* This is the total number of bytes needed */
a3c4c81a 523unsigned long long pkgAcquire::TotalNeeded()
a6568219 524{
a3c4c81a 525 unsigned long long Total = 0;
f7f0d6c7 526 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
527 Total += (*I)->FileSize;
528 return Total;
529}
530 /*}}}*/
531// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
532// ---------------------------------------------------------------------
533/* This is the number of bytes that is not local */
a3c4c81a 534unsigned long long pkgAcquire::FetchNeeded()
a6568219 535{
a3c4c81a 536 unsigned long long Total = 0;
f7f0d6c7 537 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
538 if ((*I)->Local == false)
539 Total += (*I)->FileSize;
540 return Total;
541}
542 /*}}}*/
6b1ff003
AL
543// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
544// ---------------------------------------------------------------------
545/* This is the number of partially downloaded bytes of the non-local items */
a3c4c81a 546unsigned long long pkgAcquire::PartialPresent()
6b1ff003 547{
a3c4c81a 548 unsigned long long Total = 0;
f7f0d6c7 549 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
550 if ((*I)->Local == false)
551 Total += (*I)->PartialSize;
552 return Total;
553}
92fcbfc1 554 /*}}}*/
8e5fc8f5 555// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
556// ---------------------------------------------------------------------
557/* */
558pkgAcquire::UriIterator pkgAcquire::UriBegin()
559{
560 return UriIterator(Queues);
561}
562 /*}}}*/
8e5fc8f5 563// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
564// ---------------------------------------------------------------------
565/* */
566pkgAcquire::UriIterator pkgAcquire::UriEnd()
567{
568 return UriIterator(0);
569}
570 /*}}}*/
e331f6ed
AL
571// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
572// ---------------------------------------------------------------------
573/* */
574pkgAcquire::MethodConfig::MethodConfig()
575{
576 SingleInstance = false;
e331f6ed
AL
577 Pipeline = false;
578 SendConfig = false;
579 LocalOnly = false;
459681d3 580 Removable = false;
e331f6ed
AL
581 Next = 0;
582}
583 /*}}}*/
0a8a80e5
AL
584// Queue::Queue - Constructor /*{{{*/
585// ---------------------------------------------------------------------
586/* */
587pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
588 Owner(Owner)
589{
590 Items = 0;
591 Next = 0;
592 Workers = 0;
b185acc2
AL
593 MaxPipeDepth = 1;
594 PipeDepth = 0;
0a8a80e5
AL
595}
596 /*}}}*/
597// Queue::~Queue - Destructor /*{{{*/
598// ---------------------------------------------------------------------
599/* */
600pkgAcquire::Queue::~Queue()
601{
8e5fc8f5 602 Shutdown(true);
0a8a80e5
AL
603
604 while (Items != 0)
605 {
606 QItem *Jnk = Items;
607 Items = Items->Next;
608 delete Jnk;
609 }
610}
611 /*}}}*/
612// Queue::Enqueue - Queue an item to the queue /*{{{*/
613// ---------------------------------------------------------------------
614/* */
c03462c6 615bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 616{
7a1b1f8b 617 QItem **I = &Items;
c03462c6
MV
618 // move to the end of the queue and check for duplicates here
619 for (; *I != 0; I = &(*I)->Next)
620 if (Item.URI == (*I)->URI)
621 {
622 Item.Owner->Status = Item::StatDone;
623 return false;
624 }
625
0a8a80e5 626 // Create a new item
7a1b1f8b
AL
627 QItem *Itm = new QItem;
628 *Itm = Item;
629 Itm->Next = 0;
630 *I = Itm;
0a8a80e5 631
8267fe24 632 Item.Owner->QueueCounter++;
93bf083d
AL
633 if (Items->Next == 0)
634 Cycle();
c03462c6 635 return true;
0a8a80e5
AL
636}
637 /*}}}*/
c88edf1d 638// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 639// ---------------------------------------------------------------------
b185acc2 640/* We return true if we hit something */
bfd22fc0 641bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 642{
b185acc2
AL
643 if (Owner->Status == pkgAcquire::Item::StatFetching)
644 return _error->Error("Tried to dequeue a fetching object");
645
bfd22fc0
AL
646 bool Res = false;
647
0a8a80e5
AL
648 QItem **I = &Items;
649 for (; *I != 0;)
650 {
651 if ((*I)->Owner == Owner)
652 {
653 QItem *Jnk= *I;
654 *I = (*I)->Next;
655 Owner->QueueCounter--;
656 delete Jnk;
bfd22fc0 657 Res = true;
0a8a80e5
AL
658 }
659 else
660 I = &(*I)->Next;
661 }
bfd22fc0
AL
662
663 return Res;
0a8a80e5
AL
664}
665 /*}}}*/
666// Queue::Startup - Start the worker processes /*{{{*/
667// ---------------------------------------------------------------------
8e5fc8f5
AL
668/* It is possible for this to be called with a pre-existing set of
669 workers. */
0a8a80e5
AL
670bool pkgAcquire::Queue::Startup()
671{
8e5fc8f5
AL
672 if (Workers == 0)
673 {
674 URI U(Name);
675 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
676 if (Cnf == 0)
677 return false;
678
679 Workers = new Worker(this,Cnf,Owner->Log);
680 Owner->Add(Workers);
681 if (Workers->Start() == false)
682 return false;
683
684 /* When pipelining we commit 10 items. This needs to change when we
685 added other source retry to have cycle maintain a pipeline depth
686 on its own. */
687 if (Cnf->Pipeline == true)
6ce72612 688 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
689 else
690 MaxPipeDepth = 1;
691 }
5cb5d8dc 692
93bf083d 693 return Cycle();
0a8a80e5
AL
694}
695 /*}}}*/
696// Queue::Shutdown - Shutdown the worker processes /*{{{*/
697// ---------------------------------------------------------------------
8e5fc8f5
AL
698/* If final is true then all workers are eliminated, otherwise only workers
699 that do not need cleanup are removed */
700bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
701{
702 // Delete all of the workers
8e5fc8f5
AL
703 pkgAcquire::Worker **Cur = &Workers;
704 while (*Cur != 0)
0a8a80e5 705 {
8e5fc8f5
AL
706 pkgAcquire::Worker *Jnk = *Cur;
707 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
708 {
709 *Cur = Jnk->NextQueue;
710 Owner->Remove(Jnk);
711 delete Jnk;
712 }
713 else
714 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
715 }
716
717 return true;
3b5421b4
AL
718}
719 /*}}}*/
7d8afa39 720// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
721// ---------------------------------------------------------------------
722/* */
723pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
724{
725 for (QItem *I = Items; I != 0; I = I->Next)
726 if (I->URI == URI && I->Worker == Owner)
727 return I;
728 return 0;
729}
730 /*}}}*/
731// Queue::ItemDone - Item has been completed /*{{{*/
732// ---------------------------------------------------------------------
733/* The worker signals this which causes the item to be removed from the
93bf083d
AL
734 queue. If this is the last queue instance then it is removed from the
735 main queue too.*/
c88edf1d
AL
736bool pkgAcquire::Queue::ItemDone(QItem *Itm)
737{
b185acc2 738 PipeDepth--;
db890fdb
AL
739 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
740 Itm->Owner->Status = pkgAcquire::Item::StatDone;
741
93bf083d
AL
742 if (Itm->Owner->QueueCounter <= 1)
743 Owner->Dequeue(Itm->Owner);
744 else
745 {
746 Dequeue(Itm->Owner);
747 Owner->Bump();
748 }
c88edf1d 749
93bf083d
AL
750 return Cycle();
751}
752 /*}}}*/
753// Queue::Cycle - Queue new items into the method /*{{{*/
754// ---------------------------------------------------------------------
b185acc2
AL
755/* This locates a new idle item and sends it to the worker. If pipelining
756 is enabled then it keeps the pipe full. */
93bf083d
AL
757bool pkgAcquire::Queue::Cycle()
758{
759 if (Items == 0 || Workers == 0)
c88edf1d
AL
760 return true;
761
e7432370
AL
762 if (PipeDepth < 0)
763 return _error->Error("Pipedepth failure");
764
93bf083d
AL
765 // Look for a queable item
766 QItem *I = Items;
e7432370 767 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
768 {
769 for (; I != 0; I = I->Next)
770 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
771 break;
772
773 // Nothing to do, queue is idle.
774 if (I == 0)
775 return true;
776
777 I->Worker = Workers;
778 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 779 PipeDepth++;
b185acc2
AL
780 if (Workers->QueueItem(I) == false)
781 return false;
782 }
93bf083d 783
b185acc2 784 return true;
c88edf1d
AL
785}
786 /*}}}*/
be4401bf
AL
787// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
788// ---------------------------------------------------------------------
b185acc2 789/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
790void pkgAcquire::Queue::Bump()
791{
b185acc2 792 Cycle();
be4401bf
AL
793}
794 /*}}}*/
b98f2859
AL
795// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
796// ---------------------------------------------------------------------
797/* */
dcaa1185 798pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
799{
800 Start();
801}
802 /*}}}*/
803// AcquireStatus::Pulse - Called periodically /*{{{*/
804// ---------------------------------------------------------------------
805/* This computes some internal state variables for the derived classes to
806 use. It generates the current downloaded bytes and total bytes to download
807 as well as the current CPS estimate. */
024d1123 808bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
809{
810 TotalBytes = 0;
811 CurrentBytes = 0;
d568ed2d
AL
812 TotalItems = 0;
813 CurrentItems = 0;
b98f2859
AL
814
815 // Compute the total number of bytes to fetch
816 unsigned int Unknown = 0;
817 unsigned int Count = 0;
b4fc9b6f 818 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
f7f0d6c7 819 ++I, ++Count)
b98f2859 820 {
d568ed2d
AL
821 TotalItems++;
822 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 823 ++CurrentItems;
d568ed2d 824
a6568219
AL
825 // Totally ignore local items
826 if ((*I)->Local == true)
827 continue;
b2e465d6 828
b98f2859
AL
829 TotalBytes += (*I)->FileSize;
830 if ((*I)->Complete == true)
831 CurrentBytes += (*I)->FileSize;
832 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 833 ++Unknown;
b98f2859
AL
834 }
835
836 // Compute the current completion
dbbc5494 837 unsigned long long ResumeSize = 0;
b98f2859
AL
838 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
839 I = Owner->WorkerStep(I))
840 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
841 {
842 CurrentBytes += I->CurrentSize;
843 ResumeSize += I->ResumePoint;
844
845 // Files with unknown size always have 100% completion
846 if (I->CurrentItem->Owner->FileSize == 0 &&
847 I->CurrentItem->Owner->Complete == false)
848 TotalBytes += I->CurrentSize;
849 }
850
b98f2859
AL
851 // Normalize the figures and account for unknown size downloads
852 if (TotalBytes <= 0)
853 TotalBytes = 1;
854 if (Unknown == Count)
855 TotalBytes = Unknown;
18ef0a78
AL
856
857 // Wha?! Is not supposed to happen.
858 if (CurrentBytes > TotalBytes)
859 CurrentBytes = TotalBytes;
b98f2859
AL
860
861 // Compute the CPS
862 struct timeval NewTime;
863 gettimeofday(&NewTime,0);
2ec1674d 864 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
865 NewTime.tv_sec - Time.tv_sec > 6)
866 {
f17ac097
AL
867 double Delta = NewTime.tv_sec - Time.tv_sec +
868 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 869
b98f2859 870 // Compute the CPS value
f17ac097 871 if (Delta < 0.01)
e331f6ed
AL
872 CurrentCPS = 0;
873 else
aa0e1101
AL
874 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
875 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 876 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
877 Time = NewTime;
878 }
024d1123 879
75ef8f14
MV
880 int fd = _config->FindI("APT::Status-Fd",-1);
881 if(fd > 0)
882 {
883 ostringstream status;
884
885 char msg[200];
886 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
887 unsigned long long ETA = 0;
888 if(CurrentCPS > 0)
889 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 890
1e8b4c0f
MV
891 // only show the ETA if it makes sense
892 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 893 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 894 else
0c508b03 895 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
896
897
75ef8f14
MV
898
899 // build the status str
900 status << "dlstatus:" << i
901 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
902 << ":" << msg
903 << endl;
31bda500
DK
904
905 std::string const dlstatus = status.str();
d68d65ad 906 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
907 }
908
024d1123 909 return true;
b98f2859
AL
910}
911 /*}}}*/
912// AcquireStatus::Start - Called when the download is started /*{{{*/
913// ---------------------------------------------------------------------
914/* We just reset the counters */
915void pkgAcquireStatus::Start()
916{
917 gettimeofday(&Time,0);
918 gettimeofday(&StartTime,0);
919 LastBytes = 0;
920 CurrentCPS = 0;
921 CurrentBytes = 0;
922 TotalBytes = 0;
923 FetchedBytes = 0;
924 ElapsedTime = 0;
d568ed2d
AL
925 TotalItems = 0;
926 CurrentItems = 0;
b98f2859
AL
927}
928 /*}}}*/
a6568219 929// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
930// ---------------------------------------------------------------------
931/* This accurately computes the elapsed time and the total overall CPS. */
932void pkgAcquireStatus::Stop()
933{
934 // Compute the CPS and elapsed time
935 struct timeval NewTime;
936 gettimeofday(&NewTime,0);
937
31a0531d
AL
938 double Delta = NewTime.tv_sec - StartTime.tv_sec +
939 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 940
b98f2859 941 // Compute the CPS value
31a0531d 942 if (Delta < 0.01)
e331f6ed
AL
943 CurrentCPS = 0;
944 else
31a0531d 945 CurrentCPS = FetchedBytes/Delta;
b98f2859 946 LastBytes = CurrentBytes;
dbbc5494 947 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
948}
949 /*}}}*/
950// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
951// ---------------------------------------------------------------------
952/* This is used to get accurate final transfer rate reporting. */
73da43e9 953void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 954{
b98f2859
AL
955 FetchedBytes += Size - Resume;
956}
957 /*}}}*/