More or less working acquire system
authorArch Librarian <arch@canonical.com>
Mon, 20 Sep 2004 16:51:22 +0000 (16:51 +0000)
committerArch Librarian <arch@canonical.com>
Mon, 20 Sep 2004 16:51:22 +0000 (16:51 +0000)
Author: jgg
Date: 1998-11-09 01:09:19 GMT
More or less working acquire system

apt-pkg/acquire-item.cc
apt-pkg/acquire-item.h
apt-pkg/acquire-method.cc
apt-pkg/acquire-worker.cc
apt-pkg/acquire-worker.h
apt-pkg/acquire.cc
apt-pkg/acquire.h

index 9f9d082..ce8c9d4 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-item.cc,v 1.7 1998/11/05 07:21:35 jgg Exp $
+// $Id: acquire-item.cc,v 1.8 1998/11/09 01:09:19 jgg Exp $
 /* ######################################################################
 
    Acquire Item - Item to acquire
@@ -30,7 +30,8 @@
 // Acquire::Item::Item - Constructor                                   /*{{{*/
 // ---------------------------------------------------------------------
 /* */
-pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), QueueCounter(0)
+pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), FileSize(0),
+                       Complete(false), QueueCounter(0)
 {
    Owner->Add(this);
    Status = StatIdle;
@@ -59,6 +60,16 @@ void pkgAcquire::Item::Failed(string Message)
    }   
 }
                                                                        /*}}}*/
+// Acquire::Item::Start - Item has begun to download                   /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Item::Start(string Message,unsigned long Size)
+{
+   Status = StatFetching;
+   if (FileSize == 0 && Complete == false)
+      FileSize = Size;
+}
+                                                                       /*}}}*/
 // Acquire::Item::Done - Item downloaded OK                            /*{{{*/
 // ---------------------------------------------------------------------
 /* */
@@ -98,8 +109,19 @@ pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location)
    
    DestFile = _config->FindDir("Dir::State::lists") + "partial/";
    DestFile += URItoFileName(Location->PackagesURI());
-   
-   QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo());
+
+   // Create the item 
+   Desc.URI = Location->PackagesURI() + ".gz";
+   Desc.Description = Location->PackagesInfo();
+   Desc.Owner = this;
+
+   // Set the short description to the archive component
+   if (Location->Dist[Location->Dist.size() - 1] == '/')
+      Desc.ShortDesc = Location->Dist;
+   else
+      Desc.ShortDesc = Location->Dist + '/' + Location->Section;  
+      
+   QueueURI(Desc);
    
    // Create the Release fetch class
    new pkgAcqIndexRel(Owner,Location);
@@ -149,6 +171,7 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
    }
 
    Erase = false;
+   Complete = true;
    
    // Handle the unzipd case
    string FileName = LookupTag(Message,"Alt-Filename");
@@ -159,8 +182,10 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
         return;
       
       Decompression = true;
+      FileSize = 0;
       DestFile += ".decomp";
-      QueueURI("copy:" + FileName,string());
+      Desc.URI = "copy:" + FileName;
+      QueueURI(Desc);
       return;
    }
 
@@ -177,10 +202,13 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
 
    if (FileName == DestFile)
       Erase = true;
+   else
+      FileSize = 0;
    
    Decompression = true;
    DestFile += ".decomp";
-   QueueURI("gzip:" + FileName,string());
+   Desc.URI = "gzip:" + FileName,Location->PackagesInfo();
+   QueueURI(Desc);
 }
                                                                        /*}}}*/
 
@@ -194,7 +222,18 @@ pkgAcqIndexRel::pkgAcqIndexRel(pkgAcquire *Owner,
    DestFile = _config->FindDir("Dir::State::lists") + "partial/";
    DestFile += URItoFileName(Location->ReleaseURI());
    
-   QueueURI(Location->ReleaseURI(),Location->ReleaseInfo());
+   // Create the item
+   Desc.URI = Location->ReleaseURI();
+   Desc.Description = Location->ReleaseInfo();
+   Desc.Owner = this;
+
+   // Set the short description to the archive component
+   if (Location->Dist[Location->Dist.size() - 1] == '/')
+      Desc.ShortDesc = Location->Dist;
+   else
+      Desc.ShortDesc = Location->Dist + '/' + Location->Section;  
+      
+   QueueURI(Desc);
 }
                                                                        /*}}}*/
 // AcqIndexRel::Custom600Headers - Insert custom request headers       /*{{{*/
