apt-get update works
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.13 1998/11/11 23:45:08 jgg Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element of the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #ifdef __GNUG__
17 #pragma implementation "apt-pkg/acquire.h"
18 #endif
#include <apt-pkg/acquire.h>
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/acquire-worker.h>
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <strutl.h>

#include <errno.h>
#include <sys/time.h>
27 /*}}}*/
28
29 // Acquire::pkgAcquire - Constructor /*{{{*/
30 // ---------------------------------------------------------------------
31 /* We grab some runtime state from the configuration space */
32 pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
33 {
34 Queues = 0;
35 Configs = 0;
36 Workers = 0;
37 ToFetch = 0;
38 Running = false;
39
40 string Mode = _config->Find("Acquire::Queue-Mode","host");
41 if (strcasecmp(Mode.c_str(),"host") == 0)
42 QueueMode = QueueHost;
43 if (strcasecmp(Mode.c_str(),"access") == 0)
44 QueueMode = QueueAccess;
45
46 Debug = _config->FindB("Debug::pkgAcquire",false);
47 }
48 /*}}}*/
49 // Acquire::~pkgAcquire - Destructor /*{{{*/
50 // ---------------------------------------------------------------------
51 /* Free our memory, clean up the queues (destroy the workers) */
52 pkgAcquire::~pkgAcquire()
53 {
54 while (Items.size() != 0)
55 delete Items[0];
56
57 while (Configs != 0)
58 {
59 MethodConfig *Jnk = Configs;
60 Configs = Configs->Next;
61 delete Jnk;
62 }
63
64 while (Queues != 0)
65 {
66 Queue *Jnk = Queues;
67 Queues = Queues->Next;
68 delete Jnk;
69 }
70 }
71 /*}}}*/
72 // Acquire::Add - Add a new item /*{{{*/
73 // ---------------------------------------------------------------------
74 /* This puts an item on the acquire list. This list is mainly for tracking
75 item status */
76 void pkgAcquire::Add(Item *Itm)
77 {
78 Items.push_back(Itm);
79 }
80 /*}}}*/
81 // Acquire::Remove - Remove a item /*{{{*/
82 // ---------------------------------------------------------------------
83 /* Remove an item from the acquire list. This is usually not used.. */
84 void pkgAcquire::Remove(Item *Itm)
85 {
86 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
87 {
88 if (*I == Itm)
89 Items.erase(I);
90 }
91 }
92 /*}}}*/
93 // Acquire::Add - Add a worker /*{{{*/
94 // ---------------------------------------------------------------------
95 /* A list of workers is kept so that the select loop can direct their FD
96 usage. */
97 void pkgAcquire::Add(Worker *Work)
98 {
99 Work->NextAcquire = Workers;
100 Workers = Work;
101 }
102 /*}}}*/
103 // Acquire::Remove - Remove a worker /*{{{*/
104 // ---------------------------------------------------------------------
105 /* A worker has died. This can not be done while the select loop is running
106 as it would require that RunFds could handling a changing list state and
107 it cant.. */
108 void pkgAcquire::Remove(Worker *Work)
109 {
110 if (Running == true)
111 abort();
112
113 Worker **I = &Workers;
114 for (; *I != 0;)
115 {
116 if (*I == Work)
117 *I = (*I)->NextAcquire;
118 else
119 I = &(*I)->NextAcquire;
120 }
121 }
122 /*}}}*/
123 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
124 // ---------------------------------------------------------------------
125 /* This is the entry point for an item. An item calls this function when
126 it is construction which creates a queue (based on the current queue
127 mode) and puts the item in that queue. If the system is running then
128 the queue might be started. */
129 void pkgAcquire::Enqueue(ItemDesc &Item)
130 {
131 // Determine which queue to put the item in
132 string Name = QueueName(Item.URI);
133 if (Name.empty() == true)
134 return;
135
136 // Find the queue structure
137 Queue *I = Queues;
138 for (; I != 0 && I->Name != Name; I = I->Next);
139 if (I == 0)
140 {
141 I = new Queue(Name,this);
142 I->Next = Queues;
143 Queues = I;
144
145 if (Running == true)
146 I->Startup();
147 }
148
149 Item.Owner->Status = Item::StatIdle;
150
151 // Queue it into the named queue
152 I->Enqueue(Item);
153 ToFetch++;
154
155 // Some trace stuff
156 if (Debug == true)
157 {
158 clog << "Fetching " << Item.URI << endl;
159 clog << " to " << Item.Owner->DestFile << endl;
160 clog << " Queue is: " << QueueName(Item.URI) << endl;
161 }
162 }
163 /*}}}*/
164 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
165 // ---------------------------------------------------------------------
166 /* This is called when an item is finished being fetched. It removes it
167 from all the queues */
168 void pkgAcquire::Dequeue(Item *Itm)
169 {
170 Queue *I = Queues;
171 bool Res = false;
172 for (; I != 0; I = I->Next)
173 Res |= I->Dequeue(Itm);
174
175 if (Debug == true)
176 clog << "Dequeuing " << Itm->DestFile << endl;
177 if (Res == true)
178 ToFetch--;
179 }
180 /*}}}*/
181 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
182 // ---------------------------------------------------------------------
183 /* The string returned depends on the configuration settings and the
184 method parameters. Given something like http://foo.org/bar it can
185 return http://foo.org or http */
186 string pkgAcquire::QueueName(string Uri)
187 {
188 URI U(Uri);
189
190 const MethodConfig *Config = GetConfig(U.Access);
191 if (Config == 0)
192 return string();
193
194 /* Single-Instance methods get exactly one queue per URI. This is
195 also used for the Access queue method */
196 if (Config->SingleInstance == true || QueueMode == QueueAccess)
197 return U.Access;
198
199 return U.Access + ':' + U.Host;
200 }
201 /*}}}*/
202 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
203 // ---------------------------------------------------------------------
204 /* This locates the configuration structure for an access method. If
205 a config structure cannot be found a Worker will be created to
206 retrieve it */
207 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
208 {
209 // Search for an existing config
210 MethodConfig *Conf;
211 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
212 if (Conf->Access == Access)
213 return Conf;
214
215 // Create the new config class
216 Conf = new MethodConfig;
217 Conf->Access = Access;
218 Conf->Next = Configs;
219 Configs = Conf;
220
221 // Create the worker to fetch the configuration
222 Worker Work(Conf);
223 if (Work.Start() == false)
224 return 0;
225
226 return Conf;
227 }
228 /*}}}*/
229 // Acquire::SetFds - Deal with readable FDs /*{{{*/
230 // ---------------------------------------------------------------------
231 /* Collect FDs that have activity monitors into the fd sets */
232 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
233 {
234 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
235 {
236 if (I->InReady == true && I->InFd >= 0)
237 {
238 if (Fd < I->InFd)
239 Fd = I->InFd;
240 FD_SET(I->InFd,RSet);
241 }
242 if (I->OutReady == true && I->OutFd >= 0)
243 {
244 if (Fd < I->OutFd)
245 Fd = I->OutFd;
246 FD_SET(I->OutFd,WSet);
247 }
248 }
249 }
250 /*}}}*/
251 // Acquire::RunFds - Deal with active FDs /*{{{*/
252 // ---------------------------------------------------------------------
253 /* Dispatch active FDs over to the proper workers. It is very important
254 that a worker never be erased while this is running! The queue class
255 should never erase a worker except during shutdown processing. */
256 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
257 {
258 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
259 {
260 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
261 I->InFdReady();
262 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
263 I->OutFdReady();
264 }
265 }
266 /*}}}*/
267 // Acquire::Run - Run the fetch sequence /*{{{*/
268 // ---------------------------------------------------------------------
269 /* This runs the queues. It manages a select loop for all of the
270 Worker tasks. The workers interact with the queues and items to
271 manage the actual fetch. */
272 bool pkgAcquire::Run()
273 {
274 Running = true;
275
276 for (Queue *I = Queues; I != 0; I = I->Next)
277 I->Startup();
278
279 if (Log != 0)
280 Log->Start();
281
282 // Run till all things have been acquired
283 struct timeval tv;
284 tv.tv_sec = 0;
285 tv.tv_usec = 500000;
286 while (ToFetch > 0)
287 {
288 fd_set RFds;
289 fd_set WFds;
290 int Highest = 0;
291 FD_ZERO(&RFds);
292 FD_ZERO(&WFds);
293 SetFds(Highest,&RFds,&WFds);
294
295 int Res = select(Highest+1,&RFds,&WFds,0,&tv);
296 if (Res < 0)
297 {
298 _error->Errno("select","Select has failed");
299 break;
300 }
301
302 RunFds(&RFds,&WFds);
303 if (_error->PendingError() == true)
304 break;
305
306 // Timeout, notify the log class
307 if (Res == 0 || (Log != 0 && Log->Update == true))
308 {
309 tv.tv_usec = 500000;
310 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
311 I->Pulse();
312 if (Log != 0)
313 Log->Pulse(this);
314 }
315 }
316
317 if (Log != 0)
318 Log->Stop();
319
320 // Shut down the acquire bits
321 Running = false;
322 for (Queue *I = Queues; I != 0; I = I->Next)
323 I->Shutdown();
324
325 return !_error->PendingError();
326 }
327 /*}}}*/
328 // Acquire::Bump - Called when an item is dequeued /*{{{*/
329 // ---------------------------------------------------------------------
330 /* This routine bumps idle queues in hopes that they will be able to fetch
331 the dequeued item */
332 void pkgAcquire::Bump()
333 {
334 for (Queue *I = Queues; I != 0; I = I->Next)
335 I->Bump();
336 }
337 /*}}}*/
338 // Acquire::WorkerStep - Step to the next worker /*{{{*/
339 // ---------------------------------------------------------------------
340 /* Not inlined to advoid including acquire-worker.h */
341 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
342 {
343 return I->NextAcquire;
344 };
345 /*}}}*/
346
347 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
348 // ---------------------------------------------------------------------
349 /* */
350 pkgAcquire::MethodConfig::MethodConfig()
351 {
352 SingleInstance = false;
353 PreScan = false;
354 Pipeline = false;
355 SendConfig = false;
356 Next = 0;
357 }
358 /*}}}*/
359
360 // Queue::Queue - Constructor /*{{{*/
361 // ---------------------------------------------------------------------
362 /* */
363 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
364 Owner(Owner)
365 {
366 Items = 0;
367 Next = 0;
368 Workers = 0;
369 }
370 /*}}}*/
371 // Queue::~Queue - Destructor /*{{{*/
372 // ---------------------------------------------------------------------
373 /* */
374 pkgAcquire::Queue::~Queue()
375 {
376 Shutdown();
377
378 while (Items != 0)
379 {
380 QItem *Jnk = Items;
381 Items = Items->Next;
382 delete Jnk;
383 }
384 }
385 /*}}}*/
386 // Queue::Enqueue - Queue an item to the queue /*{{{*/
387 // ---------------------------------------------------------------------
388 /* */
389 void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
390 {
391 // Create a new item
392 QItem *I = new QItem;
393 I->Next = Items;
394 Items = I;
395 *I = Item;
396
397 Item.Owner->QueueCounter++;
398 if (Items->Next == 0)
399 Cycle();
400 }
401 /*}}}*/
402 // Queue::Dequeue - Remove an item from the queue /*{{{*/
403 // ---------------------------------------------------------------------
404 /* We return true if we hit something*/
405 bool pkgAcquire::Queue::Dequeue(Item *Owner)
406 {
407 bool Res = false;
408
409 QItem **I = &Items;
410 for (; *I != 0;)
411 {
412 if ((*I)->Owner == Owner)
413 {
414 QItem *Jnk= *I;
415 *I = (*I)->Next;
416 Owner->QueueCounter--;
417 delete Jnk;
418 Res = true;
419 }
420 else
421 I = &(*I)->Next;
422 }
423
424 return Res;
425 }
426 /*}}}*/
427 // Queue::Startup - Start the worker processes /*{{{*/
428 // ---------------------------------------------------------------------
429 /* */
430 bool pkgAcquire::Queue::Startup()
431 {
432 Shutdown();
433
434 URI U(Name);
435 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
436 if (Cnf == 0)
437 return false;
438
439 Workers = new Worker(this,Cnf,Owner->Log);
440 Owner->Add(Workers);
441 if (Workers->Start() == false)
442 return false;
443
444 return Cycle();
445 }
446 /*}}}*/
447 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
448 // ---------------------------------------------------------------------
449 /* */
450 bool pkgAcquire::Queue::Shutdown()
451 {
452 // Delete all of the workers
453 while (Workers != 0)
454 {
455 pkgAcquire::Worker *Jnk = Workers;
456 Workers = Workers->NextQueue;
457 Owner->Remove(Jnk);
458 delete Jnk;
459 }
460
461 return true;
462 }
463 /*}}}*/
464 // Queue::Finditem - Find a URI in the item list /*{{{*/
465 // ---------------------------------------------------------------------
466 /* */
467 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
468 {
469 for (QItem *I = Items; I != 0; I = I->Next)
470 if (I->URI == URI && I->Worker == Owner)
471 return I;
472 return 0;
473 }
474 /*}}}*/
475 // Queue::ItemDone - Item has been completed /*{{{*/
476 // ---------------------------------------------------------------------
477 /* The worker signals this which causes the item to be removed from the
478 queue. If this is the last queue instance then it is removed from the
479 main queue too.*/
480 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
481 {
482 if (Itm->Owner->QueueCounter <= 1)
483 Owner->Dequeue(Itm->Owner);
484 else
485 {
486 Dequeue(Itm->Owner);
487 Owner->Bump();
488 }
489
490 return Cycle();
491 }
492 /*}}}*/
493 // Queue::Cycle - Queue new items into the method /*{{{*/
494 // ---------------------------------------------------------------------
495 /* This locates a new idle item and sends it to the worker */
496 bool pkgAcquire::Queue::Cycle()
497 {
498 if (Items == 0 || Workers == 0)
499 return true;
500
501 // Look for a queable item
502 QItem *I = Items;
503 for (; I != 0; I = I->Next)
504 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
505 break;
506
507 // Nothing to do, queue is idle.
508 if (I == 0)
509 return true;
510
511 I->Worker = Workers;
512 I->Owner->Status = pkgAcquire::Item::StatFetching;
513 return Workers->QueueItem(I);
514 }
515 /*}}}*/
516 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
517 // ---------------------------------------------------------------------
518 /* */
519 void pkgAcquire::Queue::Bump()
520 {
521 }
522 /*}}}*/
523
524 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
525 // ---------------------------------------------------------------------
526 /* */
527 pkgAcquireStatus::pkgAcquireStatus()
528 {
529 Start();
530 }
531 /*}}}*/
532 // AcquireStatus::Pulse - Called periodically /*{{{*/
533 // ---------------------------------------------------------------------
534 /* This computes some internal state variables for the derived classes to
535 use. It generates the current downloaded bytes and total bytes to download
536 as well as the current CPS estimate. */
537 void pkgAcquireStatus::Pulse(pkgAcquire *Owner)
538 {
539 TotalBytes = 0;
540 CurrentBytes = 0;
541
542 // Compute the total number of bytes to fetch
543 unsigned int Unknown = 0;
544 unsigned int Count = 0;
545 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
546 I++, Count++)
547 {
548 TotalBytes += (*I)->FileSize;
549 if ((*I)->Complete == true)
550 CurrentBytes += (*I)->FileSize;
551 if ((*I)->FileSize == 0 && (*I)->Complete == false)
552 Unknown++;
553 }
554
555 // Compute the current completion
556 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
557 I = Owner->WorkerStep(I))
558 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
559 CurrentBytes += I->CurrentSize;
560
561 // Normalize the figures and account for unknown size downloads
562 if (TotalBytes <= 0)
563 TotalBytes = 1;
564 if (Unknown == Count)
565 TotalBytes = Unknown;
566 else
567 TotalBytes += TotalBytes/(Count - Unknown)*Unknown;
568
569 // Compute the CPS
570 struct timeval NewTime;
571 gettimeofday(&NewTime,0);
572 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
573 NewTime.tv_sec - Time.tv_sec > 6)
574 {
575 // Compute the delta time with full accuracy
576 long usdiff = NewTime.tv_usec - Time.tv_usec;
577 long sdiff = NewTime.tv_sec - Time.tv_sec;
578
579 // Borrow
580 if (usdiff < 0)
581 {
582 usdiff += 1000000;
583 sdiff--;
584 }
585
586 // Compute the CPS value
587 CurrentCPS = (CurrentBytes - LastBytes)/(sdiff + usdiff/1000000.0);
588 LastBytes = CurrentBytes;
589 ElapsedTime = NewTime.tv_sec - StartTime.tv_sec;
590 Time = NewTime;
591 }
592 }
593 /*}}}*/
594 // AcquireStatus::Start - Called when the download is started /*{{{*/
595 // ---------------------------------------------------------------------
596 /* We just reset the counters */
597 void pkgAcquireStatus::Start()
598 {
599 gettimeofday(&Time,0);
600 gettimeofday(&StartTime,0);
601 LastBytes = 0;
602 CurrentCPS = 0;
603 CurrentBytes = 0;
604 TotalBytes = 0;
605 FetchedBytes = 0;
606 ElapsedTime = 0;
607 }
608 /*}}}*/
609 // pkgAcquireStatus::Stop - Finished downloading /*{{{*/
610 // ---------------------------------------------------------------------
611 /* This accurately computes the elapsed time and the total overall CPS. */
612 void pkgAcquireStatus::Stop()
613 {
614 // Compute the CPS and elapsed time
615 struct timeval NewTime;
616 gettimeofday(&NewTime,0);
617
618 // Compute the delta time with full accuracy
619 long usdiff = NewTime.tv_usec - StartTime.tv_usec;
620 long sdiff = NewTime.tv_sec - StartTime.tv_sec;
621
622 // Borrow
623 if (usdiff < 0)
624 {
625 usdiff += 1000000;
626 sdiff--;
627 }
628
629 // Compute the CPS value
630 CurrentCPS = FetchedBytes/(sdiff + usdiff/1000000.0);
631 LastBytes = CurrentBytes;
632 ElapsedTime = sdiff;
633 }
634 /*}}}*/
635 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
636 // ---------------------------------------------------------------------
637 /* This is used to get accurate final transfer rate reporting. */
638 void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
639 {
640 FetchedBytes += Size - Resume;
641 }
642 /*}}}*/