More or less working acquire system
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.10 1998/11/09 01:09:25 jgg Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #ifdef __GNUG__
17 #pragma implementation "apt-pkg/acquire.h"
18 #endif
19 #include <apt-pkg/acquire.h>
20 #include <apt-pkg/acquire-item.h>
21 #include <apt-pkg/acquire-worker.h>
22 #include <apt-pkg/configuration.h>
23 #include <apt-pkg/error.h>
24 #include <strutl.h>
25
26 #include <sys/time.h>
27 /*}}}*/
28
29 // Acquire::pkgAcquire - Constructor /*{{{*/
30 // ---------------------------------------------------------------------
31 /* We grab some runtime state from the configuration space */
32 pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
33 {
34 Queues = 0;
35 Configs = 0;
36 Workers = 0;
37 ToFetch = 0;
38 Running = false;
39
40 string Mode = _config->Find("Acquire::Queue-Mode","host");
41 if (strcasecmp(Mode.c_str(),"host") == 0)
42 QueueMode = QueueHost;
43 if (strcasecmp(Mode.c_str(),"access") == 0)
44 QueueMode = QueueAccess;
45
46 Debug = _config->FindB("Debug::pkgAcquire",false);
47 }
48 /*}}}*/
49 // Acquire::~pkgAcquire - Destructor /*{{{*/
50 // ---------------------------------------------------------------------
51 /* Free our memory, clean up the queues (destroy the workers) */
52 pkgAcquire::~pkgAcquire()
53 {
54 while (Items.size() != 0)
55 delete Items[0];
56
57 while (Configs != 0)
58 {
59 MethodConfig *Jnk = Configs;
60 Configs = Configs->Next;
61 delete Jnk;
62 }
63
64 while (Queues != 0)
65 {
66 Queue *Jnk = Queues;
67 Queues = Queues->Next;
68 delete Jnk;
69 }
70 }
71 /*}}}*/
72 // Acquire::Add - Add a new item /*{{{*/
73 // ---------------------------------------------------------------------
74 /* This puts an item on the acquire list. This list is mainly for tracking
75 item status */
76 void pkgAcquire::Add(Item *Itm)
77 {
78 Items.push_back(Itm);
79 }
80 /*}}}*/
81 // Acquire::Remove - Remove a item /*{{{*/
82 // ---------------------------------------------------------------------
83 /* Remove an item from the acquire list. This is usually not used.. */
84 void pkgAcquire::Remove(Item *Itm)
85 {
86 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
87 {
88 if (*I == Itm)
89 Items.erase(I);
90 }
91 }
92 /*}}}*/
93 // Acquire::Add - Add a worker /*{{{*/
94 // ---------------------------------------------------------------------
95 /* A list of workers is kept so that the select loop can direct their FD
96 usage. */
97 void pkgAcquire::Add(Worker *Work)
98 {
99 Work->NextAcquire = Workers;
100 Workers = Work;
101 }
102 /*}}}*/
103 // Acquire::Remove - Remove a worker /*{{{*/
104 // ---------------------------------------------------------------------
105 /* A worker has died. This can not be done while the select loop is running
106 as it would require that RunFds could handling a changing list state and
107 it cant.. */
108 void pkgAcquire::Remove(Worker *Work)
109 {
110 if (Running == true)
111 abort();
112
113 Worker **I = &Workers;
114 for (; *I != 0;)
115 {
116 if (*I == Work)
117 *I = (*I)->NextAcquire;
118 else
119 I = &(*I)->NextAcquire;
120 }
121 }
122 /*}}}*/
123 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
124 // ---------------------------------------------------------------------
125 /* This is the entry point for an item. An item calls this function when
126 it is construction which creates a queue (based on the current queue
127 mode) and puts the item in that queue. If the system is running then
128 the queue might be started. */
129 void pkgAcquire::Enqueue(ItemDesc &Item)
130 {
131 // Determine which queue to put the item in
132 string Name = QueueName(Item.URI);
133 if (Name.empty() == true)
134 return;
135
136 // Find the queue structure
137 Queue *I = Queues;
138 for (; I != 0 && I->Name != Name; I = I->Next);
139 if (I == 0)
140 {
141 I = new Queue(Name,this);
142 I->Next = Queues;
143 Queues = I;
144
145 if (Running == true)
146 I->Startup();
147 }
148
149 Item.Owner->Status = Item::StatIdle;
150
151 // Queue it into the named queue
152 I->Enqueue(Item);
153 ToFetch++;
154
155 // Some trace stuff
156 if (Debug == true)
157 {
158 clog << "Fetching " << Item.URI << endl;
159 clog << " to " << Item.Owner->DestFile << endl;
160 clog << " Queue is: " << QueueName(Item.URI) << endl;
161 }
162 }
163 /*}}}*/
164 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
165 // ---------------------------------------------------------------------
166 /* This is called when an item is finished being fetched. It removes it
167 from all the queues */
168 void pkgAcquire::Dequeue(Item *Itm)
169 {
170 Queue *I = Queues;
171 bool Res = false;
172 for (; I != 0; I = I->Next)
173 Res |= I->Dequeue(Itm);
174
175 if (Debug == true)
176 clog << "Dequeuing " << Itm->DestFile << endl;
177 if (Res == true)
178 ToFetch--;
179 }
180 /*}}}*/
181 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
182 // ---------------------------------------------------------------------
183 /* The string returned depends on the configuration settings and the
184 method parameters. Given something like http://foo.org/bar it can
185 return http://foo.org or http */
186 string pkgAcquire::QueueName(string Uri)
187 {
188 URI U(Uri);
189
190 const MethodConfig *Config = GetConfig(U.Access);
191 if (Config == 0)
192 return string();
193
194 /* Single-Instance methods get exactly one queue per URI. This is
195 also used for the Access queue method */
196 if (Config->SingleInstance == true || QueueMode == QueueAccess)
197 return U.Access;
198
199 return U.Access + ':' + U.Host;
200 }
201 /*}}}*/
202 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
203 // ---------------------------------------------------------------------
204 /* This locates the configuration structure for an access method. If
205 a config structure cannot be found a Worker will be created to
206 retrieve it */
207 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
208 {
209 // Search for an existing config
210 MethodConfig *Conf;
211 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
212 if (Conf->Access == Access)
213 return Conf;
214
215 // Create the new config class
216 Conf = new MethodConfig;
217 Conf->Access = Access;
218 Conf->Next = Configs;
219 Configs = Conf;
220
221 // Create the worker to fetch the configuration
222 Worker Work(Conf);
223 if (Work.Start() == false)
224 return 0;
225
226 return Conf;
227 }
228 /*}}}*/
229 // Acquire::SetFds - Deal with readable FDs /*{{{*/
230 // ---------------------------------------------------------------------
231 /* Collect FDs that have activity monitors into the fd sets */
232 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
233 {
234 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
235 {
236 if (I->InReady == true && I->InFd >= 0)
237 {
238 if (Fd < I->InFd)
239 Fd = I->InFd;
240 FD_SET(I->InFd,RSet);
241 }
242 if (I->OutReady == true && I->OutFd >= 0)
243 {
244 if (Fd < I->OutFd)
245 Fd = I->OutFd;
246 FD_SET(I->OutFd,WSet);
247 }
248 }
249 }
250 /*}}}*/
251 // Acquire::RunFds - Deal with active FDs /*{{{*/
252 // ---------------------------------------------------------------------
253 /* Dispatch active FDs over to the proper workers. It is very important
254 that a worker never be erased while this is running! The queue class
255 should never erase a worker except during shutdown processing. */
256 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
257 {
258 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
259 {
260 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
261 I->InFdReady();
262 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
263 I->OutFdReady();
264 }
265 }
266 /*}}}*/
267 // Acquire::Run - Run the fetch sequence /*{{{*/
268 // ---------------------------------------------------------------------
269 /* This runs the queues. It manages a select loop for all of the
270 Worker tasks. The workers interact with the queues and items to
271 manage the actual fetch. */
272 bool pkgAcquire::Run()
273 {
274 Running = true;
275
276 for (Queue *I = Queues; I != 0; I = I->Next)
277 I->Startup();
278
279 // Run till all things have been acquired
280 struct timeval tv;
281 tv.tv_sec = 0;
282 tv.tv_usec = 500000;
283 while (ToFetch > 0)
284 {
285 fd_set RFds;
286 fd_set WFds;
287 int Highest = 0;
288 FD_ZERO(&RFds);
289 FD_ZERO(&WFds);
290 SetFds(Highest,&RFds,&WFds);
291
292 int Res = select(Highest+1,&RFds,&WFds,0,&tv);
293 if (Res < 0)
294 {
295 _error->Errno("select","Select has failed");
296 break;
297 }
298
299 RunFds(&RFds,&WFds);
300 if (_error->PendingError() == true)
301 break;
302
303 // Timeout, notify the log class
304 if (Res == 0 || (Log != 0 && Log->Update == true))
305 {
306 tv.tv_usec = 500000;
307 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
308 I->Pulse();
309 if (Log != 0)
310 Log->Pulse(this);
311 }
312 }
313
314 // Shut down the acquire bits
315 Running = false;
316 for (Queue *I = Queues; I != 0; I = I->Next)
317 I->Shutdown();
318
319 return _error->PendingError();
320 }
321 /*}}}*/
322 // Acquire::Bump - Called when an item is dequeued /*{{{*/
323 // ---------------------------------------------------------------------
324 /* This routine bumps idle queues in hopes that they will be able to fetch
325 the dequeued item */
326 void pkgAcquire::Bump()
327 {
328 for (Queue *I = Queues; I != 0; I = I->Next)
329 I->Bump();
330 }
331 /*}}}*/
332 // Acquire::WorkerStep - Step to the next worker /*{{{*/
333 // ---------------------------------------------------------------------
334 /* Not inlined to advoid including acquire-worker.h */
335 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
336 {
337 return I->NextAcquire;
338 };
339 /*}}}*/
340
341 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
342 // ---------------------------------------------------------------------
343 /* */
344 pkgAcquire::MethodConfig::MethodConfig()
345 {
346 SingleInstance = false;
347 PreScan = false;
348 Pipeline = false;
349 SendConfig = false;
350 Next = 0;
351 }
352 /*}}}*/
353
354 // Queue::Queue - Constructor /*{{{*/
355 // ---------------------------------------------------------------------
356 /* */
357 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
358 Owner(Owner)
359 {
360 Items = 0;
361 Next = 0;
362 Workers = 0;
363 }
364 /*}}}*/
365 // Queue::~Queue - Destructor /*{{{*/
366 // ---------------------------------------------------------------------
367 /* */
368 pkgAcquire::Queue::~Queue()
369 {
370 Shutdown();
371
372 while (Items != 0)
373 {
374 QItem *Jnk = Items;
375 Items = Items->Next;
376 delete Jnk;
377 }
378 }
379 /*}}}*/
380 // Queue::Enqueue - Queue an item to the queue /*{{{*/
381 // ---------------------------------------------------------------------
382 /* */
383 void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
384 {
385 // Create a new item
386 QItem *I = new QItem;
387 I->Next = Items;
388 Items = I;
389 *I = Item;
390
391 Item.Owner->QueueCounter++;
392 if (Items->Next == 0)
393 Cycle();
394 }
395 /*}}}*/
396 // Queue::Dequeue - Remove an item from the queue /*{{{*/
397 // ---------------------------------------------------------------------
398 /* We return true if we hit something*/
399 bool pkgAcquire::Queue::Dequeue(Item *Owner)
400 {
401 bool Res = false;
402
403 QItem **I = &Items;
404 for (; *I != 0;)
405 {
406 if ((*I)->Owner == Owner)
407 {
408 QItem *Jnk= *I;
409 *I = (*I)->Next;
410 Owner->QueueCounter--;
411 delete Jnk;
412 Res = true;
413 }
414 else
415 I = &(*I)->Next;
416 }
417
418 return Res;
419 }
420 /*}}}*/
421 // Queue::Startup - Start the worker processes /*{{{*/
422 // ---------------------------------------------------------------------
423 /* */
424 bool pkgAcquire::Queue::Startup()
425 {
426 Shutdown();
427
428 URI U(Name);
429 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
430 if (Cnf == 0)
431 return false;
432
433 Workers = new Worker(this,Cnf,Owner->Log);
434 Owner->Add(Workers);
435 if (Workers->Start() == false)
436 return false;
437
438 return Cycle();
439 }
440 /*}}}*/
441 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
442 // ---------------------------------------------------------------------
443 /* */
444 bool pkgAcquire::Queue::Shutdown()
445 {
446 // Delete all of the workers
447 while (Workers != 0)
448 {
449 pkgAcquire::Worker *Jnk = Workers;
450 Workers = Workers->NextQueue;
451 Owner->Remove(Jnk);
452 delete Jnk;
453 }
454
455 return true;
456 }
457 /*}}}*/
458 // Queue::Finditem - Find a URI in the item list /*{{{*/
459 // ---------------------------------------------------------------------
460 /* */
461 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
462 {
463 for (QItem *I = Items; I != 0; I = I->Next)
464 if (I->URI == URI && I->Worker == Owner)
465 return I;
466 return 0;
467 }
468 /*}}}*/
469 // Queue::ItemDone - Item has been completed /*{{{*/
470 // ---------------------------------------------------------------------
471 /* The worker signals this which causes the item to be removed from the
472 queue. If this is the last queue instance then it is removed from the
473 main queue too.*/
474 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
475 {
476 if (Itm->Owner->QueueCounter <= 1)
477 Owner->Dequeue(Itm->Owner);
478 else
479 {
480 Dequeue(Itm->Owner);
481 Owner->Bump();
482 }
483
484 return Cycle();
485 }
486 /*}}}*/
487 // Queue::Cycle - Queue new items into the method /*{{{*/
488 // ---------------------------------------------------------------------
489 /* This locates a new idle item and sends it to the worker */
490 bool pkgAcquire::Queue::Cycle()
491 {
492 if (Items == 0 || Workers == 0)
493 return true;
494
495 // Look for a queable item
496 QItem *I = Items;
497 for (; I != 0; I = I->Next)
498 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
499 break;
500
501 // Nothing to do, queue is idle.
502 if (I == 0)
503 return true;
504
505 I->Worker = Workers;
506 I->Owner->Status = pkgAcquire::Item::StatFetching;
507 return Workers->QueueItem(I);
508 }
509 /*}}}*/
510 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
511 // ---------------------------------------------------------------------
512 /* */
513 void pkgAcquire::Queue::Bump()
514 {
515 }
516 /*}}}*/