@@ -229,6 +268,8 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5)
       return;
    }
 
+   Complete = true;
+   
    // The files timestamp matches
    if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true)
       return;
@@ -236,7 +277,9 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5)
    // We have to copy it into place
    if (FileName != DestFile)
    {
-      QueueURI("copy:" + FileName,string());
+      FileSize = 0;
+      Desc.URI = "copy:" + FileName;
+      QueueURI(Desc);
       return;
    }
    
index 9ead29c..d50efc0 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-item.h,v 1.5 1998/11/05 07:21:36 jgg Exp $
+// $Id: acquire-item.h,v 1.6 1998/11/09 01:09:21 jgg Exp $
 /* ######################################################################
 
    Acquire Item - Item to acquire
@@ -30,8 +30,8 @@ class pkgAcquire::Item
    protected:
    
    pkgAcquire *Owner;
-   inline void QueueURI(string URI,string Description) 
-                 {Owner->Enqueue(this,URI,Description);};
+   inline void QueueURI(ItemDesc &Item)
+                 {Owner->Enqueue(Item);};
    
    void Rename(string From,string To);
    
@@ -40,6 +40,8 @@ class pkgAcquire::Item
    // State of the item
    enum {StatIdle, StatFetching, StatDone, StatError} Status;
    string ErrorText;
+   unsigned long FileSize;
+   bool Complete;
    
    // Number of queues we are inserted into
    unsigned int QueueCounter;
@@ -49,6 +51,7 @@ class pkgAcquire::Item
    
    virtual void Failed(string Message);
    virtual void Done(string Message,unsigned long Size,string Md5Hash);
+   virtual void Start(string Message,unsigned long Size);
 
    virtual string Custom600Headers() {return string();};
       
@@ -64,6 +67,7 @@ class pkgAcqIndex : public pkgAcquire::Item
    const pkgSourceList::Item *Location;
    bool Decompression;
    bool Erase;
+   pkgAcquire::ItemDesc Desc;
    
    public:
    
@@ -79,6 +83,7 @@ class pkgAcqIndexRel : public pkgAcquire::Item
    protected:
    
    const pkgSourceList::Item *Location;
+   pkgAcquire::ItemDesc Desc;
    
    public:
    
index 6810159..f1a2477 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-method.cc,v 1.4 1998/11/05 07:21:38 jgg Exp $
+// $Id: acquire-method.cc,v 1.5 1998/11/09 01:09:22 jgg Exp $
 /* ######################################################################
 
    Acquire Method
@@ -314,7 +314,7 @@ void pkgAcqMethod::Status(const char *Format,...)
 
    // sprintf the description
    char S[1024];
-   unsigned int Len = snprintf(S,sizeof(S),"101 Status\nURI: %s\n"
+   unsigned int Len = snprintf(S,sizeof(S),"102 Status\nURI: %s\n"
                               "Message: ",CurrentURI.c_str());
 
    vsnprintf(S+Len,sizeof(S)-Len,Format,args);
index 2cbab77..718944d 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-worker.cc,v 1.10 1998/11/05 07:21:39 jgg Exp $
+// $Id: acquire-worker.cc,v 1.11 1998/11/09 01:09:23 jgg Exp $
 /* ######################################################################
 
    Acquire Worker 
@@ -22,6 +22,7 @@
 #include <apt-pkg/fileutl.h>
 #include <strutl.h>
 
+#include <sys/stat.h>
 #include <unistd.h>
 #include <signal.h>
 #include <wait.h>
@@ -30,7 +31,8 @@
 // Worker::Worker - Constructor for Queue startup                      /*{{{*/
 // ---------------------------------------------------------------------
 /* */
-pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf)
+pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
+                          pkgAcquireStatus *Log) : Log(Log)
 {
    OwnerQ = Q;
    Config = Cnf;
@@ -221,10 +223,15 @@ bool pkgAcquire::Worker::RunMessages()
               _error->Error("Method gave invalid 200 URI Start message");
               break;
            }
