Sync
[ntk/apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.4 1998/10/24 04:58:01 jgg Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #ifdef __GNUG__
17 #pragma implementation "apt-pkg/acquire.h"
18 #endif
19 #include <apt-pkg/acquire.h>
20 #include <apt-pkg/acquire-item.h>
21 #include <apt-pkg/acquire-worker.h>
22 #include <apt-pkg/configuration.h>
23 #include <apt-pkg/error.h>
24 #include <strutl.h>
25 /*}}}*/
26
27 // Acquire::pkgAcquire - Constructor /*{{{*/
28 // ---------------------------------------------------------------------
29 /* */
30 pkgAcquire::pkgAcquire()
31 {
32 Queues = 0;
33 Configs = 0;
34 Workers = 0;
35 ToFetch = 0;
36
37 string Mode = _config->Find("Acquire::Queue-Mode","host");
38 if (strcasecmp(Mode.c_str(),"host") == 0)
39 QueueMode = QueueHost;
40 if (strcasecmp(Mode.c_str(),"access") == 0)
41 QueueMode = QueueAccess;
42
43 Debug = _config->FindB("Debug::pkgAcquire",false);
44 }
45 /*}}}*/
46 // Acquire::~pkgAcquire - Destructor /*{{{*/
47 // ---------------------------------------------------------------------
48 /* Free our memory */
49 pkgAcquire::~pkgAcquire()
50 {
51 while (Items.size() != 0)
52 delete Items[0];
53
54 while (Configs != 0)
55 {
56 MethodConfig *Jnk = Configs;
57 Configs = Configs->Next;
58 delete Jnk;
59 }
60
61 while (Queues != 0)
62 {
63 Queue *Jnk = Queues;
64 Queues = Queues->Next;
65 delete Jnk;
66 }
67 }
68 /*}}}*/
69 // Acquire::Add - Add a new item /*{{{*/
70 // ---------------------------------------------------------------------
71 /* */
72 void pkgAcquire::Add(Item *Itm)
73 {
74 Items.push_back(Itm);
75 }
76 /*}}}*/
77 // Acquire::Remove - Remove a item /*{{{*/
78 // ---------------------------------------------------------------------
79 /* */
80 void pkgAcquire::Remove(Item *Itm)
81 {
82 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
83 {
84 if (*I == Itm)
85 Items.erase(I);
86 }
87 }
88 /*}}}*/
89 // Acquire::Add - Add a worker /*{{{*/
90 // ---------------------------------------------------------------------
91 /* */
92 void pkgAcquire::Add(Worker *Work)
93 {
94 Work->NextAcquire = Workers;
95 Workers = Work;
96 }
97 /*}}}*/
98 // Acquire::Remove - Remove a worker /*{{{*/
99 // ---------------------------------------------------------------------
100 /* */
101 void pkgAcquire::Remove(Worker *Work)
102 {
103 Worker **I = &Workers;
104 for (; *I != 0;)
105 {
106 if (*I == Work)
107 *I = (*I)->NextAcquire;
108 else
109 I = &(*I)->NextAcquire;
110 }
111 }
112 /*}}}*/
113 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
114 // ---------------------------------------------------------------------
115 /* */
116 void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
117 {
118 // Determine which queue to put the item in
119 string Name = QueueName(URI);
120 if (Name.empty() == true)
121 return;
122
123 // Find the queue structure
124 Queue *I = Queues;
125 for (; I != 0 && I->Name != Name; I = I->Next);
126 if (I == 0)
127 {
128 I = new Queue(Name,this);
129 I->Next = Queues;
130 Queues = I;
131 }
132
133 // Queue it into the named queue
134 I->Enqueue(Itm,URI,Description);
135 ToFetch++;
136
137 // Some trace stuff
138 if (Debug == true)
139 {
140 clog << "Fetching " << URI << endl;
141 clog << " to " << Itm->DestFile << endl;
142 clog << " Queue is: " << QueueName(URI) << endl;
143 }
144 }
145 /*}}}*/
146 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
147 // ---------------------------------------------------------------------
148 /* */
149 void pkgAcquire::Dequeue(Item *Itm)
150 {
151 Queue *I = Queues;
152 for (; I != 0; I = I->Next)
153 I->Dequeue(Itm);
154 ToFetch--;
155 }
156 /*}}}*/
157 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
158 // ---------------------------------------------------------------------
159 /* The string returned depends on the configuration settings and the
160 method parameters. Given something like http://foo.org/bar it can
161 return http://foo.org or http */
162 string pkgAcquire::QueueName(string URI)
163 {
164 const MethodConfig *Config = GetConfig(URIAccess(URI));
165 if (Config == 0)
166 return string();
167
168 /* Single-Instance methods get exactly one queue per URI. This is
169 also used for the Access queue method */
170 if (Config->SingleInstance == true || QueueMode == QueueAccess)
171 return URIAccess(URI);
172
173 // Host based queue
174 string::iterator I = URI.begin();
175 for (; I < URI.end() && *I != ':'; I++);
176 for (; I < URI.end() && (*I == '/' || *I == ':'); I++);
177 for (; I < URI.end() && *I != '/'; I++);
178
179 return string(URI,0,I - URI.begin());
180 }
181 /*}}}*/
182 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
183 // ---------------------------------------------------------------------
184 /* This locates the configuration structure for an access method. If
185 a config structure cannot be found a Worker will be created to
186 retrieve it */
187 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
188 {
189 // Search for an existing config
190 MethodConfig *Conf;
191 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
192 if (Conf->Access == Access)
193 return Conf;
194
195 // Create the new config class
196 Conf = new MethodConfig;
197 Conf->Access = Access;
198 Conf->Next = Configs;
199 Configs = Conf;
200
201 // Create the worker to fetch the configuration
202 Worker Work(Conf);
203 if (Work.Start() == false)
204 return 0;
205
206 return Conf;
207 }
208 /*}}}*/
209 // Acquire::SetFds - Deal with readable FDs /*{{{*/
210 // ---------------------------------------------------------------------
211 /* Collect FDs that have activity monitors into the fd sets */
212 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
213 {
214 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
215 {
216 if (I->InReady == true && I->InFd >= 0)
217 {
218 if (Fd < I->InFd)
219 Fd = I->InFd;
220 FD_SET(I->InFd,RSet);
221 }
222 if (I->OutReady == true && I->OutFd >= 0)
223 {
224 if (Fd < I->OutFd)
225 Fd = I->OutFd;
226 FD_SET(I->OutFd,WSet);
227 }
228 }
229 }
230 /*}}}*/
231 // Acquire::RunFds - Deal with active FDs /*{{{*/
232 // ---------------------------------------------------------------------
233 /* Dispatch active FDs over to the proper workers */
234 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
235 {
236 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
237 {
238 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
239 I->InFdReady();
240 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
241 I->OutFdReady();
242 }
243 }
244 /*}}}*/
245 // Acquire::Run - Run the fetch sequence /*{{{*/
246 // ---------------------------------------------------------------------
247 /* This runs the queues. It manages a select loop for all of the
248 Worker tasks. The workers interact with the queues and items to
249 manage the actual fetch. */
250 bool pkgAcquire::Run()
251 {
252 for (Queue *I = Queues; I != 0; I = I->Next)
253 I->Startup();
254
255 // Run till all things have been acquired
256 while (ToFetch > 0)
257 {
258 fd_set RFds;
259 fd_set WFds;
260 int Highest = 0;
261 FD_ZERO(&RFds);
262 FD_ZERO(&WFds);
263 SetFds(Highest,&RFds,&WFds);
264
265 if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
266 return _error->Errno("select","Select has failed");
267
268 RunFds(&RFds,&WFds);
269 }
270
271 for (Queue *I = Queues; I != 0; I = I->Next)
272 I->Shutdown();
273
274 return true;
275 }
276 /*}}}*/
277
278 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
279 // ---------------------------------------------------------------------
280 /* */
281 pkgAcquire::MethodConfig::MethodConfig()
282 {
283 SingleInstance = false;
284 PreScan = false;
285 Pipeline = false;
286 SendConfig = false;
287 Next = 0;
288 }
289 /*}}}*/
290
291 // Queue::Queue - Constructor /*{{{*/
292 // ---------------------------------------------------------------------
293 /* */
294 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
295 Owner(Owner)
296 {
297 Items = 0;
298 Next = 0;
299 Workers = 0;
300 }
301 /*}}}*/
302 // Queue::~Queue - Destructor /*{{{*/
303 // ---------------------------------------------------------------------
304 /* */
305 pkgAcquire::Queue::~Queue()
306 {
307 Shutdown();
308
309 while (Items != 0)
310 {
311 QItem *Jnk = Items;
312 Items = Items->Next;
313 delete Jnk;
314 }
315 }
316 /*}}}*/
317 // Queue::Enqueue - Queue an item to the queue /*{{{*/
318 // ---------------------------------------------------------------------
319 /* */
320 void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
321 {
322 // Create a new item
323 QItem *I = new QItem;
324 I->Next = Items;
325 Items = I;
326
327 // Fill it in
328 Items->Owner = Owner;
329 Items->URI = URI;
330 Items->Description = Description;
331 Owner->QueueCounter++;
332 }
333 /*}}}*/
334 // Queue::Dequeue - Remove an item from the queue /*{{{*/
335 // ---------------------------------------------------------------------
336 /* */
337 void pkgAcquire::Queue::Dequeue(Item *Owner)
338 {
339 QItem **I = &Items;
340 for (; *I != 0;)
341 {
342 if ((*I)->Owner == Owner)
343 {
344 QItem *Jnk= *I;
345 *I = (*I)->Next;
346 Owner->QueueCounter--;
347 delete Jnk;
348 }
349 else
350 I = &(*I)->Next;
351 }
352 }
353 /*}}}*/
354 // Queue::Startup - Start the worker processes /*{{{*/
355 // ---------------------------------------------------------------------
356 /* */
357 bool pkgAcquire::Queue::Startup()
358 {
359 Shutdown();
360
361 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(URIAccess(Name));
362 if (Cnf == 0)
363 return false;
364
365 Workers = new Worker(this,Cnf);
366 Owner->Add(Workers);
367 if (Workers->Start() == false)
368 return false;
369
370 Items->Worker = Workers;
371 Workers->QueueItem(Items);
372
373 return true;
374 }
375 /*}}}*/
376 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
377 // ---------------------------------------------------------------------
378 /* */
379 bool pkgAcquire::Queue::Shutdown()
380 {
381 // Delete all of the workers
382 while (Workers != 0)
383 {
384 pkgAcquire::Worker *Jnk = Workers;
385 Workers = Workers->NextQueue;
386 Owner->Remove(Jnk);
387 delete Jnk;
388 }
389
390 return true;
391 }
392 /*}}}*/
393 // Queue::Finditem - Find a URI in the item list /*{{{*/
394 // ---------------------------------------------------------------------
395 /* */
396 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
397 {
398 for (QItem *I = Items; I != 0; I = I->Next)
399 if (I->URI == URI && I->Worker == Owner)
400 return I;
401 return 0;
402 }
403 /*}}}*/
404 // Queue::ItemDone - Item has been completed /*{{{*/
405 // ---------------------------------------------------------------------
406 /* The worker signals this which causes the item to be removed from the
407 queue. */
408 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
409 {
410 Dequeue(Itm->Owner);
411
412 if (Items == 0)
413 return true;
414
415 Items->Worker = Workers;
416 Items->Owner->Status = pkgAcquire::Item::StatFetching;
417 return Workers->QueueItem(Items);
418 }
419 /*}}}*/