* debian/control:
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
1cd1c398 22#include <apt-pkg/fileutl.h>
8267fe24 23
b2e465d6 24#include <apti18n.h>
b4fc9b6f
AL
25
26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
0118833a
AL
33 /*}}}*/
34
b4fc9b6f
AL
35using namespace std;
36
0118833a
AL
37// Acquire::pkgAcquire - Constructor /*{{{*/
38// ---------------------------------------------------------------------
93bf083d 39/* We grab some runtime state from the configuration space */
1cd1c398
DK
40pkgAcquire::pkgAcquire() : Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
41 Debug(_config->FindB("Debug::pkgAcquire",false)),
42 Running(false), LockFD(-1)
0118833a 43{
1cd1c398 44 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
45 if (strcasecmp(Mode.c_str(),"host") == 0)
46 QueueMode = QueueHost;
47 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
48 QueueMode = QueueAccess;
49}
50pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : Queues(0), Workers(0),
51 Configs(0), Log(Progress), ToFetch(0),
52 Debug(_config->FindB("Debug::pkgAcquire",false)),
53 Running(false), LockFD(-1)
54{
55 string const Mode = _config->Find("Acquire::Queue-Mode","host");
56 if (strcasecmp(Mode.c_str(),"host") == 0)
57 QueueMode = QueueHost;
58 if (strcasecmp(Mode.c_str(),"access") == 0)
59 QueueMode = QueueAccess;
60 Setup(Progress, "");
61}
62 /*}}}*/
63// Acquire::Setup - Delayed Constructor /*{{{*/
64// ---------------------------------------------------------------------
65/* Do everything needed to be a complete Acquire object and report the
66 success (or failure) back so the user knows that something is wrong… */
67bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
68{
69 Log = Progress;
0a8a80e5 70
1cd1c398 71 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
72 string const listDir = _config->FindDir("Dir::State::lists");
73 string const partialListDir = listDir + "partial/";
74 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
75 string const partialArchivesDir = archivesDir + "partial/";
76
77 if (CheckDirectory(_config->FindDir("Dir::State"), partialListDir) == false &&
78 CheckDirectory(listDir, partialListDir) == false)
79 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
80
81 if (CheckDirectory(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
82 CheckDirectory(archivesDir, partialArchivesDir) == false)
83 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
84
85 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
86 return true;
87
88 // Lock the directory this acquire object will work in
89 LockFD = GetLock(flCombine(Lock, "lock"));
90 if (LockFD == -1)
91 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
92
93 return true;
94}
95 /*}}}*/
96// Acquire::CheckDirectory - ensure that the given directory exists /*{{{*/
97// ---------------------------------------------------------------------
98/* a small wrapper around CreateDirectory to check if it exists and to
99 remove the trailing "/apt/" from the parent directory if needed */
100bool pkgAcquire::CheckDirectory(string const &Parent, string const &Path) const
101{
102 if (DirectoryExists(Path) == true)
103 return true;
104
105 size_t const len = Parent.size();
9c2c9c24 106 if (len > 5 && Parent.find("/apt/", len - 6, 5) == len - 5)
1cd1c398
DK
107 {
108 if (CreateDirectory(Parent.substr(0,len-5), Path) == true)
109 return true;
110 }
111 else if (CreateDirectory(Parent, Path) == true)
112 return true;
113
9c2c9c24 114 return false;
0118833a
AL
115}
116 /*}}}*/
117// Acquire::~pkgAcquire - Destructor /*{{{*/
118// ---------------------------------------------------------------------
93bf083d 119/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
120pkgAcquire::~pkgAcquire()
121{
459681d3 122 Shutdown();
1cd1c398
DK
123
124 if (LockFD != -1)
125 close(LockFD);
126
3b5421b4
AL
127 while (Configs != 0)
128 {
129 MethodConfig *Jnk = Configs;
130 Configs = Configs->Next;
131 delete Jnk;
132 }
281daf46
AL
133}
134 /*}}}*/
8e5fc8f5 135// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
136// ---------------------------------------------------------------------
137/* */
138void pkgAcquire::Shutdown()
139{
140 while (Items.size() != 0)
1b480911
AL
141 {
142 if (Items[0]->Status == Item::StatFetching)
143 Items[0]->Status = Item::StatError;
281daf46 144 delete Items[0];
1b480911 145 }
0a8a80e5
AL
146
147 while (Queues != 0)
148 {
149 Queue *Jnk = Queues;
150 Queues = Queues->Next;
151 delete Jnk;
152 }
0118833a
AL
153}
154 /*}}}*/
155// Acquire::Add - Add a new item /*{{{*/
156// ---------------------------------------------------------------------
93bf083d
AL
157/* This puts an item on the acquire list. This list is mainly for tracking
158 item status */
0118833a
AL
159void pkgAcquire::Add(Item *Itm)
160{
161 Items.push_back(Itm);
162}
163 /*}}}*/
164// Acquire::Remove - Remove a item /*{{{*/
165// ---------------------------------------------------------------------
93bf083d 166/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
167void pkgAcquire::Remove(Item *Itm)
168{
a3eaf954
AL
169 Dequeue(Itm);
170
753b3525 171 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
172 {
173 if (*I == Itm)
b4fc9b6f 174 {
0118833a 175 Items.erase(I);
b4fc9b6f
AL
176 I = Items.begin();
177 }
753b3525
AL
178 else
179 I++;
8267fe24 180 }
0118833a
AL
181}
182 /*}}}*/
0a8a80e5
AL
183// Acquire::Add - Add a worker /*{{{*/
184// ---------------------------------------------------------------------
93bf083d
AL
185/* A list of workers is kept so that the select loop can direct their FD
186 usage. */
0a8a80e5
AL
187void pkgAcquire::Add(Worker *Work)
188{
189 Work->NextAcquire = Workers;
190 Workers = Work;
191}
192 /*}}}*/
193// Acquire::Remove - Remove a worker /*{{{*/
194// ---------------------------------------------------------------------
93bf083d
AL
195/* A worker has died. This can not be done while the select loop is running
196 as it would require that RunFds could handling a changing list state and
197 it cant.. */
0a8a80e5
AL
198void pkgAcquire::Remove(Worker *Work)
199{
93bf083d
AL
200 if (Running == true)
201 abort();
202
0a8a80e5
AL
203 Worker **I = &Workers;
204 for (; *I != 0;)
205 {
206 if (*I == Work)
207 *I = (*I)->NextAcquire;
208 else
209 I = &(*I)->NextAcquire;
210 }
211}
212 /*}}}*/
0118833a
AL
213// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
214// ---------------------------------------------------------------------
93bf083d 215/* This is the entry point for an item. An item calls this function when
281daf46 216 it is constructed which creates a queue (based on the current queue
93bf083d
AL
217 mode) and puts the item in that queue. If the system is running then
218 the queue might be started. */
8267fe24 219void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 220{
0a8a80e5 221 // Determine which queue to put the item in
e331f6ed
AL
222 const MethodConfig *Config;
223 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
224 if (Name.empty() == true)
225 return;
226
227 // Find the queue structure
228 Queue *I = Queues;
229 for (; I != 0 && I->Name != Name; I = I->Next);
230 if (I == 0)
231 {
232 I = new Queue(Name,this);
233 I->Next = Queues;
234 Queues = I;
93bf083d
AL
235
236 if (Running == true)
237 I->Startup();
0a8a80e5 238 }
bfd22fc0 239
e331f6ed
AL
240 // See if this is a local only URI
241 if (Config->LocalOnly == true && Item.Owner->Complete == false)
242 Item.Owner->Local = true;
8267fe24 243 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
244
245 // Queue it into the named queue
c03462c6
MV
246 if(I->Enqueue(Item))
247 ToFetch++;
248
0a8a80e5
AL
249 // Some trace stuff
250 if (Debug == true)
251 {
8267fe24
AL
252 clog << "Fetching " << Item.URI << endl;
253 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 254 clog << " Queue is: " << Name << endl;
0a8a80e5 255 }
3b5421b4
AL
256}
257 /*}}}*/
0a8a80e5 258// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 259// ---------------------------------------------------------------------
93bf083d
AL
260/* This is called when an item is finished being fetched. It removes it
261 from all the queues */
0a8a80e5
AL
262void pkgAcquire::Dequeue(Item *Itm)
263{
264 Queue *I = Queues;
bfd22fc0 265 bool Res = false;
0a8a80e5 266 for (; I != 0; I = I->Next)
bfd22fc0 267 Res |= I->Dequeue(Itm);
93bf083d
AL
268
269 if (Debug == true)
270 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
271 if (Res == true)
272 ToFetch--;
0a8a80e5
AL
273}
274 /*}}}*/
275// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
276// ---------------------------------------------------------------------
277/* The string returned depends on the configuration settings and the
278 method parameters. Given something like http://foo.org/bar it can
279 return http://foo.org or http */
e331f6ed 280string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 281{
93bf083d
AL
282 URI U(Uri);
283
e331f6ed 284 Config = GetConfig(U.Access);
0a8a80e5
AL
285 if (Config == 0)
286 return string();
287
288 /* Single-Instance methods get exactly one queue per URI. This is
289 also used for the Access queue method */
290 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 291 return U.Access;
93bf083d
AL
292
293 return U.Access + ':' + U.Host;
0118833a
AL
294}
295 /*}}}*/
3b5421b4
AL
296// Acquire::GetConfig - Fetch the configuration information /*{{{*/
297// ---------------------------------------------------------------------
298/* This locates the configuration structure for an access method. If
299 a config structure cannot be found a Worker will be created to
300 retrieve it */
0a8a80e5 301pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
302{
303 // Search for an existing config
304 MethodConfig *Conf;
305 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
306 if (Conf->Access == Access)
307 return Conf;
308
309 // Create the new config class
310 Conf = new MethodConfig;
311 Conf->Access = Access;
312 Conf->Next = Configs;
313 Configs = Conf;
0118833a 314
3b5421b4
AL
315 // Create the worker to fetch the configuration
316 Worker Work(Conf);
317 if (Work.Start() == false)
318 return 0;
7c6e2dc7
MV
319
320 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 321 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
322 Conf->SingleInstance = true;
323
3b5421b4
AL
324 return Conf;
325}
326 /*}}}*/
0a8a80e5
AL
327// Acquire::SetFds - Deal with readable FDs /*{{{*/
328// ---------------------------------------------------------------------
329/* Collect FDs that have activity monitors into the fd sets */
330void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
331{
332 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
333 {
334 if (I->InReady == true && I->InFd >= 0)
335 {
336 if (Fd < I->InFd)
337 Fd = I->InFd;
338 FD_SET(I->InFd,RSet);
339 }
340 if (I->OutReady == true && I->OutFd >= 0)
341 {
342 if (Fd < I->OutFd)
343 Fd = I->OutFd;
344 FD_SET(I->OutFd,WSet);
345 }
346 }
347}
348 /*}}}*/
349// Acquire::RunFds - Deal with active FDs /*{{{*/
350// ---------------------------------------------------------------------
93bf083d
AL
351/* Dispatch active FDs over to the proper workers. It is very important
352 that a worker never be erased while this is running! The queue class
353 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
354void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
355{
356 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
357 {
358 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
359 I->InFdReady();
360 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
361 I->OutFdReady();
362 }
363}
364 /*}}}*/
365// Acquire::Run - Run the fetch sequence /*{{{*/
366// ---------------------------------------------------------------------
367/* This runs the queues. It manages a select loop for all of the
368 Worker tasks. The workers interact with the queues and items to
369 manage the actual fetch. */
1c5f7e5f 370pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 371{
8b89e57f
AL
372 Running = true;
373
0a8a80e5
AL
374 for (Queue *I = Queues; I != 0; I = I->Next)
375 I->Startup();
376
b98f2859
AL
377 if (Log != 0)
378 Log->Start();
379
024d1123
AL
380 bool WasCancelled = false;
381
0a8a80e5 382 // Run till all things have been acquired
8267fe24
AL
383 struct timeval tv;
384 tv.tv_sec = 0;
1c5f7e5f 385 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
386 while (ToFetch > 0)
387 {
388 fd_set RFds;
389 fd_set WFds;
390 int Highest = 0;
391 FD_ZERO(&RFds);
392 FD_ZERO(&WFds);
393 SetFds(Highest,&RFds,&WFds);
394
b0db36b1
AL
395 int Res;
396 do
397 {
398 Res = select(Highest+1,&RFds,&WFds,0,&tv);
399 }
400 while (Res < 0 && errno == EINTR);
401
8267fe24 402 if (Res < 0)
8b89e57f 403 {
8267fe24
AL
404 _error->Errno("select","Select has failed");
405 break;
8b89e57f 406 }
93bf083d 407
0a8a80e5 408 RunFds(&RFds,&WFds);
93bf083d
AL
409 if (_error->PendingError() == true)
410 break;
8267fe24
AL
411
412 // Timeout, notify the log class
413 if (Res == 0 || (Log != 0 && Log->Update == true))
414 {
1c5f7e5f 415 tv.tv_usec = PulseIntervall;
8267fe24
AL
416 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
417 I->Pulse();
024d1123
AL
418 if (Log != 0 && Log->Pulse(this) == false)
419 {
420 WasCancelled = true;
421 break;
422 }
8267fe24 423 }
0a8a80e5 424 }
be4401bf 425
b98f2859
AL
426 if (Log != 0)
427 Log->Stop();
428
be4401bf
AL
429 // Shut down the acquire bits
430 Running = false;
0a8a80e5 431 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 432 I->Shutdown(false);
0a8a80e5 433
ab559b35 434 // Shut down the items
b4fc9b6f 435 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 436 (*I)->Finished();
ab559b35 437
024d1123
AL
438 if (_error->PendingError())
439 return Failed;
440 if (WasCancelled)
441 return Cancelled;
442 return Continue;
93bf083d
AL
443}
444 /*}}}*/
be4401bf 445// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
446// ---------------------------------------------------------------------
447/* This routine bumps idle queues in hopes that they will be able to fetch
448 the dequeued item */
449void pkgAcquire::Bump()
450{
be4401bf
AL
451 for (Queue *I = Queues; I != 0; I = I->Next)
452 I->Bump();
0a8a80e5
AL
453}
454 /*}}}*/
8267fe24
AL
455// Acquire::WorkerStep - Step to the next worker /*{{{*/
456// ---------------------------------------------------------------------
457/* Not inlined to advoid including acquire-worker.h */
458pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
459{
460 return I->NextAcquire;
461};
462 /*}}}*/
a6568219 463// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
464// ---------------------------------------------------------------------
465/* This is a bit simplistic, it looks at every file in the dir and sees
466 if it is part of the download set. */
467bool pkgAcquire::Clean(string Dir)
468{
469 DIR *D = opendir(Dir.c_str());
470 if (D == 0)
b2e465d6 471 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
472
473 string StartDir = SafeGetCWD();
474 if (chdir(Dir.c_str()) != 0)
475 {
476 closedir(D);
b2e465d6 477 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
478 }
479
480 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
481 {
482 // Skip some files..
483 if (strcmp(Dir->d_name,"lock") == 0 ||
484 strcmp(Dir->d_name,"partial") == 0 ||
485 strcmp(Dir->d_name,".") == 0 ||
486 strcmp(Dir->d_name,"..") == 0)
487 continue;
488
489 // Look in the get list
b4fc9b6f 490 ItemCIterator I = Items.begin();
7a7fa5f0
AL
491 for (; I != Items.end(); I++)
492 if (flNotDir((*I)->DestFile) == Dir->d_name)
493 break;
494
495 // Nothing found, nuke it
496 if (I == Items.end())
497 unlink(Dir->d_name);
498 };
499
7a7fa5f0 500 closedir(D);
3c8cda8b
MV
501 if (chdir(StartDir.c_str()) != 0)
502 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
503 return true;
504}
505 /*}}}*/
a6568219
AL
506// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
507// ---------------------------------------------------------------------
508/* This is the total number of bytes needed */
a3c4c81a 509unsigned long long pkgAcquire::TotalNeeded()
a6568219 510{
a3c4c81a 511 unsigned long long Total = 0;
b4fc9b6f 512 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
513 Total += (*I)->FileSize;
514 return Total;
515}
516 /*}}}*/
517// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
518// ---------------------------------------------------------------------
519/* This is the number of bytes that is not local */
a3c4c81a 520unsigned long long pkgAcquire::FetchNeeded()
a6568219 521{
a3c4c81a 522 unsigned long long Total = 0;
b4fc9b6f 523 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
524 if ((*I)->Local == false)
525 Total += (*I)->FileSize;
526 return Total;
527}
528 /*}}}*/
6b1ff003
AL
529// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
530// ---------------------------------------------------------------------
531/* This is the number of bytes that is not local */
a3c4c81a 532unsigned long long pkgAcquire::PartialPresent()
6b1ff003 533{
a3c4c81a 534 unsigned long long Total = 0;
b4fc9b6f 535 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
536 if ((*I)->Local == false)
537 Total += (*I)->PartialSize;
538 return Total;
539}
92fcbfc1 540 /*}}}*/
8e5fc8f5 541// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
542// ---------------------------------------------------------------------
543/* */
544pkgAcquire::UriIterator pkgAcquire::UriBegin()
545{
546 return UriIterator(Queues);
547}
548 /*}}}*/
8e5fc8f5 549// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
550// ---------------------------------------------------------------------
551/* */
552pkgAcquire::UriIterator pkgAcquire::UriEnd()
553{
554 return UriIterator(0);
555}
556 /*}}}*/
e331f6ed
AL
557// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
558// ---------------------------------------------------------------------
559/* */
560pkgAcquire::MethodConfig::MethodConfig()
561{
562 SingleInstance = false;
e331f6ed
AL
563 Pipeline = false;
564 SendConfig = false;
565 LocalOnly = false;
459681d3 566 Removable = false;
e331f6ed
AL
567 Next = 0;
568}
569 /*}}}*/
0a8a80e5
AL
570// Queue::Queue - Constructor /*{{{*/
571// ---------------------------------------------------------------------
572/* */
573pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
574 Owner(Owner)
575{
576 Items = 0;
577 Next = 0;
578 Workers = 0;
b185acc2
AL
579 MaxPipeDepth = 1;
580 PipeDepth = 0;
0a8a80e5
AL
581}
582 /*}}}*/
583// Queue::~Queue - Destructor /*{{{*/
584// ---------------------------------------------------------------------
585/* */
586pkgAcquire::Queue::~Queue()
587{
8e5fc8f5 588 Shutdown(true);
0a8a80e5
AL
589
590 while (Items != 0)
591 {
592 QItem *Jnk = Items;
593 Items = Items->Next;
594 delete Jnk;
595 }
596}
597 /*}}}*/
598// Queue::Enqueue - Queue an item to the queue /*{{{*/
599// ---------------------------------------------------------------------
600/* */
c03462c6 601bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 602{
7a1b1f8b 603 QItem **I = &Items;
c03462c6
MV
604 // move to the end of the queue and check for duplicates here
605 for (; *I != 0; I = &(*I)->Next)
606 if (Item.URI == (*I)->URI)
607 {
608 Item.Owner->Status = Item::StatDone;
609 return false;
610 }
611
0a8a80e5 612 // Create a new item
7a1b1f8b
AL
613 QItem *Itm = new QItem;
614 *Itm = Item;
615 Itm->Next = 0;
616 *I = Itm;
0a8a80e5 617
8267fe24 618 Item.Owner->QueueCounter++;
93bf083d
AL
619 if (Items->Next == 0)
620 Cycle();
c03462c6 621 return true;
0a8a80e5
AL
622}
623 /*}}}*/
c88edf1d 624// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 625// ---------------------------------------------------------------------
b185acc2 626/* We return true if we hit something */
bfd22fc0 627bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 628{
b185acc2
AL
629 if (Owner->Status == pkgAcquire::Item::StatFetching)
630 return _error->Error("Tried to dequeue a fetching object");
631
bfd22fc0
AL
632 bool Res = false;
633
0a8a80e5
AL
634 QItem **I = &Items;
635 for (; *I != 0;)
636 {
637 if ((*I)->Owner == Owner)
638 {
639 QItem *Jnk= *I;
640 *I = (*I)->Next;
641 Owner->QueueCounter--;
642 delete Jnk;
bfd22fc0 643 Res = true;
0a8a80e5
AL
644 }
645 else
646 I = &(*I)->Next;
647 }
bfd22fc0
AL
648
649 return Res;
0a8a80e5
AL
650}
651 /*}}}*/
652// Queue::Startup - Start the worker processes /*{{{*/
653// ---------------------------------------------------------------------
8e5fc8f5
AL
654/* It is possible for this to be called with a pre-existing set of
655 workers. */
0a8a80e5
AL
656bool pkgAcquire::Queue::Startup()
657{
8e5fc8f5
AL
658 if (Workers == 0)
659 {
660 URI U(Name);
661 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
662 if (Cnf == 0)
663 return false;
664
665 Workers = new Worker(this,Cnf,Owner->Log);
666 Owner->Add(Workers);
667 if (Workers->Start() == false)
668 return false;
669
670 /* When pipelining we commit 10 items. This needs to change when we
671 added other source retry to have cycle maintain a pipeline depth
672 on its own. */
673 if (Cnf->Pipeline == true)
6ce72612 674 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
675 else
676 MaxPipeDepth = 1;
677 }
5cb5d8dc 678
93bf083d 679 return Cycle();
0a8a80e5
AL
680}
681 /*}}}*/
682// Queue::Shutdown - Shutdown the worker processes /*{{{*/
683// ---------------------------------------------------------------------
8e5fc8f5
AL
684/* If final is true then all workers are eliminated, otherwise only workers
685 that do not need cleanup are removed */
686bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
687{
688 // Delete all of the workers
8e5fc8f5
AL
689 pkgAcquire::Worker **Cur = &Workers;
690 while (*Cur != 0)
0a8a80e5 691 {
8e5fc8f5
AL
692 pkgAcquire::Worker *Jnk = *Cur;
693 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
694 {
695 *Cur = Jnk->NextQueue;
696 Owner->Remove(Jnk);
697 delete Jnk;
698 }
699 else
700 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
701 }
702
703 return true;
3b5421b4
AL
704}
705 /*}}}*/
7d8afa39 706// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
707// ---------------------------------------------------------------------
708/* */
709pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
710{
711 for (QItem *I = Items; I != 0; I = I->Next)
712 if (I->URI == URI && I->Worker == Owner)
713 return I;
714 return 0;
715}
716 /*}}}*/
717// Queue::ItemDone - Item has been completed /*{{{*/
718// ---------------------------------------------------------------------
719/* The worker signals this which causes the item to be removed from the
93bf083d
AL
720 queue. If this is the last queue instance then it is removed from the
721 main queue too.*/
c88edf1d
AL
722bool pkgAcquire::Queue::ItemDone(QItem *Itm)
723{
b185acc2 724 PipeDepth--;
db890fdb
AL
725 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
726 Itm->Owner->Status = pkgAcquire::Item::StatDone;
727
93bf083d
AL
728 if (Itm->Owner->QueueCounter <= 1)
729 Owner->Dequeue(Itm->Owner);
730 else
731 {
732 Dequeue(Itm->Owner);
733 Owner->Bump();
734 }
c88edf1d 735
93bf083d
AL
736 return Cycle();
737}
738 /*}}}*/
739// Queue::Cycle - Queue new items into the method /*{{{*/
740// ---------------------------------------------------------------------
b185acc2
AL
741/* This locates a new idle item and sends it to the worker. If pipelining
742 is enabled then it keeps the pipe full. */
93bf083d
AL
743bool pkgAcquire::Queue::Cycle()
744{
745 if (Items == 0 || Workers == 0)
c88edf1d
AL
746 return true;
747
e7432370
AL
748 if (PipeDepth < 0)
749 return _error->Error("Pipedepth failure");
750
93bf083d
AL
751 // Look for a queable item
752 QItem *I = Items;
e7432370 753 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
754 {
755 for (; I != 0; I = I->Next)
756 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
757 break;
758
759 // Nothing to do, queue is idle.
760 if (I == 0)
761 return true;
762
763 I->Worker = Workers;
764 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 765 PipeDepth++;
b185acc2
AL
766 if (Workers->QueueItem(I) == false)
767 return false;
768 }
93bf083d 769
b185acc2 770 return true;
c88edf1d
AL
771}
772 /*}}}*/
be4401bf
AL
773// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
774// ---------------------------------------------------------------------
b185acc2 775/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
776void pkgAcquire::Queue::Bump()
777{
b185acc2 778 Cycle();
be4401bf
AL
779}
780 /*}}}*/
b98f2859
AL
781// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
782// ---------------------------------------------------------------------
783/* */
c5ccf175 784pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
785{
786 Start();
787}
788 /*}}}*/
789// AcquireStatus::Pulse - Called periodically /*{{{*/
790// ---------------------------------------------------------------------
791/* This computes some internal state variables for the derived classes to
792 use. It generates the current downloaded bytes and total bytes to download
793 as well as the current CPS estimate. */
024d1123 794bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
795{
796 TotalBytes = 0;
797 CurrentBytes = 0;
d568ed2d
AL
798 TotalItems = 0;
799 CurrentItems = 0;
b98f2859
AL
800
801 // Compute the total number of bytes to fetch
802 unsigned int Unknown = 0;
803 unsigned int Count = 0;
b4fc9b6f 804 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
805 I++, Count++)
806 {
d568ed2d
AL
807 TotalItems++;
808 if ((*I)->Status == pkgAcquire::Item::StatDone)
809 CurrentItems++;
810
a6568219
AL
811 // Totally ignore local items
812 if ((*I)->Local == true)
813 continue;
b2e465d6 814
b98f2859
AL
815 TotalBytes += (*I)->FileSize;
816 if ((*I)->Complete == true)
817 CurrentBytes += (*I)->FileSize;
818 if ((*I)->FileSize == 0 && (*I)->Complete == false)
819 Unknown++;
820 }
821
822 // Compute the current completion
aa0e1101 823 unsigned long ResumeSize = 0;
b98f2859
AL
824 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
825 I = Owner->WorkerStep(I))
826 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
827 {
828 CurrentBytes += I->CurrentSize;
829 ResumeSize += I->ResumePoint;
830
831 // Files with unknown size always have 100% completion
832 if (I->CurrentItem->Owner->FileSize == 0 &&
833 I->CurrentItem->Owner->Complete == false)
834 TotalBytes += I->CurrentSize;
835 }
836
b98f2859
AL
837 // Normalize the figures and account for unknown size downloads
838 if (TotalBytes <= 0)
839 TotalBytes = 1;
840 if (Unknown == Count)
841 TotalBytes = Unknown;
18ef0a78
AL
842
843 // Wha?! Is not supposed to happen.
844 if (CurrentBytes > TotalBytes)
845 CurrentBytes = TotalBytes;
b98f2859
AL
846
847 // Compute the CPS
848 struct timeval NewTime;
849 gettimeofday(&NewTime,0);
2ec1674d 850 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
851 NewTime.tv_sec - Time.tv_sec > 6)
852 {
f17ac097
AL
853 double Delta = NewTime.tv_sec - Time.tv_sec +
854 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 855
b98f2859 856 // Compute the CPS value
f17ac097 857 if (Delta < 0.01)
e331f6ed
AL
858 CurrentCPS = 0;
859 else
aa0e1101
AL
860 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
861 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 862 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
863 Time = NewTime;
864 }
024d1123 865
75ef8f14
MV
866 int fd = _config->FindI("APT::Status-Fd",-1);
867 if(fd > 0)
868 {
869 ostringstream status;
870
871 char msg[200];
872 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
873 unsigned long ETA =
874 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
875
1e8b4c0f
MV
876 // only show the ETA if it makes sense
877 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 878 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 879 else
0c508b03 880 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
881
882
75ef8f14
MV
883
884 // build the status str
885 status << "dlstatus:" << i
886 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
887 << ":" << msg
888 << endl;
889 write(fd, status.str().c_str(), status.str().size());
890 }
891
024d1123 892 return true;
b98f2859
AL
893}
894 /*}}}*/
895// AcquireStatus::Start - Called when the download is started /*{{{*/
896// ---------------------------------------------------------------------
897/* We just reset the counters */
898void pkgAcquireStatus::Start()
899{
900 gettimeofday(&Time,0);
901 gettimeofday(&StartTime,0);
902 LastBytes = 0;
903 CurrentCPS = 0;
904 CurrentBytes = 0;
905 TotalBytes = 0;
906 FetchedBytes = 0;
907 ElapsedTime = 0;
d568ed2d
AL
908 TotalItems = 0;
909 CurrentItems = 0;
b98f2859
AL
910}
911 /*}}}*/
a6568219 912// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
913// ---------------------------------------------------------------------
914/* This accurately computes the elapsed time and the total overall CPS. */
915void pkgAcquireStatus::Stop()
916{
917 // Compute the CPS and elapsed time
918 struct timeval NewTime;
919 gettimeofday(&NewTime,0);
920
31a0531d
AL
921 double Delta = NewTime.tv_sec - StartTime.tv_sec +
922 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 923
b98f2859 924 // Compute the CPS value
31a0531d 925 if (Delta < 0.01)
e331f6ed
AL
926 CurrentCPS = 0;
927 else
31a0531d 928 CurrentCPS = FetchedBytes/Delta;
b98f2859 929 LastBytes = CurrentBytes;
31a0531d 930 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
931}
932 /*}}}*/
933// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
934// ---------------------------------------------------------------------
935/* This is used to get accurate final transfer rate reporting. */
936void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 937{
b98f2859
AL
938 FetchedBytes += Size - Resume;
939}
940 /*}}}*/