+           
            CurrentItem = Itm;
            CurrentSize = 0;
            TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
+           Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
            
+           if (Log != 0)
+              Log->Fetch(*Itm);
+
            break;
         }
         
@@ -238,9 +245,21 @@ bool pkgAcquire::Worker::RunMessages()
            }
 
            pkgAcquire::Item *Owner = Itm->Owner;
+           pkgAcquire::ItemDesc Desc = *Itm;
            OwnerQ->ItemDone(Itm);
            Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
                                          LookupTag(Message,"MD5-Hash"));
+           ItemDone();
+           
+           // Log that we are done
+           if (Log != 0)
+           {
+              if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
+                  StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
+                 Log->IMSHit(Desc);
+              else
+                 Log->Done(Desc);
+           }       
            break;
         }       
         
@@ -254,8 +273,14 @@ bool pkgAcquire::Worker::RunMessages()
            }
 
            pkgAcquire::Item *Owner = Itm->Owner;
+           pkgAcquire::ItemDesc Desc = *Itm;
            OwnerQ->ItemDone(Itm);
            Owner->Failed(Message);
+           ItemDone();
+           
+           if (Log != 0)
+              Log->Fail(Desc);
+           
            break;
         }       
         
@@ -419,3 +444,29 @@ bool pkgAcquire::Worker::MethodFailure()
    return false;
 }
                                                                        /*}}}*/
+// Worker::Pulse - Called periodically                                         /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Worker::Pulse()
+{
+   if (CurrentItem == 0)
+      return;
+
+   
+   struct stat Buf;
+   if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
+      return;
+   CurrentSize = Buf.st_size;
+}
+                                                                       /*}}}*/
+// Worker::ItemDone - Called when the current item is finished         /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Worker::ItemDone()
+{
+   CurrentItem = 0;
+   CurrentSize = 0;
+   TotalSize = 0;
+   Status = string();
+}
+                                                                       /*}}}*/
index ad1ea91..95ba340 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-worker.h,v 1.6 1998/10/30 07:53:36 jgg Exp $
+// $Id: acquire-worker.h,v 1.7 1998/11/09 01:09:24 jgg Exp $
 /* ######################################################################
 
    Acquire Worker - Worker process manager
@@ -33,6 +33,7 @@ class pkgAcquire::Worker
    
    // The access association
    Queue *OwnerQ;
+   pkgAcquireStatus *Log;
    MethodConfig *Config;
    string Access;
       
@@ -62,6 +63,7 @@ class pkgAcquire::Worker
    bool SendConfiguration();
 
    bool MethodFailure();
+   void ItemDone();
    
    public:
    
@@ -70,12 +72,13 @@ class pkgAcquire::Worker
    string Status;
    unsigned long CurrentSize;
    unsigned long TotalSize;
-
+      
    // Load the method and do the startup 
    bool QueueItem(pkgAcquire::Queue::QItem *Item);
-   bool Start();   
+   bool Start();
+   void Pulse();
    
-   Worker(Queue *OwnerQ,MethodConfig *Config);
+   Worker(Queue *OwnerQ,MethodConfig *Config,pkgAcquireStatus *Log);
    Worker(MethodConfig *Config);
    ~Worker();
 };
index ed64c5e..e5972d5 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire.cc,v 1.9 1998/11/06 02:52:20 jgg Exp $
+// $Id: acquire.cc,v 1.10 1998/11/09 01:09:25 jgg Exp $
 /* ######################################################################
 
    Acquire - File Acquiration
 #include <apt-pkg/configuration.h>
 #include <apt-pkg/error.h>
 #include <strutl.h>
+
+#include <sys/time.h>
                                                                        /*}}}*/
 
 // Acquire::pkgAcquire - Constructor                                   /*{{{*/
 // ---------------------------------------------------------------------
 /* We grab some runtime state from the configuration space */
-pkgAcquire::pkgAcquire()
+pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
 {
    Queues = 0;
    Configs = 0;
@@ -85,7 +87,7 @@ void pkgAcquire::Remove(Item *Itm)
    {
       if (*I == Itm)
         Items.erase(I);
-   }   
+   }
 }
                                                                        /*}}}*/
 // Acquire::Add - Add a worker                                         /*{{{*/
