1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.5 1998/10/23 00:49:58 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
16 #pragma implementation "apt-pkg/acquire-worker.h"
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
30 // Worker::Worker - Constructor for Queue startup /*{{{*/
31 // ---------------------------------------------------------------------
33 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
)
43 // Worker::Worker - Constructor for method config startup /*{{{*/
44 // ---------------------------------------------------------------------
46 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
56 // Worker::Construct - Constructor helper /*{{{*/
57 // ---------------------------------------------------------------------
59 void pkgAcquire::Worker::Construct()
68 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
71 // Worker::~Worker - Destructor /*{{{*/
72 // ---------------------------------------------------------------------
74 pkgAcquire::Worker::~Worker()
82 if (waitpid(Process
,0,0) != Process
)
83 _error
->Warning("I waited but nothing was there!");
87 // Worker::Start - Start the worker process /*{{{*/
88 // ---------------------------------------------------------------------
89 /* This forks the method and inits the communication channel */
90 bool pkgAcquire::Worker::Start()
92 // Get the method path
93 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
94 if (FileExists(Method
) == false)
95 return _error
->Error("The method driver %s could not be found.",Method
.c_str());
98 clog
<< "Starting method '" << Method
<< '\'' << endl
;
101 int Pipes
[4] = {-1,-1,-1,-1};
102 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
104 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
105 for (int I
= 0; I
!= 4; I
++)
110 // Fork off the process
114 cerr
<< "FATAL -> Failed to fork." << endl
;
118 // Spawn the subprocess
122 dup2(Pipes
[1],STDOUT_FILENO
);
123 dup2(Pipes
[2],STDIN_FILENO
);
124 dup2(((filebuf
*)clog
.rdbuf())->fd(),STDERR_FILENO
);
125 for (int I
= 0; I
!= 4; I
++)
127 SetCloseExec(STDOUT_FILENO
,false);
128 SetCloseExec(STDIN_FILENO
,false);
129 SetCloseExec(STDERR_FILENO
,false);
132 Args
[0] = Method
.c_str();
134 execv(Args
[0],(char **)Args
);
135 cerr
<< "Failed to exec method " << Args
[0] << endl
;
142 SetNonBlock(Pipes
[0],true);
143 SetNonBlock(Pipes
[3],true);
149 // Read the configuration data
150 if (WaitFd(InFd
) == false ||
151 ReadMessages() == false)
152 return _error
->Error("Method %s did not start correctly",Method
.c_str());
160 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
161 // ---------------------------------------------------------------------
163 bool pkgAcquire::Worker::ReadMessages()
165 if (::ReadMessages(InFd
,MessageQueue
) == false)
166 return MethodFailure();
170 // Worker::RunMessages - Empty the message queue /*{{{*/
171 // ---------------------------------------------------------------------
172 /* This takes the messages from the message queue and runs them through
173 the parsers in order. */
174 bool pkgAcquire::Worker::RunMessages()
176 while (MessageQueue
.empty() == false)
178 string Message
= MessageQueue
.front();
179 MessageQueue
.erase(MessageQueue
.begin());
182 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
184 // Fetch the message number
186 int Number
= strtol(Message
.c_str(),&End
,10);
187 if (End
== Message
.c_str())
188 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
190 // Determine the message number and dispatch
195 if (Capabilities(Message
) == false)
196 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
202 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
207 Status
= LookupTag(Message
,"Message");
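222 // 401 General Failure -- NOTE(review): the format string below has two %s but only one argument is passed; the method name (presumably Access.c_str(), as in the other Error calls) appears to be missing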
224 _error
->Error("Method %s General failure: %s",LookupTag(Message
,"Message").c_str());
231 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
232 // ---------------------------------------------------------------------
233 /* This parses the capabilities message and dumps it into the configuration
235 bool pkgAcquire::Worker::Capabilities(string Message
)
240 Config
->Version
= LookupTag(Message
,"Version");
241 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
242 Config
->PreScan
= StringToBool(LookupTag(Message
,"Pre-Scan"),false);
243 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
244 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
249 clog
<< "Configured access method " << Config
->Access
<< endl
;
250 clog
<< "Version:" << Config
->Version
<< " SingleInstance:" <<
251 Config
->SingleInstance
<< " PreScan: " << Config
->PreScan
<<
252 " Pipeline:" << Config
->Pipeline
<< " SendConfig:" <<
253 Config
->SendConfig
<< endl
;
259 // Worker::SendConfiguration - Send the config to the method /*{{{*/
260 // ---------------------------------------------------------------------
262 bool pkgAcquire::Worker::SendConfiguration()
264 if (Config
->SendConfig
== false)
270 string Message
= "601 Configuration\n";
271 Message
.reserve(2000);
273 /* Write out all of the configuration directives by walking the
274 configuration tree */
275 const Configuration::Item
*Top
= _config
->Tree(0);
278 if (Top
->Value
.empty() == false)
280 string Line
= "Config-Item: " + Top
->FullTag() + "=";
281 Line
+= QuoteString(Top
->Value
,"\n") + '\n';
291 while (Top
!= 0 && Top
->Next
== 0)
299 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
306 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
307 // ---------------------------------------------------------------------
308 /* Send a URI Acquire message to the method */
309 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
314 string Message
= "600 URI Acquire\n";
315 Message
.reserve(300);
316 Message
+= "URI: " + Item
->URI
;
317 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
318 Message
+= Item
->Owner
->Custom600Headers();
322 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
329 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
330 // ---------------------------------------------------------------------
332 bool pkgAcquire::Worker::OutFdReady()
334 int Res
= write(OutFd
,OutQueue
.begin(),OutQueue
.length());
336 return MethodFailure();
338 // Hmm.. this should never happen.
342 OutQueue
.erase(0,Res
);
343 if (OutQueue
.empty() == true)
349 // Worker::InFdReady - Inbound FD is ready /*{{{*/
350 // ---------------------------------------------------------------------
352 bool pkgAcquire::Worker::InFdReady()
354 if (ReadMessages() == false)
360 // Worker::MethodFailure - Called when the method fails /*{{{*/
361 // ---------------------------------------------------------------------
362 /* This is called when the method is believed to have failed, probably because
364 bool pkgAcquire::Worker::MethodFailure()
366 cerr
<< "Method " << Access
<< " has died unexpectedly!" << endl
;
367 if (waitpid(Process
,0,0) != Process
)
368 _error
->Warning("I waited but nothing was there!");
377 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());