allow Open() and OpenDescriptor() to be called with a Compressor
[ntk/apt.git] / ftparchive / multicompress.cc
CommitLineData
b2e465d6
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
dc738e7a 3// $Id: multicompress.cc,v 1.4 2003/02/10 07:34:41 doogie Exp $
b2e465d6
AL
4/* ######################################################################
5
6 MultiCompressor
7
8 This class is very complicated in order to optimize for the common
9 case of its use, writing a large set of compressed files that are
10 different from the old set. It spawns off compressors in parallel
11 to maximize compression throughput and has a separate task managing
12 the data going into the compressors.
13
14 ##################################################################### */
15 /*}}}*/
16// Include Files /*{{{*/
ea542140
DK
#include <config.h>

#include <apt-pkg/fileutl.h>
#include <apt-pkg/strutl.h>
#include <apt-pkg/error.h>
#include <apt-pkg/md5.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <utime.h>
#include <unistd.h>
#include <iostream>

#include "multicompress.h"
#include <apti18n.h>
b2e465d6
AL
32 /*}}}*/
33
812f4169
AL
34using namespace std;
35
b2e465d6
AL
36
37// MultiCompress::MultiCompress - Constructor /*{{{*/
38// ---------------------------------------------------------------------
39/* Setup the file outputs, compression modes and fork the writer child */
9209ec47
DK
40MultiCompress::MultiCompress(string const &Output,string const &Compress,
41 mode_t const &Permissions,bool const &Write) :
42 Permissions(Permissions)
b2e465d6
AL
43{
44 Outputs = 0;
45 Outputter = -1;
46 Input = 0;
47 UpdateMTime = 0;
03bef784 48
b2e465d6
AL
49 /* Parse the compression string, a space separated lists of compresison
50 types */
51 string::const_iterator I = Compress.begin();
52 for (; I != Compress.end();)
53 {
f7f0d6c7 54 for (; I != Compress.end() && isspace(*I); ++I);
b2e465d6
AL
55
56 // Grab a word
57 string::const_iterator Start = I;
f7f0d6c7 58 for (; I != Compress.end() && !isspace(*I); ++I);
b2e465d6
AL
59
60 // Find the matching compressor
03bef784
DK
61 std::vector<APT::Configuration::Compressor> Compressors = APT::Configuration::getCompressors();
62 std::vector<APT::Configuration::Compressor>::const_iterator Comp = Compressors.begin();
63 for (; Comp != Compressors.end(); ++Comp)
64 if (stringcmp(Start,I,Comp->Name.c_str()) == 0)
b2e465d6
AL
65 break;
66
67 // Hmm.. unknown.
03bef784 68 if (Comp == Compressors.end())
b2e465d6 69 {
db0db9fe 70 _error->Warning(_("Unknown compression algorithm '%s'"),string(Start,I).c_str());
b2e465d6
AL
71 continue;
72 }
73
74 // Create and link in a new output
75 Files *NewOut = new Files;
76 NewOut->Next = Outputs;
77 Outputs = NewOut;
03bef784 78 NewOut->CompressProg = *Comp;
b2e465d6
AL
79 NewOut->Output = Output+Comp->Extension;
80
81 struct stat St;
82 if (stat(NewOut->Output.c_str(),&St) == 0)
83 NewOut->OldMTime = St.st_mtime;
84 else
85 NewOut->OldMTime = 0;
86 }
87
88 if (Write == false)
89 return;
90
91 /* Open all the temp files now so we can report any errors. File is
92 made unreable to prevent people from touching it during creating. */
93 for (Files *I = Outputs; I != 0; I = I->Next)
94 I->TmpFile.Open(I->Output + ".new",FileFd::WriteEmpty,0600);
95 if (_error->PendingError() == true)
96 return;
97
98 if (Outputs == 0)
99 {
dc738e7a 100 _error->Error(_("Compressed output %s needs a compression set"),Output.c_str());
b2e465d6
AL
101 return;
102 }
103
104 Start();
105}
106 /*}}}*/
107// MultiCompress::~MultiCompress - Destructor /*{{{*/
108// ---------------------------------------------------------------------
109/* Just erase the file linked list. */
110MultiCompress::~MultiCompress()
111{
112 Die();
113
114 for (; Outputs != 0;)
115 {
116 Files *Tmp = Outputs->Next;
117 delete Outputs;
118 Outputs = Tmp;
119 }
120}
121 /*}}}*/
122// MultiCompress::GetStat - Get stat information for compressed files /*{{{*/
123// ---------------------------------------------------------------------
124/* This checks each compressed file to make sure it exists and returns
125 stat information for a random file from the collection. False means
126 one or more of the files is missing. */
9209ec47 127bool MultiCompress::GetStat(string const &Output,string const &Compress,struct stat &St)
b2e465d6
AL
128{
129 /* Parse the compression string, a space separated lists of compresison
130 types */
131 string::const_iterator I = Compress.begin();
132 bool DidStat = false;
133 for (; I != Compress.end();)
134 {
f7f0d6c7 135 for (; I != Compress.end() && isspace(*I); ++I);
b2e465d6
AL
136
137 // Grab a word
138 string::const_iterator Start = I;
f7f0d6c7 139 for (; I != Compress.end() && !isspace(*I); ++I);
b2e465d6
AL
140
141 // Find the matching compressor
03bef784
DK
142 std::vector<APT::Configuration::Compressor> Compressors = APT::Configuration::getCompressors();
143 std::vector<APT::Configuration::Compressor>::const_iterator Comp = Compressors.begin();
144 for (; Comp != Compressors.end(); ++Comp)
145 if (stringcmp(Start,I,Comp->Name.c_str()) == 0)
b2e465d6
AL
146 break;
147
148 // Hmm.. unknown.
03bef784 149 if (Comp == Compressors.end())
b2e465d6
AL
150 continue;
151
152 string Name = Output+Comp->Extension;
153 if (stat(Name.c_str(),&St) != 0)
154 return false;
155 DidStat = true;
156 }
157 return DidStat;
158}
159 /*}}}*/
160// MultiCompress::Start - Start up the writer child /*{{{*/
161// ---------------------------------------------------------------------
162/* Fork a child and setup the communication pipe. */
163bool MultiCompress::Start()
164{
165 // Create a data pipe
166 int Pipe[2] = {-1,-1};
167 if (pipe(Pipe) != 0)
dc738e7a 168 return _error->Errno("pipe",_("Failed to create IPC pipe to subprocess"));
b2e465d6
AL
169 for (int I = 0; I != 2; I++)
170 SetCloseExec(Pipe[I],true);
171
172 // The child..
173 Outputter = fork();
174 if (Outputter == 0)
175 {
176 close(Pipe[1]);
177 Child(Pipe[0]);
178 if (_error->PendingError() == true)
179 {
180 _error->DumpErrors();
181 _exit(100);
182 }
183 _exit(0);
184 };
185
186 /* Tidy up the temp files, we open them in the constructor so as to
187 get proper error reporting. Close them now. */
188 for (Files *I = Outputs; I != 0; I = I->Next)
189 I->TmpFile.Close();
190
191 close(Pipe[0]);
192 Input = fdopen(Pipe[1],"w");
193 if (Input == 0)
dc738e7a 194 return _error->Errno("fdopen",_("Failed to create FILE*"));
b2e465d6
AL
195
196 if (Outputter == -1)
dc738e7a 197 return _error->Errno("fork",_("Failed to fork"));
b2e465d6
AL
198 return true;
199}
200 /*}}}*/
201// MultiCompress::Die - Clean up the writer /*{{{*/
202// ---------------------------------------------------------------------
203/* */
204bool MultiCompress::Die()
205{
206 if (Input == 0)
207 return true;
208
209 fclose(Input);
210 Input = 0;
db0db9fe 211 bool Res = ExecWait(Outputter,_("Compress child"),false);
b2e465d6
AL
212 Outputter = -1;
213 return Res;
214}
215 /*}}}*/
216// MultiCompress::Finalize - Finish up writing /*{{{*/
217// ---------------------------------------------------------------------
218/* This is only necessary for statistics reporting. */
650faab0 219bool MultiCompress::Finalize(unsigned long long &OutSize)
b2e465d6
AL
220{
221 OutSize = 0;
222 if (Input == 0 || Die() == false)
223 return false;
224
225 time_t Now;
226 time(&Now);
227
228 // Check the mtimes to see if the files were replaced.
229 bool Changed = false;
230 for (Files *I = Outputs; I != 0; I = I->Next)
231 {
232 struct stat St;
233 if (stat(I->Output.c_str(),&St) != 0)
db0db9fe 234 return _error->Error(_("Internal error, failed to create %s"),
b2e465d6
AL
235 I->Output.c_str());
236
237 if (I->OldMTime != St.st_mtime)
238 Changed = true;
239 else
240 {
241 // Update the mtime if necessary
242 if (UpdateMTime > 0 &&
243 (Now - St.st_mtime > (signed)UpdateMTime || St.st_mtime > Now))
244 {
245 struct utimbuf Buf;
246 Buf.actime = Buf.modtime = Now;
247 utime(I->Output.c_str(),&Buf);
248 Changed = true;
249 }
250 }
251
252 // Force the file permissions
253 if (St.st_mode != Permissions)
254 chmod(I->Output.c_str(),Permissions);
255
256 OutSize += St.st_size;
257 }
258
259 if (Changed == false)
260 OutSize = 0;
261
262 return true;
263}
264 /*}}}*/
b2e465d6
AL
265// MultiCompress::OpenOld - Open an old file /*{{{*/
266// ---------------------------------------------------------------------
267/* This opens one of the original output files, possibly decompressing it. */
3826564e 268bool MultiCompress::OpenOld(int &Fd,pid_t &Proc)
b2e465d6
AL
269{
270 Files *Best = Outputs;
271 for (Files *I = Outputs; I != 0; I = I->Next)
03bef784 272 if (Best->CompressProg.Cost > I->CompressProg.Cost)
b2e465d6
AL
273 Best = I;
274
275 // Open the file
276 FileFd F(Best->Output,FileFd::ReadOnly);
277 if (_error->PendingError() == true)
278 return false;
279
280 // Decompress the file so we can read it
699b209e 281 if (ExecCompressor(Best->CompressProg,&Proc,F.Fd(),Fd,false) == false)
b2e465d6
AL
282 return false;
283
284 return true;
285}
286 /*}}}*/
287// MultiCompress::CloseOld - Close the old file /*{{{*/
288// ---------------------------------------------------------------------
289/* */
3826564e 290bool MultiCompress::CloseOld(int Fd,pid_t Proc)
b2e465d6
AL
291{
292 close(Fd);
293 if (Proc != -1)
dc738e7a 294 if (ExecWait(Proc,_("decompressor"),false) == false)
b2e465d6
AL
295 return false;
296 return true;
297}
298 /*}}}*/
299// MultiCompress::Child - The writer child /*{{{*/
300// ---------------------------------------------------------------------
301/* The child process forks a bunch of compression children and takes
c6474fb6 302 input on FD and passes it to all the compressor child. On the way it
b2e465d6
AL
303 computes the MD5 of the raw data. After this the raw data in the
304 original files is compared to see if this data is new. If the data
305 is new then the temp files are renamed, otherwise they are erased. */
9209ec47 306bool MultiCompress::Child(int const &FD)
b2e465d6
AL
307{
308 // Start the compression children.
309 for (Files *I = Outputs; I != 0; I = I->Next)
310 {
699b209e
DK
311 if (ExecCompressor(I->CompressProg,&(I->CompressProc),I->TmpFile.Fd(),
312 I->Fd,true) == false)
b2e465d6
AL
313 return false;
314 }
315
316 /* Okay, now we just feed data from FD to all the other FDs. Also
317 stash a hash of the data to use later. */
318 SetNonBlock(FD,false);
319 unsigned char Buffer[32*1024];
650faab0 320 unsigned long long FileSize = 0;
b2e465d6
AL
321 MD5Summation MD5;
322 while (1)
323 {
324 WaitFd(FD,false);
325 int Res = read(FD,Buffer,sizeof(Buffer));
326 if (Res == 0)
327 break;
328 if (Res < 0)
329 continue;
330
331 MD5.Add(Buffer,Res);
332 FileSize += Res;
333 for (Files *I = Outputs; I != 0; I = I->Next)
334 {
335 if (write(I->Fd,Buffer,Res) != Res)
336 {
dc738e7a 337 _error->Errno("write",_("IO to subprocess/file failed"));
b2e465d6
AL
338 break;
339 }
340 }
341 }
342
343 // Close all the writers
344 for (Files *I = Outputs; I != 0; I = I->Next)
345 close(I->Fd);
346
347 // Wait for the compressors to exit
348 for (Files *I = Outputs; I != 0; I = I->Next)
349 {
350 if (I->CompressProc != -1)
03bef784 351 ExecWait(I->CompressProc, I->CompressProg.Binary.c_str(), false);
b2e465d6
AL
352 }
353
354 if (_error->PendingError() == true)
355 return false;
356
357 /* Now we have to copy the files over, or erase them if they
358 have not changed. First find the cheapest decompressor */
359 bool Missing = false;
360 for (Files *I = Outputs; I != 0; I = I->Next)
361 {
362 if (I->OldMTime == 0)
363 {
364 Missing = true;
365 break;
366 }
367 }
368
369 // Check the MD5 of the lowest cost entity.
370 while (Missing == false)
371 {
372 int CompFd = -1;
3826564e 373 pid_t Proc = -1;
b2e465d6
AL
374 if (OpenOld(CompFd,Proc) == false)
375 {
376 _error->Discard();
377 break;
378 }
379
380 // Compute the hash
381 MD5Summation OldMD5;
650faab0 382 unsigned long long NewFileSize = 0;
b2e465d6
AL
383 while (1)
384 {
385 int Res = read(CompFd,Buffer,sizeof(Buffer));
386 if (Res == 0)
387 break;
388 if (Res < 0)
dc738e7a 389 return _error->Errno("read",_("Failed to read while computing MD5"));
b2e465d6
AL
390 NewFileSize += Res;
391 OldMD5.Add(Buffer,Res);
392 }
393
394 // Tidy the compressor
395 if (CloseOld(CompFd,Proc) == false)
396 return false;
397
398 // Check the hash
399 if (OldMD5.Result() == MD5.Result() &&
400 FileSize == NewFileSize)
401 {
402 for (Files *I = Outputs; I != 0; I = I->Next)
403 {
404 I->TmpFile.Close();
405 if (unlink(I->TmpFile.Name().c_str()) != 0)
dc738e7a 406 _error->Errno("unlink",_("Problem unlinking %s"),
b2e465d6
AL
407 I->TmpFile.Name().c_str());
408 }
409 return !_error->PendingError();
410 }
411 break;
412 }
413
414 // Finalize
415 for (Files *I = Outputs; I != 0; I = I->Next)
416 {
417 // Set the correct file modes
418 fchmod(I->TmpFile.Fd(),Permissions);
419
420 if (rename(I->TmpFile.Name().c_str(),I->Output.c_str()) != 0)
dc738e7a 421 _error->Errno("rename",_("Failed to rename %s to %s"),
b2e465d6
AL
422 I->TmpFile.Name().c_str(),I->Output.c_str());
423 I->TmpFile.Close();
424 }
425
426 return !_error->PendingError();
427}
428 /*}}}*/
429