@@ -124,10 +126,10 @@ void pkgAcquire::Remove(Worker *Work)
    it is construction which creates a queue (based on the current queue
    mode) and puts the item in that queue. If the system is running then
    the queue might be started. */
-void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
+void pkgAcquire::Enqueue(ItemDesc &Item)
 {
    // Determine which queue to put the item in
-   string Name = QueueName(URI);
+   string Name = QueueName(Item.URI);
    if (Name.empty() == true)
       return;
 
@@ -144,18 +146,18 @@ void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
         I->Startup();
    }
 
-   Itm->Status = Item::StatIdle;
+   Item.Owner->Status = Item::StatIdle;
    
    // Queue it into the named queue
-   I->Enqueue(Itm,URI,Description);
+   I->Enqueue(Item);
    ToFetch++;
          
    // Some trace stuff
    if (Debug == true)
    {
-      clog << "Fetching " << URI << endl;
-      clog << " to " << Itm->DestFile << endl;
-      clog << " Queue is: " << QueueName(URI) << endl;
+      clog << "Fetching " << Item.URI << endl;
+      clog << " to " << Item.Owner->DestFile << endl;
+      clog << " Queue is: " << QueueName(Item.URI) << endl;
    }
 }
                                                                        /*}}}*/
@@ -275,6 +277,9 @@ bool pkgAcquire::Run()
       I->Startup();
    
    // Run till all things have been acquired
+   struct timeval tv;
+   tv.tv_sec = 0;
+   tv.tv_usec = 500000; 
    while (ToFetch > 0)
    {
       fd_set RFds;
@@ -284,15 +289,26 @@ bool pkgAcquire::Run()
       FD_ZERO(&WFds);
       SetFds(Highest,&RFds,&WFds);
       
-      if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
+      int Res = select(Highest+1,&RFds,&WFds,0,&tv);
+      if (Res < 0)
       {
-        Running = false;
-        return _error->Errno("select","Select has failed");
+        _error->Errno("select","Select has failed");
+        break;
       }
             
       RunFds(&RFds,&WFds);
       if (_error->PendingError() == true)
         break;
+      
+      // Timeout, notify the log class
+      if (Res == 0 || (Log != 0 && Log->Update == true))
+      {
+        tv.tv_usec = 500000;
+        for (Worker *I = Workers; I != 0; I = I->NextAcquire)
+           I->Pulse();
+        if (Log != 0)
+           Log->Pulse(this);
+      }      
    }   
 
    // Shut down the acquire bits
@@ -313,6 +329,14 @@ void pkgAcquire::Bump()
       I->Bump();
 }
                                                                        /*}}}*/
+// Acquire::WorkerStep - Step to the next worker                       /*{{{*/
+// ---------------------------------------------------------------------
+/* Not inlined to advoid including acquire-worker.h */
+pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
+{
+   return I->NextAcquire;
+};
+                                                                       /*}}}*/
 
 // Acquire::MethodConfig::MethodConfig - Constructor                   /*{{{*/
 // ---------------------------------------------------------------------
@@ -356,19 +380,15 @@ pkgAcquire::Queue::~Queue()
 // Queue::Enqueue - Queue an item to the queue                         /*{{{*/
 // ---------------------------------------------------------------------
 /* */
-void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
+void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
 {
    // Create a new item
-   QItem *I = new QItem;   
+   QItem *I = new QItem;
    I->Next = Items;
    Items = I;
+   *I = Item;
    
-   // Fill it in
-   Items->Owner = Owner;
-   Items->URI = URI;
-   Items->Description = Description;
-   Owner->QueueCounter++;
-   
+   Item.Owner->QueueCounter++;   
    if (Items->Next == 0)
       Cycle();
 }
@@ -410,7 +430,7 @@ bool pkgAcquire::Queue::Startup()
    if (Cnf == 0)
       return false;
    
-   Workers = new Worker(this,Cnf);
+   Workers = new Worker(this,Cnf,Owner->Log);
    Owner->Add(Workers);
    if (Workers->Start() == false)
       return false;
