merge with debian/sid
[ntk/apt.git] / ftparchive / multicompress.cc
// -*- mode: cpp; mode: fold -*-
// Description								/*{{{*/
// $Id: multicompress.cc,v 1.4 2003/02/10 07:34:41 doogie Exp $
/* ######################################################################

   MultiCompressor

   This class is very complicated in order to optimize for the common
   case of its use, writing a large set of compressed files that are
   different from the old set. It spawns off compressors in parallel
   to maximize compression throughput and has a separate task managing
   the data going into the compressors.

   ##################################################################### */
									/*}}}*/
// Include Files							/*{{{*/
#include <config.h>

#include <apt-pkg/strutl.h>
#include <apt-pkg/error.h>
#include <apt-pkg/md5.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <utime.h>
#include <unistd.h>
#include <iostream>

#include "multicompress.h"
#include <apti18n.h>
									/*}}}*/

using namespace std;
34
35
36 // MultiCompress::MultiCompress - Constructor /*{{{*/
37 // ---------------------------------------------------------------------
38 /* Setup the file outputs, compression modes and fork the writer child */
39 MultiCompress::MultiCompress(string const &Output,string const &Compress,
40 mode_t const &Permissions,bool const &Write) :
41 Permissions(Permissions)
42 {
43 Outputs = 0;
44 Outputter = -1;
45 Input = 0;
46 UpdateMTime = 0;
47
48 /* Parse the compression string, a space separated lists of compresison
49 types */
50 string::const_iterator I = Compress.begin();
51 for (; I != Compress.end();)
52 {
53 for (; I != Compress.end() && isspace(*I); ++I);
54
55 // Grab a word
56 string::const_iterator Start = I;
57 for (; I != Compress.end() && !isspace(*I); ++I);
58
59 // Find the matching compressor
60 std::vector<APT::Configuration::Compressor> Compressors = APT::Configuration::getCompressors();
61 std::vector<APT::Configuration::Compressor>::const_iterator Comp = Compressors.begin();
62 for (; Comp != Compressors.end(); ++Comp)
63 if (stringcmp(Start,I,Comp->Name.c_str()) == 0)
64 break;
65
66 // Hmm.. unknown.
67 if (Comp == Compressors.end())
68 {
69 _error->Warning(_("Unknown compression algorithm '%s'"),string(Start,I).c_str());
70 continue;
71 }
72
73 // Create and link in a new output
74 Files *NewOut = new Files;
75 NewOut->Next = Outputs;
76 Outputs = NewOut;
77 NewOut->CompressProg = *Comp;
78 NewOut->Output = Output+Comp->Extension;
79
80 struct stat St;
81 if (stat(NewOut->Output.c_str(),&St) == 0)
82 NewOut->OldMTime = St.st_mtime;
83 else
84 NewOut->OldMTime = 0;
85 }
86
87 if (Write == false)
88 return;
89
90 /* Open all the temp files now so we can report any errors. File is
91 made unreable to prevent people from touching it during creating. */
92 for (Files *I = Outputs; I != 0; I = I->Next)
93 I->TmpFile.Open(I->Output + ".new",FileFd::WriteEmpty,0600);
94 if (_error->PendingError() == true)
95 return;
96
97 if (Outputs == 0)
98 {
99 _error->Error(_("Compressed output %s needs a compression set"),Output.c_str());
100 return;
101 }
102
103 Start();
104 }
105 /*}}}*/
106 // MultiCompress::~MultiCompress - Destructor /*{{{*/
107 // ---------------------------------------------------------------------
108 /* Just erase the file linked list. */
109 MultiCompress::~MultiCompress()
110 {
111 Die();
112
113 for (; Outputs != 0;)
114 {
115 Files *Tmp = Outputs->Next;
116 delete Outputs;
117 Outputs = Tmp;
118 }
119 }
120 /*}}}*/
121 // MultiCompress::GetStat - Get stat information for compressed files /*{{{*/
122 // ---------------------------------------------------------------------
123 /* This checks each compressed file to make sure it exists and returns
124 stat information for a random file from the collection. False means
125 one or more of the files is missing. */
126 bool MultiCompress::GetStat(string const &Output,string const &Compress,struct stat &St)
127 {
128 /* Parse the compression string, a space separated lists of compresison
129 types */
130 string::const_iterator I = Compress.begin();
131 bool DidStat = false;
132 for (; I != Compress.end();)
133 {
134 for (; I != Compress.end() && isspace(*I); ++I);
135
136 // Grab a word
137 string::const_iterator Start = I;
138 for (; I != Compress.end() && !isspace(*I); ++I);
139
140 // Find the matching compressor
141 std::vector<APT::Configuration::Compressor> Compressors = APT::Configuration::getCompressors();
142 std::vector<APT::Configuration::Compressor>::const_iterator Comp = Compressors.begin();
143 for (; Comp != Compressors.end(); ++Comp)
144 if (stringcmp(Start,I,Comp->Name.c_str()) == 0)
145 break;
146
147 // Hmm.. unknown.
148 if (Comp == Compressors.end())
149 continue;
150
151 string Name = Output+Comp->Extension;
152 if (stat(Name.c_str(),&St) != 0)
153 return false;
154 DidStat = true;
155 }
156 return DidStat;
157 }
158 /*}}}*/
159 // MultiCompress::Start - Start up the writer child /*{{{*/
160 // ---------------------------------------------------------------------
161 /* Fork a child and setup the communication pipe. */
162 bool MultiCompress::Start()
163 {
164 // Create a data pipe
165 int Pipe[2] = {-1,-1};
166 if (pipe(Pipe) != 0)
167 return _error->Errno("pipe",_("Failed to create IPC pipe to subprocess"));
168 for (int I = 0; I != 2; I++)
169 SetCloseExec(Pipe[I],true);
170
171 // The child..
172 Outputter = fork();
173 if (Outputter == 0)
174 {
175 close(Pipe[1]);
176 Child(Pipe[0]);
177 if (_error->PendingError() == true)
178 {
179 _error->DumpErrors();
180 _exit(100);
181 }
182 _exit(0);
183 };
184
185 /* Tidy up the temp files, we open them in the constructor so as to
186 get proper error reporting. Close them now. */
187 for (Files *I = Outputs; I != 0; I = I->Next)
188 I->TmpFile.Close();
189
190 close(Pipe[0]);
191 Input = fdopen(Pipe[1],"w");
192 if (Input == 0)
193 return _error->Errno("fdopen",_("Failed to create FILE*"));
194
195 if (Outputter == -1)
196 return _error->Errno("fork",_("Failed to fork"));
197 return true;
198 }
199 /*}}}*/
200 // MultiCompress::Die - Clean up the writer /*{{{*/
201 // ---------------------------------------------------------------------
202 /* */
203 bool MultiCompress::Die()
204 {
205 if (Input == 0)
206 return true;
207
208 fclose(Input);
209 Input = 0;
210 bool Res = ExecWait(Outputter,_("Compress child"),false);
211 Outputter = -1;
212 return Res;
213 }
214 /*}}}*/
215 // MultiCompress::Finalize - Finish up writing /*{{{*/
216 // ---------------------------------------------------------------------
217 /* This is only necessary for statistics reporting. */
218 bool MultiCompress::Finalize(unsigned long long &OutSize)
219 {
220 OutSize = 0;
221 if (Input == 0 || Die() == false)
222 return false;
223
224 time_t Now;
225 time(&Now);
226
227 // Check the mtimes to see if the files were replaced.
228 bool Changed = false;
229 for (Files *I = Outputs; I != 0; I = I->Next)
230 {
231 struct stat St;
232 if (stat(I->Output.c_str(),&St) != 0)
233 return _error->Error(_("Internal error, failed to create %s"),
234 I->Output.c_str());
235
236 if (I->OldMTime != St.st_mtime)
237 Changed = true;
238 else
239 {
240 // Update the mtime if necessary
241 if (UpdateMTime > 0 &&
242 (Now - St.st_mtime > (signed)UpdateMTime || St.st_mtime > Now))
243 {
244 struct utimbuf Buf;
245 Buf.actime = Buf.modtime = Now;
246 utime(I->Output.c_str(),&Buf);
247 Changed = true;
248 }
249 }
250
251 // Force the file permissions
252 if (St.st_mode != Permissions)
253 chmod(I->Output.c_str(),Permissions);
254
255 OutSize += St.st_size;
256 }
257
258 if (Changed == false)
259 OutSize = 0;
260
261 return true;
262 }
263 /*}}}*/
264 // MultiCompress::OpenCompress - Open the compressor /*{{{*/
265 // ---------------------------------------------------------------------
266 /* This opens the compressor, either in compress mode or decompress
267 mode. FileFd is always the compressor input/output file,
268 OutFd is the created pipe, Input for Compress, Output for Decompress. */
269 bool MultiCompress::OpenCompress(APT::Configuration::Compressor const &Prog,
270 pid_t &Pid,int const &FileFd,int &OutFd,bool const &Comp)
271 {
272 Pid = -1;
273
274 // No compression
275 if (Prog.Binary.empty() == true)
276 {
277 OutFd = dup(FileFd);
278 return true;
279 }
280
281 // Create a data pipe
282 int Pipe[2] = {-1,-1};
283 if (pipe(Pipe) != 0)
284 return _error->Errno("pipe",_("Failed to create subprocess IPC"));
285 for (int J = 0; J != 2; J++)
286 SetCloseExec(Pipe[J],true);
287
288 if (Comp == true)
289 OutFd = Pipe[1];
290 else
291 OutFd = Pipe[0];
292
293 // The child..
294 Pid = ExecFork();
295 if (Pid == 0)
296 {
297 if (Comp == true)
298 {
299 dup2(FileFd,STDOUT_FILENO);
300 dup2(Pipe[0],STDIN_FILENO);
301 }
302 else
303 {
304 dup2(FileFd,STDIN_FILENO);
305 dup2(Pipe[1],STDOUT_FILENO);
306 }
307
308 SetCloseExec(STDOUT_FILENO,false);
309 SetCloseExec(STDIN_FILENO,false);
310
311 std::vector<char const*> Args;
312 Args.push_back(Prog.Binary.c_str());
313 std::vector<std::string> const * const addArgs =
314 (Comp == true) ? &(Prog.CompressArgs) : &(Prog.UncompressArgs);
315 for (std::vector<std::string>::const_iterator a = addArgs->begin();
316 a != addArgs->end(); ++a)
317 Args.push_back(a->c_str());
318 Args.push_back(NULL);
319
320 execvp(Args[0],(char **)&Args[0]);
321 cerr << _("Failed to exec compressor ") << Args[0] << endl;
322 _exit(100);
323 };
324 if (Comp == true)
325 close(Pipe[0]);
326 else
327 close(Pipe[1]);
328 return true;
329 }
330 /*}}}*/
331 // MultiCompress::OpenOld - Open an old file /*{{{*/
332 // ---------------------------------------------------------------------
333 /* This opens one of the original output files, possibly decompressing it. */
334 bool MultiCompress::OpenOld(int &Fd,pid_t &Proc)
335 {
336 Files *Best = Outputs;
337 for (Files *I = Outputs; I != 0; I = I->Next)
338 if (Best->CompressProg.Cost > I->CompressProg.Cost)
339 Best = I;
340
341 // Open the file
342 FileFd F(Best->Output,FileFd::ReadOnly);
343 if (_error->PendingError() == true)
344 return false;
345
346 // Decompress the file so we can read it
347 if (OpenCompress(Best->CompressProg,Proc,F.Fd(),Fd,false) == false)
348 return false;
349
350 return true;
351 }
352 /*}}}*/
353 // MultiCompress::CloseOld - Close the old file /*{{{*/
354 // ---------------------------------------------------------------------
355 /* */
356 bool MultiCompress::CloseOld(int Fd,pid_t Proc)
357 {
358 close(Fd);
359 if (Proc != -1)
360 if (ExecWait(Proc,_("decompressor"),false) == false)
361 return false;
362 return true;
363 }
364 /*}}}*/
365 // MultiCompress::Child - The writer child /*{{{*/
366 // ---------------------------------------------------------------------
367 /* The child process forks a bunch of compression children and takes
368 input on FD and passes it to all the compressor child. On the way it
369 computes the MD5 of the raw data. After this the raw data in the
370 original files is compared to see if this data is new. If the data
371 is new then the temp files are renamed, otherwise they are erased. */
372 bool MultiCompress::Child(int const &FD)
373 {
374 // Start the compression children.
375 for (Files *I = Outputs; I != 0; I = I->Next)
376 {
377 if (OpenCompress(I->CompressProg,I->CompressProc,I->TmpFile.Fd(),
378 I->Fd,true) == false)
379 return false;
380 }
381
382 /* Okay, now we just feed data from FD to all the other FDs. Also
383 stash a hash of the data to use later. */
384 SetNonBlock(FD,false);
385 unsigned char Buffer[32*1024];
386 unsigned long long FileSize = 0;
387 MD5Summation MD5;
388 while (1)
389 {
390 WaitFd(FD,false);
391 int Res = read(FD,Buffer,sizeof(Buffer));
392 if (Res == 0)
393 break;
394 if (Res < 0)
395 continue;
396
397 MD5.Add(Buffer,Res);
398 FileSize += Res;
399 for (Files *I = Outputs; I != 0; I = I->Next)
400 {
401 if (write(I->Fd,Buffer,Res) != Res)
402 {
403 _error->Errno("write",_("IO to subprocess/file failed"));
404 break;
405 }
406 }
407 }
408
409 // Close all the writers
410 for (Files *I = Outputs; I != 0; I = I->Next)
411 close(I->Fd);
412
413 // Wait for the compressors to exit
414 for (Files *I = Outputs; I != 0; I = I->Next)
415 {
416 if (I->CompressProc != -1)
417 ExecWait(I->CompressProc, I->CompressProg.Binary.c_str(), false);
418 }
419
420 if (_error->PendingError() == true)
421 return false;
422
423 /* Now we have to copy the files over, or erase them if they
424 have not changed. First find the cheapest decompressor */
425 bool Missing = false;
426 for (Files *I = Outputs; I != 0; I = I->Next)
427 {
428 if (I->OldMTime == 0)
429 {
430 Missing = true;
431 break;
432 }
433 }
434
435 // Check the MD5 of the lowest cost entity.
436 while (Missing == false)
437 {
438 int CompFd = -1;
439 pid_t Proc = -1;
440 if (OpenOld(CompFd,Proc) == false)
441 {
442 _error->Discard();
443 break;
444 }
445
446 // Compute the hash
447 MD5Summation OldMD5;
448 unsigned long long NewFileSize = 0;
449 while (1)
450 {
451 int Res = read(CompFd,Buffer,sizeof(Buffer));
452 if (Res == 0)
453 break;
454 if (Res < 0)
455 return _error->Errno("read",_("Failed to read while computing MD5"));
456 NewFileSize += Res;
457 OldMD5.Add(Buffer,Res);
458 }
459
460 // Tidy the compressor
461 if (CloseOld(CompFd,Proc) == false)
462 return false;
463
464 // Check the hash
465 if (OldMD5.Result() == MD5.Result() &&
466 FileSize == NewFileSize)
467 {
468 for (Files *I = Outputs; I != 0; I = I->Next)
469 {
470 I->TmpFile.Close();
471 if (unlink(I->TmpFile.Name().c_str()) != 0)
472 _error->Errno("unlink",_("Problem unlinking %s"),
473 I->TmpFile.Name().c_str());
474 }
475 return !_error->PendingError();
476 }
477 break;
478 }
479
480 // Finalize
481 for (Files *I = Outputs; I != 0; I = I->Next)
482 {
483 // Set the correct file modes
484 fchmod(I->TmpFile.Fd(),Permissions);
485
486 if (rename(I->TmpFile.Name().c_str(),I->Output.c_str()) != 0)
487 _error->Errno("rename",_("Failed to rename %s to %s"),
488 I->TmpFile.Name().c_str(),I->Output.c_str());
489 I->TmpFile.Close();
490 }
491
492 return !_error->PendingError();
493 }
494 /*}}}*/
495