* apt-pkg/acquire-item.cc:
[ntk/apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
8267fe24 22
b2e465d6 23#include <apti18n.h>
b4fc9b6f
AL
24
25#include <iostream>
75ef8f14 26#include <sstream>
b2e465d6 27
7a7fa5f0 28#include <dirent.h>
8267fe24 29#include <sys/time.h>
524f8105 30#include <errno.h>
5f3edc0f 31#include <sys/stat.h>
0118833a
AL
32 /*}}}*/
33
b4fc9b6f
AL
34using namespace std;
35
0118833a
AL
36// Acquire::pkgAcquire - Constructor /*{{{*/
37// ---------------------------------------------------------------------
93bf083d 38/* We grab some runtime state from the configuration space */
8267fe24 39pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
40{
41 Queues = 0;
42 Configs = 0;
0a8a80e5
AL
43 Workers = 0;
44 ToFetch = 0;
8b89e57f 45 Running = false;
0a8a80e5
AL
46
47 string Mode = _config->Find("Acquire::Queue-Mode","host");
48 if (strcasecmp(Mode.c_str(),"host") == 0)
49 QueueMode = QueueHost;
50 if (strcasecmp(Mode.c_str(),"access") == 0)
51 QueueMode = QueueAccess;
52
53 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 54
0c55d1d2 55 // This is really a stupid place for this
5f3edc0f
AL
56 struct stat St;
57 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
b2e465d6 59 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
60 _config->FindDir("Dir::State::lists").c_str());
61 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
62 S_ISDIR(St.st_mode) == 0)
b2e465d6 63 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 64 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
65}
66 /*}}}*/
67// Acquire::~pkgAcquire - Destructor /*{{{*/
68// ---------------------------------------------------------------------
93bf083d 69/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
70pkgAcquire::~pkgAcquire()
71{
459681d3
AL
72 Shutdown();
73
3b5421b4
AL
74 while (Configs != 0)
75 {
76 MethodConfig *Jnk = Configs;
77 Configs = Configs->Next;
78 delete Jnk;
79 }
281daf46
AL
80}
81 /*}}}*/
8e5fc8f5 82// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
83// ---------------------------------------------------------------------
84/* */
85void pkgAcquire::Shutdown()
86{
87 while (Items.size() != 0)
1b480911
AL
88 {
89 if (Items[0]->Status == Item::StatFetching)
90 Items[0]->Status = Item::StatError;
281daf46 91 delete Items[0];
1b480911 92 }
0a8a80e5
AL
93
94 while (Queues != 0)
95 {
96 Queue *Jnk = Queues;
97 Queues = Queues->Next;
98 delete Jnk;
99 }
0118833a
AL
100}
101 /*}}}*/
102// Acquire::Add - Add a new item /*{{{*/
103// ---------------------------------------------------------------------
93bf083d
AL
104/* This puts an item on the acquire list. This list is mainly for tracking
105 item status */
0118833a
AL
106void pkgAcquire::Add(Item *Itm)
107{
108 Items.push_back(Itm);
109}
110 /*}}}*/
111// Acquire::Remove - Remove a item /*{{{*/
112// ---------------------------------------------------------------------
93bf083d 113/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
114void pkgAcquire::Remove(Item *Itm)
115{
a3eaf954
AL
116 Dequeue(Itm);
117
753b3525 118 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
119 {
120 if (*I == Itm)
b4fc9b6f 121 {
0118833a 122 Items.erase(I);
b4fc9b6f
AL
123 I = Items.begin();
124 }
753b3525
AL
125 else
126 I++;
8267fe24 127 }
0118833a
AL
128}
129 /*}}}*/
0a8a80e5
AL
130// Acquire::Add - Add a worker /*{{{*/
131// ---------------------------------------------------------------------
93bf083d
AL
132/* A list of workers is kept so that the select loop can direct their FD
133 usage. */
0a8a80e5
AL
134void pkgAcquire::Add(Worker *Work)
135{
136 Work->NextAcquire = Workers;
137 Workers = Work;
138}
139 /*}}}*/
140// Acquire::Remove - Remove a worker /*{{{*/
141// ---------------------------------------------------------------------
93bf083d
AL
142/* A worker has died. This cannot be done while the select loop is running
143 as it would require that RunFds could handle a changing list state and
144 it can't.. */
0a8a80e5
AL
145void pkgAcquire::Remove(Worker *Work)
146{
93bf083d
AL
147 if (Running == true)
148 abort();
149
0a8a80e5
AL
150 Worker **I = &Workers;
151 for (; *I != 0;)
152 {
153 if (*I == Work)
154 *I = (*I)->NextAcquire;
155 else
156 I = &(*I)->NextAcquire;
157 }
158}
159 /*}}}*/
0118833a
AL
160// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
161// ---------------------------------------------------------------------
93bf083d 162/* This is the entry point for an item. An item calls this function when
281daf46 163 it is constructed which creates a queue (based on the current queue
93bf083d
AL
164 mode) and puts the item in that queue. If the system is running then
165 the queue might be started. */
8267fe24 166void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 167{
0a8a80e5 168 // Determine which queue to put the item in
e331f6ed
AL
169 const MethodConfig *Config;
170 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
171 if (Name.empty() == true)
172 return;
173
174 // Find the queue structure
175 Queue *I = Queues;
176 for (; I != 0 && I->Name != Name; I = I->Next);
177 if (I == 0)
178 {
179 I = new Queue(Name,this);
180 I->Next = Queues;
181 Queues = I;
93bf083d
AL
182
183 if (Running == true)
184 I->Startup();
0a8a80e5 185 }
bfd22fc0 186
e331f6ed
AL
187 // See if this is a local only URI
188 if (Config->LocalOnly == true && Item.Owner->Complete == false)
189 Item.Owner->Local = true;
8267fe24 190 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
191
192 // Queue it into the named queue
c03462c6
MV
193 if(I->Enqueue(Item))
194 ToFetch++;
195
0a8a80e5
AL
196 // Some trace stuff
197 if (Debug == true)
198 {
8267fe24
AL
199 clog << "Fetching " << Item.URI << endl;
200 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 201 clog << " Queue is: " << Name << endl;
0a8a80e5 202 }
3b5421b4
AL
203}
204 /*}}}*/
0a8a80e5 205// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 206// ---------------------------------------------------------------------
93bf083d
AL
207/* This is called when an item is finished being fetched. It removes it
208 from all the queues */
0a8a80e5
AL
209void pkgAcquire::Dequeue(Item *Itm)
210{
211 Queue *I = Queues;
bfd22fc0 212 bool Res = false;
0a8a80e5 213 for (; I != 0; I = I->Next)
bfd22fc0 214 Res |= I->Dequeue(Itm);
93bf083d
AL
215
216 if (Debug == true)
217 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
218 if (Res == true)
219 ToFetch--;
0a8a80e5
AL
220}
221 /*}}}*/
222// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
223// ---------------------------------------------------------------------
224/* The string returned depends on the configuration settings and the
225 method parameters. Given something like http://foo.org/bar it can
226 return http://foo.org or http */
e331f6ed 227string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 228{
93bf083d
AL
229 URI U(Uri);
230
e331f6ed 231 Config = GetConfig(U.Access);
0a8a80e5
AL
232 if (Config == 0)
233 return string();
234
235 /* Single-Instance methods get exactly one queue per URI. This is
236 also used for the Access queue method */
237 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 238 return U.Access;
93bf083d
AL
239
240 return U.Access + ':' + U.Host;
0118833a
AL
241}
242 /*}}}*/
3b5421b4
AL
243// Acquire::GetConfig - Fetch the configuration information /*{{{*/
244// ---------------------------------------------------------------------
245/* This locates the configuration structure for an access method. If
246 a config structure cannot be found a Worker will be created to
247 retrieve it */
0a8a80e5 248pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
249{
250 // Search for an existing config
251 MethodConfig *Conf;
252 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
253 if (Conf->Access == Access)
254 return Conf;
255
256 // Create the new config class
257 Conf = new MethodConfig;
258 Conf->Access = Access;
259 Conf->Next = Configs;
260 Configs = Conf;
0118833a 261
3b5421b4
AL
262 // Create the worker to fetch the configuration
263 Worker Work(Conf);
264 if (Work.Start() == false)
265 return 0;
7c6e2dc7
MV
266
267 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
268 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
269 Conf->SingleInstance = true;
270
3b5421b4
AL
271 return Conf;
272}
273 /*}}}*/
0a8a80e5
AL
274// Acquire::SetFds - Deal with readable FDs /*{{{*/
275// ---------------------------------------------------------------------
276/* Collect FDs that have activity monitors into the fd sets */
277void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
278{
279 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
280 {
281 if (I->InReady == true && I->InFd >= 0)
282 {
283 if (Fd < I->InFd)
284 Fd = I->InFd;
285 FD_SET(I->InFd,RSet);
286 }
287 if (I->OutReady == true && I->OutFd >= 0)
288 {
289 if (Fd < I->OutFd)
290 Fd = I->OutFd;
291 FD_SET(I->OutFd,WSet);
292 }
293 }
294}
295 /*}}}*/
296// Acquire::RunFds - Deal with active FDs /*{{{*/
297// ---------------------------------------------------------------------
93bf083d
AL
298/* Dispatch active FDs over to the proper workers. It is very important
299 that a worker never be erased while this is running! The queue class
300 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
301void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
302{
303 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
304 {
305 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
306 I->InFdReady();
307 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
308 I->OutFdReady();
309 }
310}
311 /*}}}*/
312// Acquire::Run - Run the fetch sequence /*{{{*/
313// ---------------------------------------------------------------------
314/* This runs the queues. It manages a select loop for all of the
315 Worker tasks. The workers interact with the queues and items to
316 manage the actual fetch. */
1c5f7e5f 317pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 318{
8b89e57f
AL
319 Running = true;
320
0a8a80e5
AL
321 for (Queue *I = Queues; I != 0; I = I->Next)
322 I->Startup();
323
b98f2859
AL
324 if (Log != 0)
325 Log->Start();
326
024d1123
AL
327 bool WasCancelled = false;
328
0a8a80e5 329 // Run till all things have been acquired
8267fe24
AL
330 struct timeval tv;
331 tv.tv_sec = 0;
1c5f7e5f 332 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
333 while (ToFetch > 0)
334 {
335 fd_set RFds;
336 fd_set WFds;
337 int Highest = 0;
338 FD_ZERO(&RFds);
339 FD_ZERO(&WFds);
340 SetFds(Highest,&RFds,&WFds);
341
b0db36b1
AL
342 int Res;
343 do
344 {
345 Res = select(Highest+1,&RFds,&WFds,0,&tv);
346 }
347 while (Res < 0 && errno == EINTR);
348
8267fe24 349 if (Res < 0)
8b89e57f 350 {
8267fe24
AL
351 _error->Errno("select","Select has failed");
352 break;
8b89e57f 353 }
93bf083d 354
0a8a80e5 355 RunFds(&RFds,&WFds);
93bf083d
AL
356 if (_error->PendingError() == true)
357 break;
8267fe24
AL
358
359 // Timeout, notify the log class
360 if (Res == 0 || (Log != 0 && Log->Update == true))
361 {
1c5f7e5f 362 tv.tv_usec = PulseIntervall;
8267fe24
AL
363 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
364 I->Pulse();
024d1123
AL
365 if (Log != 0 && Log->Pulse(this) == false)
366 {
367 WasCancelled = true;
368 break;
369 }
8267fe24 370 }
0a8a80e5 371 }
be4401bf 372
b98f2859
AL
373 if (Log != 0)
374 Log->Stop();
375
be4401bf
AL
376 // Shut down the acquire bits
377 Running = false;
0a8a80e5 378 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 379 I->Shutdown(false);
0a8a80e5 380
ab559b35 381 // Shut down the items
b4fc9b6f 382 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 383 (*I)->Finished();
ab559b35 384
024d1123
AL
385 if (_error->PendingError())
386 return Failed;
387 if (WasCancelled)
388 return Cancelled;
389 return Continue;
93bf083d
AL
390}
391 /*}}}*/
be4401bf 392// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
393// ---------------------------------------------------------------------
394/* This routine bumps idle queues in hopes that they will be able to fetch
395 the dequeued item */
396void pkgAcquire::Bump()
397{
be4401bf
AL
398 for (Queue *I = Queues; I != 0; I = I->Next)
399 I->Bump();
0a8a80e5
AL
400}
401 /*}}}*/
8267fe24
AL
402// Acquire::WorkerStep - Step to the next worker /*{{{*/
403// ---------------------------------------------------------------------
404/* Not inlined to avoid including acquire-worker.h */
405pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
406{
407 return I->NextAcquire;
408};
409 /*}}}*/
a6568219 410// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
411// ---------------------------------------------------------------------
412/* This is a bit simplistic, it looks at every file in the dir and sees
413 if it is part of the download set. */
414bool pkgAcquire::Clean(string Dir)
415{
416 DIR *D = opendir(Dir.c_str());
417 if (D == 0)
b2e465d6 418 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
419
420 string StartDir = SafeGetCWD();
421 if (chdir(Dir.c_str()) != 0)
422 {
423 closedir(D);
b2e465d6 424 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
425 }
426
427 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
428 {
429 // Skip some files..
430 if (strcmp(Dir->d_name,"lock") == 0 ||
431 strcmp(Dir->d_name,"partial") == 0 ||
432 strcmp(Dir->d_name,".") == 0 ||
433 strcmp(Dir->d_name,"..") == 0)
434 continue;
435
436 // Look in the get list
b4fc9b6f 437 ItemCIterator I = Items.begin();
7a7fa5f0
AL
438 for (; I != Items.end(); I++)
439 if (flNotDir((*I)->DestFile) == Dir->d_name)
440 break;
441
442 // Nothing found, nuke it
443 if (I == Items.end())
444 unlink(Dir->d_name);
445 };
446
447 chdir(StartDir.c_str());
448 closedir(D);
449 return true;
450}
451 /*}}}*/
a6568219
AL
452// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
453// ---------------------------------------------------------------------
454/* This is the total number of bytes needed */
b2e465d6 455double pkgAcquire::TotalNeeded()
a6568219 456{
b2e465d6 457 double Total = 0;
b4fc9b6f 458 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
459 Total += (*I)->FileSize;
460 return Total;
461}
462 /*}}}*/
463// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
464// ---------------------------------------------------------------------
465/* This is the number of bytes that is not local */
b2e465d6 466double pkgAcquire::FetchNeeded()
a6568219 467{
b2e465d6 468 double Total = 0;
b4fc9b6f 469 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
470 if ((*I)->Local == false)
471 Total += (*I)->FileSize;
472 return Total;
473}
474 /*}}}*/
6b1ff003
AL
475// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
476// ---------------------------------------------------------------------
477/* This is the sum of PartialSize over all items that are not local */
b2e465d6 478double pkgAcquire::PartialPresent()
6b1ff003 479{
b2e465d6 480 double Total = 0;
b4fc9b6f 481 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
482 if ((*I)->Local == false)
483 Total += (*I)->PartialSize;
484 return Total;
485}
b3d44315 486
8e5fc8f5 487// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
488// ---------------------------------------------------------------------
489/* */
490pkgAcquire::UriIterator pkgAcquire::UriBegin()
491{
492 return UriIterator(Queues);
493}
494 /*}}}*/
8e5fc8f5 495// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
496// ---------------------------------------------------------------------
497/* */
498pkgAcquire::UriIterator pkgAcquire::UriEnd()
499{
500 return UriIterator(0);
501}
502 /*}}}*/
0a8a80e5 503
e331f6ed
AL
504// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
505// ---------------------------------------------------------------------
506/* */
507pkgAcquire::MethodConfig::MethodConfig()
508{
509 SingleInstance = false;
e331f6ed
AL
510 Pipeline = false;
511 SendConfig = false;
512 LocalOnly = false;
459681d3 513 Removable = false;
e331f6ed
AL
514 Next = 0;
515}
516 /*}}}*/
517
0a8a80e5
AL
518// Queue::Queue - Constructor /*{{{*/
519// ---------------------------------------------------------------------
520/* */
521pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
522 Owner(Owner)
523{
524 Items = 0;
525 Next = 0;
526 Workers = 0;
b185acc2
AL
527 MaxPipeDepth = 1;
528 PipeDepth = 0;
0a8a80e5
AL
529}
530 /*}}}*/
531// Queue::~Queue - Destructor /*{{{*/
532// ---------------------------------------------------------------------
533/* */
534pkgAcquire::Queue::~Queue()
535{
8e5fc8f5 536 Shutdown(true);
0a8a80e5
AL
537
538 while (Items != 0)
539 {
540 QItem *Jnk = Items;
541 Items = Items->Next;
542 delete Jnk;
543 }
544}
545 /*}}}*/
546// Queue::Enqueue - Queue an item to the queue /*{{{*/
547// ---------------------------------------------------------------------
548/* */
c03462c6 549bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 550{
7a1b1f8b 551 QItem **I = &Items;
c03462c6
MV
552 // move to the end of the queue and check for duplicates here
553 for (; *I != 0; I = &(*I)->Next)
554 if (Item.URI == (*I)->URI)
555 {
556 Item.Owner->Status = Item::StatDone;
557 return false;
558 }
559
0a8a80e5 560 // Create a new item
7a1b1f8b
AL
561 QItem *Itm = new QItem;
562 *Itm = Item;
563 Itm->Next = 0;
564 *I = Itm;
0a8a80e5 565
8267fe24 566 Item.Owner->QueueCounter++;
93bf083d
AL
567 if (Items->Next == 0)
568 Cycle();
c03462c6 569 return true;
0a8a80e5
AL
570}
571 /*}}}*/
c88edf1d 572// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 573// ---------------------------------------------------------------------
b185acc2 574/* We return true if we hit something */
bfd22fc0 575bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 576{
b185acc2
AL
577 if (Owner->Status == pkgAcquire::Item::StatFetching)
578 return _error->Error("Tried to dequeue a fetching object");
579
bfd22fc0
AL
580 bool Res = false;
581
0a8a80e5
AL
582 QItem **I = &Items;
583 for (; *I != 0;)
584 {
585 if ((*I)->Owner == Owner)
586 {
587 QItem *Jnk= *I;
588 *I = (*I)->Next;
589 Owner->QueueCounter--;
590 delete Jnk;
bfd22fc0 591 Res = true;
0a8a80e5
AL
592 }
593 else
594 I = &(*I)->Next;
595 }
bfd22fc0
AL
596
597 return Res;
0a8a80e5
AL
598}
599 /*}}}*/
600// Queue::Startup - Start the worker processes /*{{{*/
601// ---------------------------------------------------------------------
8e5fc8f5
AL
602/* It is possible for this to be called with a pre-existing set of
603 workers. */
0a8a80e5
AL
604bool pkgAcquire::Queue::Startup()
605{
8e5fc8f5
AL
606 if (Workers == 0)
607 {
608 URI U(Name);
609 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
610 if (Cnf == 0)
611 return false;
612
613 Workers = new Worker(this,Cnf,Owner->Log);
614 Owner->Add(Workers);
615 if (Workers->Start() == false)
616 return false;
617
618 /* When pipelining we commit 10 items. This needs to change when we
619 added other source retry to have cycle maintain a pipeline depth
620 on its own. */
621 if (Cnf->Pipeline == true)
7a59dff6 622 MaxPipeDepth = 1000;
8e5fc8f5
AL
623 else
624 MaxPipeDepth = 1;
625 }
5cb5d8dc 626
93bf083d 627 return Cycle();
0a8a80e5
AL
628}
629 /*}}}*/
630// Queue::Shutdown - Shutdown the worker processes /*{{{*/
631// ---------------------------------------------------------------------
8e5fc8f5
AL
632/* If final is true then all workers are eliminated, otherwise only workers
633 that do not need cleanup are removed */
634bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
635{
636 // Delete all of the workers
8e5fc8f5
AL
637 pkgAcquire::Worker **Cur = &Workers;
638 while (*Cur != 0)
0a8a80e5 639 {
8e5fc8f5
AL
640 pkgAcquire::Worker *Jnk = *Cur;
641 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
642 {
643 *Cur = Jnk->NextQueue;
644 Owner->Remove(Jnk);
645 delete Jnk;
646 }
647 else
648 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
649 }
650
651 return true;
3b5421b4
AL
652}
653 /*}}}*/
7d8afa39 654// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
655// ---------------------------------------------------------------------
656/* */
657pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
658{
659 for (QItem *I = Items; I != 0; I = I->Next)
660 if (I->URI == URI && I->Worker == Owner)
661 return I;
662 return 0;
663}
664 /*}}}*/
665// Queue::ItemDone - Item has been completed /*{{{*/
666// ---------------------------------------------------------------------
667/* The worker signals this which causes the item to be removed from the
93bf083d
AL
668 queue. If this is the last queue instance then it is removed from the
669 main queue too.*/
c88edf1d
AL
670bool pkgAcquire::Queue::ItemDone(QItem *Itm)
671{
b185acc2 672 PipeDepth--;
db890fdb
AL
673 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
674 Itm->Owner->Status = pkgAcquire::Item::StatDone;
675
93bf083d
AL
676 if (Itm->Owner->QueueCounter <= 1)
677 Owner->Dequeue(Itm->Owner);
678 else
679 {
680 Dequeue(Itm->Owner);
681 Owner->Bump();
682 }
c88edf1d 683
93bf083d
AL
684 return Cycle();
685}
686 /*}}}*/
687// Queue::Cycle - Queue new items into the method /*{{{*/
688// ---------------------------------------------------------------------
b185acc2
AL
689/* This locates a new idle item and sends it to the worker. If pipelining
690 is enabled then it keeps the pipe full. */
93bf083d
AL
691bool pkgAcquire::Queue::Cycle()
692{
693 if (Items == 0 || Workers == 0)
c88edf1d
AL
694 return true;
695
e7432370
AL
696 if (PipeDepth < 0)
697 return _error->Error("Pipedepth failure");
698
93bf083d
AL
699 // Look for a queable item
700 QItem *I = Items;
e7432370 701 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
702 {
703 for (; I != 0; I = I->Next)
704 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
705 break;
706
707 // Nothing to do, queue is idle.
708 if (I == 0)
709 return true;
710
711 I->Worker = Workers;
712 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 713 PipeDepth++;
b185acc2
AL
714 if (Workers->QueueItem(I) == false)
715 return false;
716 }
93bf083d 717
b185acc2 718 return true;
c88edf1d
AL
719}
720 /*}}}*/
be4401bf
AL
721// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
722// ---------------------------------------------------------------------
b185acc2 723/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
724void pkgAcquire::Queue::Bump()
725{
b185acc2 726 Cycle();
be4401bf
AL
727}
728 /*}}}*/
b98f2859
AL
729
730// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
731// ---------------------------------------------------------------------
732/* */
c5ccf175 733pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
734{
735 Start();
736}
737 /*}}}*/
738// AcquireStatus::Pulse - Called periodically /*{{{*/
739// ---------------------------------------------------------------------
740/* This computes some internal state variables for the derived classes to
741 use. It generates the current downloaded bytes and total bytes to download
742 as well as the current CPS estimate. */
024d1123 743bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
744{
745 TotalBytes = 0;
746 CurrentBytes = 0;
d568ed2d
AL
747 TotalItems = 0;
748 CurrentItems = 0;
b98f2859
AL
749
750 // Compute the total number of bytes to fetch
751 unsigned int Unknown = 0;
752 unsigned int Count = 0;
b4fc9b6f 753 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
754 I++, Count++)
755 {
d568ed2d
AL
756 TotalItems++;
757 if ((*I)->Status == pkgAcquire::Item::StatDone)
758 CurrentItems++;
759
a6568219
AL
760 // Totally ignore local items
761 if ((*I)->Local == true)
762 continue;
b2e465d6 763
b98f2859
AL
764 TotalBytes += (*I)->FileSize;
765 if ((*I)->Complete == true)
766 CurrentBytes += (*I)->FileSize;
767 if ((*I)->FileSize == 0 && (*I)->Complete == false)
768 Unknown++;
769 }
770
771 // Compute the current completion
aa0e1101 772 unsigned long ResumeSize = 0;
b98f2859
AL
773 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
774 I = Owner->WorkerStep(I))
775 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
776 {
777 CurrentBytes += I->CurrentSize;
778 ResumeSize += I->ResumePoint;
779
780 // Files with unknown size always have 100% completion
781 if (I->CurrentItem->Owner->FileSize == 0 &&
782 I->CurrentItem->Owner->Complete == false)
783 TotalBytes += I->CurrentSize;
784 }
785
b98f2859
AL
786 // Normalize the figures and account for unknown size downloads
787 if (TotalBytes <= 0)
788 TotalBytes = 1;
789 if (Unknown == Count)
790 TotalBytes = Unknown;
18ef0a78
AL
791
792 // Wha?! Is not supposed to happen.
793 if (CurrentBytes > TotalBytes)
794 CurrentBytes = TotalBytes;
b98f2859
AL
795
796 // Compute the CPS
797 struct timeval NewTime;
798 gettimeofday(&NewTime,0);
2ec1674d 799 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
800 NewTime.tv_sec - Time.tv_sec > 6)
801 {
f17ac097
AL
802 double Delta = NewTime.tv_sec - Time.tv_sec +
803 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 804
b98f2859 805 // Compute the CPS value
f17ac097 806 if (Delta < 0.01)
e331f6ed
AL
807 CurrentCPS = 0;
808 else
aa0e1101
AL
809 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
810 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 811 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
812 Time = NewTime;
813 }
024d1123 814
75ef8f14
MV
815 int fd = _config->FindI("APT::Status-Fd",-1);
816 if(fd > 0)
817 {
818 ostringstream status;
819
820 char msg[200];
821 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
822 unsigned long ETA =
823 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
824
1e8b4c0f
MV
825 // only show the ETA if it makes sense
826 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 827 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 828 else
0c508b03 829 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
830
831
75ef8f14
MV
832
833 // build the status str
834 status << "dlstatus:" << i
835 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
836 << ":" << msg
837 << endl;
838 write(fd, status.str().c_str(), status.str().size());
839 }
840
024d1123 841 return true;
b98f2859
AL
842}
843 /*}}}*/
844// AcquireStatus::Start - Called when the download is started /*{{{*/
845// ---------------------------------------------------------------------
846/* We just reset the counters */
847void pkgAcquireStatus::Start()
848{
849 gettimeofday(&Time,0);
850 gettimeofday(&StartTime,0);
851 LastBytes = 0;
852 CurrentCPS = 0;
853 CurrentBytes = 0;
854 TotalBytes = 0;
855 FetchedBytes = 0;
856 ElapsedTime = 0;
d568ed2d
AL
857 TotalItems = 0;
858 CurrentItems = 0;
b98f2859
AL
859}
860 /*}}}*/
a6568219 861// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
862// ---------------------------------------------------------------------
863/* This accurately computes the elapsed time and the total overall CPS. */
864void pkgAcquireStatus::Stop()
865{
866 // Compute the CPS and elapsed time
867 struct timeval NewTime;
868 gettimeofday(&NewTime,0);
869
31a0531d
AL
870 double Delta = NewTime.tv_sec - StartTime.tv_sec +
871 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 872
b98f2859 873 // Compute the CPS value
31a0531d 874 if (Delta < 0.01)
e331f6ed
AL
875 CurrentCPS = 0;
876 else
31a0531d 877 CurrentCPS = FetchedBytes/Delta;
b98f2859 878 LastBytes = CurrentBytes;
31a0531d 879 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
880}
881 /*}}}*/
882// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
883// ---------------------------------------------------------------------
884/* This is used to get accurate final transfer rate reporting. */
885void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 886{
b98f2859
AL
887 FetchedBytes += Size - Resume;
888}
889 /*}}}*/