index a4ea45b..0d8803c 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire.h,v 1.8 1998/11/05 07:21:41 jgg Exp $
+// $Id: acquire.h,v 1.9 1998/11/09 01:09:26 jgg Exp $
 /* ######################################################################
 
    Acquire - File Acquiration
@@ -41,6 +41,7 @@
 
 #include <unistd.h>
 
+class pkgAcquireStatus;
 class pkgAcquire
 {   
    public:
@@ -49,6 +50,7 @@ class pkgAcquire
    class Queue;
    class Worker;
    struct MethodConfig;
+   struct ItemDesc;
    friend Item;
    friend Queue;
    
@@ -61,8 +63,9 @@ class pkgAcquire
    Queue *Queues;
    Worker *Workers;
    MethodConfig *Configs;
+   pkgAcquireStatus *Log;
    unsigned long ToFetch;
-   
+
    // Configurable parameters for the schedular
    enum {QueueHost,QueueAccess} QueueMode;
    bool Debug;
@@ -73,7 +76,7 @@ class pkgAcquire
    void Add(Worker *Work);
    void Remove(Worker *Work);
    
-   void Enqueue(Item *Item,string URI,string Description);
+   void Enqueue(ItemDesc &Item);
    void Dequeue(Item *Item);
    string QueueName(string URI);
 
@@ -88,11 +91,26 @@ class pkgAcquire
 
    MethodConfig *GetConfig(string Access);
    bool Run();
+
+   // Simple iteration mechanism
+   inline Worker *WorkersBegin() {return Workers;};
+   Worker *WorkerStep(Worker *I);
+   inline Item **ItemsBegin() {return Items.begin();};
+   inline Item **ItemsEnd() {return Items.end();};
    
-   pkgAcquire();
+   pkgAcquire(pkgAcquireStatus *Log = 0);
    ~pkgAcquire();
 };
 
+// Description of an Item+URI
+struct pkgAcquire::ItemDesc
+{
+   string URI;
+   string Description;
+   string ShortDesc;
+   Item *Owner;
+};
+
 // List of possible items queued for download.
 class pkgAcquire::Queue
 {
@@ -102,15 +120,19 @@ class pkgAcquire::Queue
    protected:
 
    // Queued item
-   struct QItem 
+   struct QItem : pkgAcquire::ItemDesc
    {
-      QItem *Next;
-      
-      string URI;
-      string Description;
-      Item *Owner;
+      QItem *Next;      
       pkgAcquire::Worker *Worker;
-   };   
+      
+      void operator =(pkgAcquire::ItemDesc const &I)
+      {
+        URI = I.URI;
+        Description = I.Description;
+        ShortDesc = I.ShortDesc;
+        Owner = I.Owner;
+      };
+   };
    
    // Name of the queue
    string Name;
@@ -123,11 +145,12 @@ class pkgAcquire::Queue
    public:
    
    // Put an item into this queue
-   void Enqueue(Item *Owner,string URI,string Description);
+   void Enqueue(ItemDesc &Item);
    bool Dequeue(Item *Owner);
 
    // Find a Queued item
    QItem *FindItem(string URI,pkgAcquire::Worker *Owner);
+   bool ItemStart(QItem *Itm,unsigned long Size);
    bool ItemDone(QItem *Itm);
    
    bool Startup();
@@ -155,4 +178,21 @@ struct pkgAcquire::MethodConfig
    MethodConfig();
 };
 
+class pkgAcquireStatus
+{
+   public:
+
+   bool Update;
+   
+   // Each of these is called by the workers when an event occures
+   virtual void IMSHit(pkgAcquire::ItemDesc &Itm) {};
+   virtual void Fetch(pkgAcquire::ItemDesc &Itm) {};
+   virtual void Done(pkgAcquire::ItemDesc &Itm) {};
+   virtual void Fail(pkgAcquire::ItemDesc &Itm) {};   
+   virtual void Pulse(pkgAcquire *Owner) {};
+   
+   pkgAcquireStatus() : Update(false) {};
+   virtual ~pkgAcquireStatus() {};
+};
+
 #endif