Stable acquire code
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.5 1998/10/26 07:11:47 jgg Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
#ifdef __GNUG__
#pragma implementation "apt-pkg/acquire.h"
#endif
#include <apt-pkg/acquire.h>
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/acquire-worker.h>
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <strutl.h>

#include <errno.h>
25 /*}}}*/
26
27 // Acquire::pkgAcquire - Constructor /*{{{*/
28 // ---------------------------------------------------------------------
29 /* */
30 pkgAcquire::pkgAcquire()
31 {
32 Queues = 0;
33 Configs = 0;
34 Workers = 0;
35 ToFetch = 0;
36 Running = false;
37
38 string Mode = _config->Find("Acquire::Queue-Mode","host");
39 if (strcasecmp(Mode.c_str(),"host") == 0)
40 QueueMode = QueueHost;
41 if (strcasecmp(Mode.c_str(),"access") == 0)
42 QueueMode = QueueAccess;
43
44 Debug = _config->FindB("Debug::pkgAcquire",false);
45 }
46 /*}}}*/
47 // Acquire::~pkgAcquire - Destructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* Free our memory */
50 pkgAcquire::~pkgAcquire()
51 {
52 while (Items.size() != 0)
53 delete Items[0];
54
55 while (Configs != 0)
56 {
57 MethodConfig *Jnk = Configs;
58 Configs = Configs->Next;
59 delete Jnk;
60 }
61
62 while (Queues != 0)
63 {
64 Queue *Jnk = Queues;
65 Queues = Queues->Next;
66 delete Jnk;
67 }
68 }
69 /*}}}*/
70 // Acquire::Add - Add a new item /*{{{*/
71 // ---------------------------------------------------------------------
72 /* */
73 void pkgAcquire::Add(Item *Itm)
74 {
75 Items.push_back(Itm);
76 }
77 /*}}}*/
78 // Acquire::Remove - Remove a item /*{{{*/
79 // ---------------------------------------------------------------------
80 /* */
81 void pkgAcquire::Remove(Item *Itm)
82 {
83 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
84 {
85 if (*I == Itm)
86 Items.erase(I);
87 }
88 }
89 /*}}}*/
90 // Acquire::Add - Add a worker /*{{{*/
91 // ---------------------------------------------------------------------
92 /* */
93 void pkgAcquire::Add(Worker *Work)
94 {
95 Work->NextAcquire = Workers;
96 Workers = Work;
97 }
98 /*}}}*/
99 // Acquire::Remove - Remove a worker /*{{{*/
100 // ---------------------------------------------------------------------
101 /* */
102 void pkgAcquire::Remove(Worker *Work)
103 {
104 Worker **I = &Workers;
105 for (; *I != 0;)
106 {
107 if (*I == Work)
108 *I = (*I)->NextAcquire;
109 else
110 I = &(*I)->NextAcquire;
111 }
112 }
113 /*}}}*/
114 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
115 // ---------------------------------------------------------------------
116 /* */
117 void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
118 {
119 // Determine which queue to put the item in
120 string Name = QueueName(URI);
121 if (Name.empty() == true)
122 return;
123
124 // Find the queue structure
125 Queue *I = Queues;
126 for (; I != 0 && I->Name != Name; I = I->Next);
127 if (I == 0)
128 {
129 I = new Queue(Name,this);
130 I->Next = Queues;
131 Queues = I;
132 }
133
134 // Queue it into the named queue
135 I->Enqueue(Itm,URI,Description);
136 ToFetch++;
137
138 if (Running == true)
139 I->Startup();
140
141 // Some trace stuff
142 if (Debug == true)
143 {
144 clog << "Fetching " << URI << endl;
145 clog << " to " << Itm->DestFile << endl;
146 clog << " Queue is: " << QueueName(URI) << endl;
147 }
148 }
149 /*}}}*/
150 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
151 // ---------------------------------------------------------------------
152 /* */
153 void pkgAcquire::Dequeue(Item *Itm)
154 {
155 Queue *I = Queues;
156 for (; I != 0; I = I->Next)
157 I->Dequeue(Itm);
158 ToFetch--;
159 }
160 /*}}}*/
161 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
162 // ---------------------------------------------------------------------
163 /* The string returned depends on the configuration settings and the
164 method parameters. Given something like http://foo.org/bar it can
165 return http://foo.org or http */
166 string pkgAcquire::QueueName(string URI)
167 {
168 const MethodConfig *Config = GetConfig(URIAccess(URI));
169 if (Config == 0)
170 return string();
171
172 /* Single-Instance methods get exactly one queue per URI. This is
173 also used for the Access queue method */
174 if (Config->SingleInstance == true || QueueMode == QueueAccess)
175 return URIAccess(URI);
176
177 // Host based queue
178 string::iterator I = URI.begin();
179 for (; I < URI.end() && *I != ':'; I++);
180 for (; I < URI.end() && (*I == '/' || *I == ':'); I++);
181 for (; I < URI.end() && *I != '/'; I++);
182
183 return string(URI,0,I - URI.begin());
184 }
185 /*}}}*/
186 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
187 // ---------------------------------------------------------------------
188 /* This locates the configuration structure for an access method. If
189 a config structure cannot be found a Worker will be created to
190 retrieve it */
191 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
192 {
193 // Search for an existing config
194 MethodConfig *Conf;
195 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
196 if (Conf->Access == Access)
197 return Conf;
198
199 // Create the new config class
200 Conf = new MethodConfig;
201 Conf->Access = Access;
202 Conf->Next = Configs;
203 Configs = Conf;
204
205 // Create the worker to fetch the configuration
206 Worker Work(Conf);
207 if (Work.Start() == false)
208 return 0;
209
210 return Conf;
211 }
212 /*}}}*/
213 // Acquire::SetFds - Deal with readable FDs /*{{{*/
214 // ---------------------------------------------------------------------
215 /* Collect FDs that have activity monitors into the fd sets */
216 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
217 {
218 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
219 {
220 if (I->InReady == true && I->InFd >= 0)
221 {
222 if (Fd < I->InFd)
223 Fd = I->InFd;
224 FD_SET(I->InFd,RSet);
225 }
226 if (I->OutReady == true && I->OutFd >= 0)
227 {
228 if (Fd < I->OutFd)
229 Fd = I->OutFd;
230 FD_SET(I->OutFd,WSet);
231 }
232 }
233 }
234 /*}}}*/
235 // Acquire::RunFds - Deal with active FDs /*{{{*/
236 // ---------------------------------------------------------------------
237 /* Dispatch active FDs over to the proper workers */
238 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
239 {
240 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
241 {
242 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
243 I->InFdReady();
244 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
245 I->OutFdReady();
246 }
247 }
248 /*}}}*/
249 // Acquire::Run - Run the fetch sequence /*{{{*/
250 // ---------------------------------------------------------------------
251 /* This runs the queues. It manages a select loop for all of the
252 Worker tasks. The workers interact with the queues and items to
253 manage the actual fetch. */
254 bool pkgAcquire::Run()
255 {
256 Running = true;
257
258 for (Queue *I = Queues; I != 0; I = I->Next)
259 I->Startup();
260
261 // Run till all things have been acquired
262 while (ToFetch > 0)
263 {
264 fd_set RFds;
265 fd_set WFds;
266 int Highest = 0;
267 FD_ZERO(&RFds);
268 FD_ZERO(&WFds);
269 SetFds(Highest,&RFds,&WFds);
270
271 if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
272 {
273 Running = false;
274 return _error->Errno("select","Select has failed");
275 }
276
277 RunFds(&RFds,&WFds);
278 }
279
280 for (Queue *I = Queues; I != 0; I = I->Next)
281 I->Shutdown();
282
283 Running = false;
284 return true;
285 }
286 /*}}}*/
287
288 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
289 // ---------------------------------------------------------------------
290 /* */
291 pkgAcquire::MethodConfig::MethodConfig()
292 {
293 SingleInstance = false;
294 PreScan = false;
295 Pipeline = false;
296 SendConfig = false;
297 Next = 0;
298 }
299 /*}}}*/
300
301 // Queue::Queue - Constructor /*{{{*/
302 // ---------------------------------------------------------------------
303 /* */
304 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
305 Owner(Owner)
306 {
307 Items = 0;
308 Next = 0;
309 Workers = 0;
310 }
311 /*}}}*/
312 // Queue::~Queue - Destructor /*{{{*/
313 // ---------------------------------------------------------------------
314 /* */
315 pkgAcquire::Queue::~Queue()
316 {
317 Shutdown();
318
319 while (Items != 0)
320 {
321 QItem *Jnk = Items;
322 Items = Items->Next;
323 delete Jnk;
324 }
325 }
326 /*}}}*/
327 // Queue::Enqueue - Queue an item to the queue /*{{{*/
328 // ---------------------------------------------------------------------
329 /* */
330 void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
331 {
332 // Create a new item
333 QItem *I = new QItem;
334 I->Next = Items;
335 Items = I;
336
337 // Fill it in
338 Items->Owner = Owner;
339 Items->URI = URI;
340 Items->Description = Description;
341 Owner->QueueCounter++;
342 }
343 /*}}}*/
344 // Queue::Dequeue - Remove an item from the queue /*{{{*/
345 // ---------------------------------------------------------------------
346 /* */
347 void pkgAcquire::Queue::Dequeue(Item *Owner)
348 {
349 QItem **I = &Items;
350 for (; *I != 0;)
351 {
352 if ((*I)->Owner == Owner)
353 {
354 QItem *Jnk= *I;
355 *I = (*I)->Next;
356 Owner->QueueCounter--;
357 delete Jnk;
358 }
359 else
360 I = &(*I)->Next;
361 }
362 }
363 /*}}}*/
364 // Queue::Startup - Start the worker processes /*{{{*/
365 // ---------------------------------------------------------------------
366 /* */
367 bool pkgAcquire::Queue::Startup()
368 {
369 Shutdown();
370
371 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(URIAccess(Name));
372 if (Cnf == 0)
373 return false;
374
375 Workers = new Worker(this,Cnf);
376 Owner->Add(Workers);
377 if (Workers->Start() == false)
378 return false;
379
380 Items->Worker = Workers;
381 Workers->QueueItem(Items);
382
383 return true;
384 }
385 /*}}}*/
386 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
387 // ---------------------------------------------------------------------
388 /* */
389 bool pkgAcquire::Queue::Shutdown()
390 {
391 // Delete all of the workers
392 while (Workers != 0)
393 {
394 pkgAcquire::Worker *Jnk = Workers;
395 Workers = Workers->NextQueue;
396 Owner->Remove(Jnk);
397 delete Jnk;
398 }
399
400 return true;
401 }
402 /*}}}*/
403 // Queue::Finditem - Find a URI in the item list /*{{{*/
404 // ---------------------------------------------------------------------
405 /* */
406 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
407 {
408 for (QItem *I = Items; I != 0; I = I->Next)
409 if (I->URI == URI && I->Worker == Owner)
410 return I;
411 return 0;
412 }
413 /*}}}*/
414 // Queue::ItemDone - Item has been completed /*{{{*/
415 // ---------------------------------------------------------------------
416 /* The worker signals this which causes the item to be removed from the
417 queue. */
418 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
419 {
420 Dequeue(Itm->Owner);
421
422 if (Items == 0)
423 return true;
424
425 Items->Worker = Workers;
426 Items->Owner->Status = pkgAcquire::Item::StatFetching;
427 return Workers->QueueItem(Items);
428 }
429 /*}}}*/