Sync
[ntk/apt.git] / apt-pkg / acquire-worker.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
c88edf1d 3// $Id: acquire-worker.cc,v 1.6 1998/10/24 04:57:58 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire Worker
7
3b5421b4
AL
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
11
0118833a
AL
12 ##################################################################### */
13 /*}}}*/
14// Include Files /*{{{*/
15#ifdef __GNUG__
16#pragma implementation "apt-pkg/acquire-worker.h"
3b5421b4 17#endif
0118833a 18#include <apt-pkg/acquire-worker.h>
0a8a80e5 19#include <apt-pkg/acquire-item.h>
3b5421b4
AL
20#include <apt-pkg/configuration.h>
21#include <apt-pkg/error.h>
22#include <apt-pkg/fileutl.h>
23#include <strutl.h>
24
25#include <unistd.h>
26#include <signal.h>
93641593 27#include <wait.h>
3b5421b4
AL
28 /*}}}*/
29
30// Worker::Worker - Constructor for Queue startup /*{{{*/
31// ---------------------------------------------------------------------
32/* */
0a8a80e5 33pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf)
3b5421b4
AL
34{
35 OwnerQ = Q;
0a8a80e5
AL
36 Config = Cnf;
37 Access = Cnf->Access;
38 CurrentItem = 0;
3b5421b4
AL
39
40 Construct();
41}
42 /*}}}*/
43// Worker::Worker - Constructor for method config startup /*{{{*/
44// ---------------------------------------------------------------------
45/* */
46pkgAcquire::Worker::Worker(MethodConfig *Cnf)
47{
48 OwnerQ = 0;
49 Config = Cnf;
50 Access = Cnf->Access;
0a8a80e5
AL
51 CurrentItem = 0;
52
3b5421b4
AL
53 Construct();
54}
55 /*}}}*/
56// Worker::Construct - Constructor helper /*{{{*/
57// ---------------------------------------------------------------------
58/* */
59void pkgAcquire::Worker::Construct()
60{
0a8a80e5
AL
61 NextQueue = 0;
62 NextAcquire = 0;
3b5421b4
AL
63 Process = -1;
64 InFd = -1;
65 OutFd = -1;
0a8a80e5
AL
66 OutReady = false;
67 InReady = false;
3b5421b4
AL
68 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
69}
70 /*}}}*/
71// Worker::~Worker - Destructor /*{{{*/
72// ---------------------------------------------------------------------
73/* */
74pkgAcquire::Worker::~Worker()
75{
76 close(InFd);
77 close(OutFd);
78
79 if (Process > 0)
0a8a80e5 80 {
3b5421b4 81 kill(Process,SIGINT);
0a8a80e5
AL
82 if (waitpid(Process,0,0) != Process)
83 _error->Warning("I waited but nothing was there!");
84 }
3b5421b4
AL
85}
86 /*}}}*/
87// Worker::Start - Start the worker process /*{{{*/
88// ---------------------------------------------------------------------
89/* This forks the method and inits the communication channel */
90bool pkgAcquire::Worker::Start()
91{
92 // Get the method path
93 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
94 if (FileExists(Method) == false)
95 return _error->Error("The method driver %s could not be found.",Method.c_str());
96
97 if (Debug == true)
98 clog << "Starting method '" << Method << '\'' << endl;
99
100 // Create the pipes
101 int Pipes[4] = {-1,-1,-1,-1};
102 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
103 {
104 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
105 for (int I = 0; I != 4; I++)
106 close(Pipes[I]);
107 return false;
108 }
109
110 // Fork off the process
111 Process = fork();
112 if (Process < 0)
113 {
114 cerr << "FATAL -> Failed to fork." << endl;
115 exit(100);
116 }
117
118 // Spawn the subprocess
119 if (Process == 0)
120 {
121 // Setup the FDs
122 dup2(Pipes[1],STDOUT_FILENO);
123 dup2(Pipes[2],STDIN_FILENO);
124 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
125 for (int I = 0; I != 4; I++)
126 close(Pipes[I]);
127 SetCloseExec(STDOUT_FILENO,false);
128 SetCloseExec(STDIN_FILENO,false);
129 SetCloseExec(STDERR_FILENO,false);
130
131 const char *Args[2];
132 Args[0] = Method.c_str();
133 Args[1] = 0;
134 execv(Args[0],(char **)Args);
135 cerr << "Failed to exec method " << Args[0] << endl;
136 exit(100);
137 }
138
139 // Fix up our FDs
140 InFd = Pipes[0];
141 OutFd = Pipes[3];
142 SetNonBlock(Pipes[0],true);
143 SetNonBlock(Pipes[3],true);
144 close(Pipes[1]);
145 close(Pipes[2]);
0a8a80e5
AL
146 OutReady = false;
147 InReady = true;
3b5421b4
AL
148
149 // Read the configuration data
150 if (WaitFd(InFd) == false ||
151 ReadMessages() == false)
152 return _error->Error("Method %s did not start correctly",Method.c_str());
153
154 RunMessages();
0a8a80e5 155 SendConfiguration();
3b5421b4
AL
156
157 return true;
158}
159 /*}}}*/
160// Worker::ReadMessages - Read all pending messages into the list /*{{{*/
161// ---------------------------------------------------------------------
0a8a80e5 162/* */
3b5421b4
AL
163bool pkgAcquire::Worker::ReadMessages()
164{
0a8a80e5
AL
165 if (::ReadMessages(InFd,MessageQueue) == false)
166 return MethodFailure();
3b5421b4
AL
167 return true;
168}
169 /*}}}*/
3b5421b4
AL
170// Worker::RunMessages - Empty the message queue /*{{{*/
171// ---------------------------------------------------------------------
172/* This takes the messages from the message queue and runs them through
173 the parsers in order. */
174bool pkgAcquire::Worker::RunMessages()
175{
176 while (MessageQueue.empty() == false)
177 {
178 string Message = MessageQueue.front();
179 MessageQueue.erase(MessageQueue.begin());
0a8a80e5
AL
180
181 if (Debug == true)
182 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
3b5421b4
AL
183
184 // Fetch the message number
185 char *End;
186 int Number = strtol(Message.c_str(),&End,10);
187 if (End == Message.c_str())
188 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
189
c88edf1d
AL
190 string URI = LookupTag(Message,"URI");
191 pkgAcquire::Queue::QItem *Itm = 0;
192 if (URI.empty() == false)
193 Itm = OwnerQ->FindItem(URI,this);
194
3b5421b4
AL
195 // Determine the message number and dispatch
196 switch (Number)
197 {
0a8a80e5 198 // 100 Capabilities
3b5421b4
AL
199 case 100:
200 if (Capabilities(Message) == false)
201 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
202 break;
0a8a80e5
AL
203
204 // 101 Log
205 case 101:
206 if (Debug == true)
207 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
208 break;
209
210 // 102 Status
211 case 102:
212 Status = LookupTag(Message,"Message");
213 break;
214
215 // 200 URI Start
216 case 200:
c88edf1d
AL
217 {
218 if (Itm == 0)
219 {
220 _error->Warning("Method gave invalid 200 URI Start message");
221 break;
222 }
223 CurrentItem = Itm;
224 CurrentSize = 0;
225 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
226
227 break;
228 }
0a8a80e5
AL
229
230 // 201 URI Done
231 case 201:
c88edf1d
AL
232 {
233 if (Itm == 0)
234 {
235 _error->Warning("Method gave invalid 400 URI Failure message");
236 break;
237 }
238
239 Itm->Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
240 LookupTag(Message,"MD5-Hash"));
241 OwnerQ->ItemDone(Itm);
242 break;
243 }
0a8a80e5
AL
244
245 // 400 URI Failure
246 case 400:
c88edf1d
AL
247 {
248 if (Itm == 0)
249 {
250 _error->Warning("Method gave invalid 400 URI Failure message");
251 break;
252 }
253
254 Itm->Owner->Failed(Message);
255 OwnerQ->ItemDone(Itm);
256 break;
257 }
0a8a80e5
AL
258
259 // 401 General Failure
260 case 401:
261 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
262 break;
3b5421b4
AL
263 }
264 }
265 return true;
266}
267 /*}}}*/
268// Worker::Capabilities - 100 Capabilities handler /*{{{*/
269// ---------------------------------------------------------------------
270/* This parses the capabilities message and dumps it into the configuration
271 structure. */
272bool pkgAcquire::Worker::Capabilities(string Message)
273{
274 if (Config == 0)
275 return true;
276
277 Config->Version = LookupTag(Message,"Version");
278 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
279 Config->PreScan = StringToBool(LookupTag(Message,"Pre-Scan"),false);
0a8a80e5
AL
280 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
281 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
3b5421b4
AL
282
283 // Some debug text
284 if (Debug == true)
285 {
286 clog << "Configured access method " << Config->Access << endl;
0a8a80e5
AL
287 clog << "Version:" << Config->Version << " SingleInstance:" <<
288 Config->SingleInstance << " PreScan: " << Config->PreScan <<
289 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
290 Config->SendConfig << endl;
3b5421b4
AL
291 }
292
293 return true;
294}
0118833a 295 /*}}}*/
0a8a80e5
AL
296// Worker::SendConfiguration - Send the config to the method /*{{{*/
297// ---------------------------------------------------------------------
298/* */
299bool pkgAcquire::Worker::SendConfiguration()
300{
301 if (Config->SendConfig == false)
302 return true;
303
304 if (OutFd == -1)
305 return false;
306
307 string Message = "601 Configuration\n";
308 Message.reserve(2000);
309
310 /* Write out all of the configuration directives by walking the
311 configuration tree */
312 const Configuration::Item *Top = _config->Tree(0);
313 for (; Top != 0;)
314 {
315 if (Top->Value.empty() == false)
316 {
317 string Line = "Config-Item: " + Top->FullTag() + "=";
318 Line += QuoteString(Top->Value,"\n") + '\n';
319 Message += Line;
320 }
321
322 if (Top->Child != 0)
323 {
324 Top = Top->Child;
325 continue;
326 }
327
328 while (Top != 0 && Top->Next == 0)
329 Top = Top->Parent;
330 if (Top != 0)
331 Top = Top->Next;
332 }
333 Message += '\n';
334
335 if (Debug == true)
336 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
337 OutQueue += Message;
338 OutReady = true;
339
340 return true;
341}
342 /*}}}*/
343// Worker::QueueItem - Add an item to the outbound queue /*{{{*/
344// ---------------------------------------------------------------------
345/* Send a URI Acquire message to the method */
346bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
347{
348 if (OutFd == -1)
349 return false;
350
351 string Message = "600 URI Acquire\n";
352 Message.reserve(300);
353 Message += "URI: " + Item->URI;
354 Message += "\nFilename: " + Item->Owner->DestFile;
355 Message += Item->Owner->Custom600Headers();
356 Message += "\n\n";
357
358 if (Debug == true)
359 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
360 OutQueue += Message;
361 OutReady = true;
362
363 return true;
364}
365 /*}}}*/
366// Worker::OutFdReady - Out bound FD is ready /*{{{*/
367// ---------------------------------------------------------------------
368/* */
369bool pkgAcquire::Worker::OutFdReady()
370{
371 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
372 if (Res <= 0)
373 return MethodFailure();
374
375 // Hmm.. this should never happen.
376 if (Res < 0)
377 return true;
378
379 OutQueue.erase(0,Res);
380 if (OutQueue.empty() == true)
381 OutReady = false;
382
383 return true;
384}
385 /*}}}*/
386// Worker::InFdReady - In bound FD is ready /*{{{*/
387// ---------------------------------------------------------------------
388/* */
389bool pkgAcquire::Worker::InFdReady()
390{
391 if (ReadMessages() == false)
392 return false;
393 RunMessages();
394 return true;
395}
396 /*}}}*/
397// Worker::MethodFailure - Called when the method fails /*{{{*/
398// ---------------------------------------------------------------------
399/* This is called when the method is believed to have failed, probably because
400 read returned -1. */
401bool pkgAcquire::Worker::MethodFailure()
402{
403 cerr << "Method " << Access << " has died unexpectedly!" << endl;
404 if (waitpid(Process,0,0) != Process)
405 _error->Warning("I waited but nothing was there!");
406 Process = -1;
407 close(InFd);
408 close(OutFd);
409 InFd = -1;
410 OutFd = -1;
411 OutReady = false;
412 InReady = false;
413 OutQueue = string();
414 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
415
416 return false;
417}
418 /*}}}*/