Sync
authorArch Librarian <arch@canonical.com>
Mon, 20 Sep 2004 16:51:09 +0000 (16:51 +0000)
committerArch Librarian <arch@canonical.com>
Mon, 20 Sep 2004 16:51:09 +0000 (16:51 +0000)
Author: jgg
Date: 1998-10-22 04:56:38 GMT
Sync

14 files changed:
apt-pkg/acquire-item.cc
apt-pkg/acquire-item.h
apt-pkg/acquire-worker.cc
apt-pkg/acquire-worker.h
apt-pkg/acquire.cc
apt-pkg/acquire.h
apt-pkg/contrib/configuration.cc
apt-pkg/contrib/configuration.h
apt-pkg/contrib/fileutl.cc
apt-pkg/contrib/strutl.cc
apt-pkg/contrib/strutl.h
doc/examples/apt.conf
methods/file.cc
test/scratch.cc

index e92b611..e1049dd 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-item.cc,v 1.2 1998/10/20 02:39:12 jgg Exp $
+// $Id: acquire-item.cc,v 1.3 1998/10/22 04:56:38 jgg Exp $
 /* ######################################################################
 
    Acquire Item - Item to acquire
@@ -19,6 +19,9 @@
 #include <apt-pkg/acquire-item.h>
 #include <apt-pkg/configuration.h>
 #include <strutl.h>
+
+#include <sys/stat.h>
+#include <unistd.h>
                                                                        /*}}}*/
 
 // Acquire::Item::Item - Constructor                                   /*{{{*/
@@ -45,23 +48,30 @@ pkgAcquire::Item::~Item()
 pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location) :
              Item(Owner), Location(Location)
 {
-   QueueURI(Location->PackagesURI() + ".gz");
-   Description = Location->PackagesInfo();
+   DestFile = _config->FindDir("Dir::State::lists") + "partial/";
+   DestFile += URItoFileName(Location->PackagesURI());
+   
+   QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo());
    
+   // Create the Release fetch class
    new pkgAcqIndexRel(Owner,Location);
 }
                                                                        /*}}}*/
-// pkgAcqIndex::ToFile - File to write the download to                 /*{{{*/
+// AcqIndex::Custom600Headers - Insert custom request headers          /*{{{*/
 // ---------------------------------------------------------------------
-/* */
-string pkgAcqIndex::ToFile()
+/* The only header we use is the last-modified header. */
+string pkgAcqIndex::Custom600Headers()
 {
-   string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/";
+   string Final = _config->FindDir("Dir::State::lists");
+   Final += URItoFileName(Location->PackagesURI());
+   
+   struct stat Buf;
+   if (stat(Final.c_str(),&Buf) != 0)
+      return string();
    
-   return PartialDir + URItoFileName(Location->PackagesURI());
+   return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime);
 }
                                                                        /*}}}*/
-
 // AcqIndexRel::pkgAcqIndexRel - Constructor                           /*{{{*/
 // ---------------------------------------------------------------------
 /* The Release file is added to the queue */
@@ -69,17 +79,24 @@ pkgAcqIndexRel::pkgAcqIndexRel(pkgAcquire *Owner,
                               const pkgSourceList::Item *Location) :
                 Item(Owner), Location(Location)
 {
-   QueueURI(Location->ReleaseURI());
-   Description = Location->ReleaseInfo();
+   DestFile = _config->FindDir("Dir::State::lists") + "partial/";
+   DestFile += URItoFileName(Location->ReleaseURI());
+   
+   QueueURI(Location->ReleaseURI(),Location->ReleaseInfo());
 }
                                                                        /*}}}*/
-// AcqIndexRel::ToFile - File to write the download to                 /*{{{*/
+// AcqIndexRel::Custom600Headers - Insert custom request headers       /*{{{*/
 // ---------------------------------------------------------------------
-/* */
-string pkgAcqIndexRel::ToFile()
+/* The only header we use is the last-modified header. */
+string pkgAcqIndexRel::Custom600Headers()
 {
-   string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/";
+   string Final = _config->FindDir("Dir::State::lists");
+   Final += URItoFileName(Location->ReleaseURI());
+   
+   struct stat Buf;
+   if (stat(Final.c_str(),&Buf) != 0)
+      return string();
    
-   return PartialDir + URItoFileName(Location->ReleaseURI());
+   return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime);
 }
                                                                        /*}}}*/
index 6ab8859..8b2d6e9 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-item.h,v 1.1 1998/10/15 06:59:59 jgg Exp $
+// $Id: acquire-item.h,v 1.2 1998/10/22 04:56:39 jgg Exp $
 /* ######################################################################
 
    Acquire Item - Item to acquire
@@ -30,16 +30,20 @@ class pkgAcquire::Item
    protected:
    
    pkgAcquire *Owner;
-   inline void QueueURI(string URI) {Owner->Enqueue(this,URI);};
+   inline void QueueURI(string URI,string Description) 
+                 {Owner->Enqueue(this,URI,Description);};
    
    public:
 
+   // Number of queues we are inserted into
    unsigned int QueueCounter;
-   string Description;
    
-   virtual string ToFile() = 0;
-   virtual void Failed() {};
+   // File to write the fetch into
+   string DestFile;
    
+   virtual void Failed() {};
+   virtual string Custom600Headers() {return string();};
+      
    Item(pkgAcquire *Owner);
    virtual ~Item();
 };
@@ -53,7 +57,7 @@ class pkgAcqIndex : public pkgAcquire::Item
    
    public:
    
-   virtual string ToFile();
+   virtual string Custom600Headers();
 
    pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location);
 };
@@ -67,10 +71,9 @@ class pkgAcqIndexRel : public pkgAcquire::Item
    
    public:
    
-   virtual string ToFile();
-
+   virtual string Custom600Headers();
+   
    pkgAcqIndexRel(pkgAcquire *Owner,const pkgSourceList::Item *Location);
 };
 
-
 #endif
index 688c5e2..756b309 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-worker.cc,v 1.3 1998/10/20 04:33:12 jgg Exp $
+// $Id: acquire-worker.cc,v 1.4 1998/10/22 04:56:40 jgg Exp $
 /* ######################################################################
 
    Acquire Worker 
@@ -16,6 +16,7 @@
 #pragma implementation "apt-pkg/acquire-worker.h"
 #endif
 #include <apt-pkg/acquire-worker.h>
+#include <apt-pkg/acquire-item.h>
 #include <apt-pkg/configuration.h>
 #include <apt-pkg/error.h>
 #include <apt-pkg/fileutl.h>
 // Worker::Worker - Constructor for Queue startup                      /*{{{*/
 // ---------------------------------------------------------------------
 /* */
-pkgAcquire::Worker::Worker(Queue *Q,string Acc)
+pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf)
 {
    OwnerQ = Q;
-   Config = 0;
-   Access = Acc;
+   Config = Cnf;
+   Access = Cnf->Access;
+   CurrentItem = 0;
 
    Construct();   
 }
@@ -46,7 +48,8 @@ pkgAcquire::Worker::Worker(MethodConfig *Cnf)
    OwnerQ = 0;
    Config = Cnf;
    Access = Cnf->Access;
-
+   CurrentItem = 0;
+   
    Construct();   
 }
                                                                        /*}}}*/
@@ -55,10 +58,13 @@ pkgAcquire::Worker::Worker(MethodConfig *Cnf)
 /* */
 void pkgAcquire::Worker::Construct()
 {
-   Next = 0;
+   NextQueue = 0;
+   NextAcquire = 0;
    Process = -1;
    InFd = -1;
    OutFd = -1;
+   OutReady = false;
+   InReady = false;
    Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
 }
                                                                        /*}}}*/
@@ -71,7 +77,11 @@ pkgAcquire::Worker::~Worker()
    close(OutFd);
    
    if (Process > 0)
+   {
       kill(Process,SIGINT);
+      if (waitpid(Process,0,0) != Process)
+        _error->Warning("I waited but nothing was there!");
+   }   
 }
                                                                        /*}}}*/
 // Worker::Start - Start the worker process                            /*{{{*/
@@ -133,6 +143,8 @@ bool pkgAcquire::Worker::Start()
    SetNonBlock(Pipes[3],true);
    close(Pipes[1]);
    close(Pipes[2]);
+   OutReady = false;
+   InReady = true;
    
    // Read the configuration data
    if (WaitFd(InFd) == false ||
@@ -140,70 +152,18 @@ bool pkgAcquire::Worker::Start()
       return _error->Error("Method %s did not start correctly",Method.c_str());
 
    RunMessages();
+   SendConfiguration();
    
    return true;
 }
                                                                        /*}}}*/
 // Worker::ReadMessages - Read all pending messages into the list      /*{{{*/
 // ---------------------------------------------------------------------
-/* This pulls full messages from the input FD into the message buffer. 
-   It assumes that messages will not pause during transit so no
-   fancy buffering is used. */
+/* */
 bool pkgAcquire::Worker::ReadMessages()
 {
-   char Buffer[4000];
-   char *End = Buffer;
-   
-   while (1)
-   {
-      int Res = read(InFd,End,sizeof(Buffer) - (End-Buffer));
-      
-      // Process is dead, this is kind of bad..
-      if (Res == 0)
-      {
-        if (waitpid(Process,0,0) != Process)
-           _error->Warning("I waited but nothing was there!");
-        Process = -1;
-        close(InFd);
-        close(OutFd);
-        InFd = -1;
-        OutFd = -1;
-        return false;
-      }
-      
-      // No data
-      if (Res == -1)
-        return true;
-      
-      End += Res;
-      
-      // Look for the end of the message
-      for (char *I = Buffer; I < End; I++)
-      {
-        if (I[0] != '\n' || I[1] != '\n')
-           continue;
-        
-        // Pull the message out
-        string Message(Buffer,0,I-Buffer);
-
-        // Fix up the buffer
-        for (; I < End && *I == '\n'; I++);
-        End -= I-Buffer;        
-        memmove(Buffer,I,End-Buffer);
-        I = Buffer;
-
-        if (Debug == true)
-           clog << "Message " << Access << ':' << QuoteString(Message,"\n") << endl;
-        
-        MessageQueue.push_back(Message);
-      }
-      if (End == Buffer)
-        return true;
-
-      if (WaitFd(InFd) == false)
-        return false;
-   }
-   
+   if (::ReadMessages(InFd,MessageQueue) == false)
+      return MethodFailure();
    return true;
 }
                                                                        /*}}}*/
@@ -218,6 +178,9 @@ bool pkgAcquire::Worker::RunMessages()
    {
       string Message = MessageQueue.front();
       MessageQueue.erase(MessageQueue.begin());
+
+      if (Debug == true)
+        clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
       
       // Fetch the message number
       char *End;
@@ -228,10 +191,39 @@ bool pkgAcquire::Worker::RunMessages()
       // Determine the message number and dispatch
       switch (Number)
       {
+        // 100 Capabilities
         case 100:
         if (Capabilities(Message) == false)
            return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
         break;
+        
+        // 101 Log
+        case 101:
+        if (Debug == true)
+           clog << " <- (log) " << LookupTag(Message,"Message") << endl;
+        break;
+        
+        // 102 Status
+        case 102:
+        Status = LookupTag(Message,"Message");
+        break;
+           
+        // 200 URI Start
+        case 200:
+        break;
+        
+        // 201 URI Done
+        case 201:
+        break;
+        
+        // 400 URI Failure
+        case 400:
+        break;
+        
+        // 401 General Failure
+        case 401:
+        _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
+        break;
       }      
    }
    return true;
@@ -249,15 +241,142 @@ bool pkgAcquire::Worker::Capabilities(string Message)
    Config->Version = LookupTag(Message,"Version");
    Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
    Config->PreScan = StringToBool(LookupTag(Message,"Pre-Scan"),false);
+   Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
+   Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
 
    // Some debug text
    if (Debug == true)
    {
       clog << "Configured access method " << Config->Access << endl;
-      clog << "Version: " << Config->Version << " SingleInstance: " << 
-        Config->SingleInstance << " PreScan: " << Config->PreScan << endl;
+      clog << "Version:" << Config->Version << " SingleInstance:" <<
+        Config->SingleInstance << " PreScan: " << Config->PreScan <<
+        " Pipeline:" << Config->Pipeline << " SendConfig:" << 
+        Config->SendConfig << endl;
    }
    
    return true;
 }
                                                                        /*}}}*/
+// Worker::SendConfiguration - Send the config to the method           /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::SendConfiguration()
+{
+   if (Config->SendConfig == false)
+      return true;
+
+   if (OutFd == -1)
+      return false;
+   
+   string Message = "601 Configuration\n";
+   Message.reserve(2000);
+
+   /* Write out all of the configuration directives by walking the 
+      configuration tree */
+   const Configuration::Item *Top = _config->Tree(0);
+   for (; Top != 0;)
+   {
+      if (Top->Value.empty() == false)
+      {
+        string Line = "Config-Item: " + Top->FullTag() + "=";
+        Line += QuoteString(Top->Value,"\n") + '\n';
+        Message += Line;
+      }
+      
+      if (Top->Child != 0)
+      {
+        Top = Top->Child;
+        continue;
+      }
+      
+      while (Top != 0 && Top->Next == 0)
+        Top = Top->Parent;
+      if (Top != 0)
+        Top = Top->Next;
+   }   
+   Message += '\n';
+
+   if (Debug == true)
+      clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
+   OutQueue += Message;
+   OutReady = true; 
+   
+   return true;
+}
+                                                                       /*}}}*/
+// Worker::QueueItem - Add an item to the outbound queue               /*{{{*/
+// ---------------------------------------------------------------------
+/* Send a URI Acquire message to the method */
+bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
+{
+   if (OutFd == -1)
+      return false;
+   
+   string Message = "600 URI Acquire\n";
+   Message.reserve(300);
+   Message += "URI: " + Item->URI;
+   Message += "\nFilename: " + Item->Owner->DestFile;
+   Message += Item->Owner->Custom600Headers();
+   Message += "\n\n";
+   
+   if (Debug == true)
+      clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
+   OutQueue += Message;
+   OutReady = true;
+   
+   return true;
+}
+                                                                       /*}}}*/
+// Worker::OutFdRead - Out bound FD is ready                           /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::OutFdReady()
+{
+   int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
+   if (Res <= 0)
+      return MethodFailure();
+
+   // Hmm.. this should never happen.
+   if (Res < 0)
+      return true;
+   
+   OutQueue.erase(0,Res);
+   if (OutQueue.empty() == true)
+      OutReady = false;
+   
+   return true;
+}
+                                                                       /*}}}*/
+// Worker::InFdRead - In bound FD is ready                             /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::InFdReady()
+{
+   if (ReadMessages() == false)
+      return false;
+   RunMessages();
+   return true;
+}
+                                                                       /*}}}*/
+// Worker::MethodFailure - Called when the method fails                        /*{{{*/
+// ---------------------------------------------------------------------
+/* This is called when the method is belived to have failed, probably because
+   read returned -1. */
+bool pkgAcquire::Worker::MethodFailure()
+{
+   cerr << "Method " << Access << " has died unexpectedly!" << endl;
+   if (waitpid(Process,0,0) != Process)
+      _error->Warning("I waited but nothing was there!");
+   Process = -1;
+   close(InFd);
+   close(OutFd);
+   InFd = -1;
+   OutFd = -1;
+   OutReady = false;
+   InReady = false;
+   OutQueue = string();
+   MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
+   
+   return false;
+}
+                                                                       /*}}}*/
index 2807237..d128ec8 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire-worker.h,v 1.2 1998/10/20 02:39:14 jgg Exp $
+// $Id: acquire-worker.h,v 1.3 1998/10/22 04:56:42 jgg Exp $
 /* ######################################################################
 
    Acquire Worker - Worker process manager
 // Interfacing to the method process
 class pkgAcquire::Worker
 {
+   friend pkgAcquire;
+   
    protected:
    friend Queue;
 
-   Worker *Next;
+   /* Linked list starting at a Queue and a linked list starting
+      at Acquire */
+   Worker *NextQueue;
+   Worker *NextAcquire;
    
    // The access association
    Queue *OwnerQ;
@@ -35,27 +40,40 @@ class pkgAcquire::Worker
    pid_t Process;
    int InFd;
    int OutFd;
+   bool InReady;
+   bool OutReady;
    
    // Various internal things
    bool Debug;
    vector<string> MessageQueue;
-
+   string OutQueue;
+   
    // Private constructor helper
    void Construct();
    
    // Message handling things
    bool ReadMessages();
    bool RunMessages();
+   bool InFdReady();
+   bool OutFdReady();
    
    // The message handlers
    bool Capabilities(string Message);
+   bool SendConfiguration();
+
+   bool MethodFailure();
    
    public:
    
+   pkgAcquire::Queue::QItem *CurrentItem;
+   
+   string Status;
+   
    // Load the method and do the startup 
+   bool QueueItem(pkgAcquire::Queue::QItem *Item);
    bool Start();   
    
-   Worker(Queue *OwnerQ,string Access);
+   Worker(Queue *OwnerQ,MethodConfig *Config);
    Worker(MethodConfig *Config);
    ~Worker();
 };
index ad5016b..80fee9a 100644 (file)
@@ -1,10 +1,15 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire.cc,v 1.2 1998/10/20 02:39:15 jgg Exp $
+// $Id: acquire.cc,v 1.3 1998/10/22 04:56:43 jgg Exp $
 /* ######################################################################
 
    Acquire - File Acquiration
 
+   The core element for the schedual system is the concept of a named
+   queue. Each queue is unique and each queue has a name derived from the
+   URI. The degree of paralization can be controled by how the queue
+   name is derived from the URI.
+   
    ##################################################################### */
                                                                        /*}}}*/
 // Include Files                                                       /*{{{*/
@@ -14,6 +19,8 @@
 #include <apt-pkg/acquire.h>
 #include <apt-pkg/acquire-item.h>
 #include <apt-pkg/acquire-worker.h>
+#include <apt-pkg/configuration.h>
+#include <apt-pkg/error.h>
 #include <strutl.h>
                                                                        /*}}}*/
 
@@ -24,6 +31,16 @@ pkgAcquire::pkgAcquire()
 {
    Queues = 0;
    Configs = 0;
+   Workers = 0;
+   ToFetch = 0;
+   
+   string Mode = _config->Find("Acquire::Queue-Mode","host");
+   if (strcasecmp(Mode.c_str(),"host") == 0)
+      QueueMode = QueueHost;
+   if (strcasecmp(Mode.c_str(),"access") == 0)
+      QueueMode = QueueAccess;   
+
+   Debug = _config->FindB("Debug::pkgAcquire",false);
 }
                                                                        /*}}}*/
 // Acquire::~pkgAcquire        - Destructor                                    /*{{{*/
@@ -40,6 +57,13 @@ pkgAcquire::~pkgAcquire()
       Configs = Configs->Next;
       delete Jnk;
    }   
+
+   while (Queues != 0)
+   {
+      Queue *Jnk = Queues;
+      Queues = Queues->Next;
+      delete Jnk;
+   }   
 }
                                                                        /*}}}*/
 // Acquire::Add - Add a new item                                       /*{{{*/
@@ -62,23 +86,97 @@ void pkgAcquire::Remove(Item *Itm)
    }   
 }
                                                                        /*}}}*/
+// Acquire::Add - Add a worker                                         /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Add(Worker *Work)
+{
+   Work->NextAcquire = Workers;
+   Workers = Work;
+}
+                                                                       /*}}}*/
+// Acquire::Remove - Remove a worker                                   /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Remove(Worker *Work)
+{
+   Worker **I = &Workers;
+   for (; *I != 0;)
+   {
+      if (*I == Work)
+        *I = (*I)->NextAcquire;
+      else
+        I = &(*I)->NextAcquire;
+   }
+}
+                                                                       /*}}}*/
 // Acquire::Enqueue - Queue an URI for fetching                                /*{{{*/
 // ---------------------------------------------------------------------
 /* */
-void pkgAcquire::Enqueue(Item *Item,string URI)
+void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
 {
-   cout << "Fetching " << URI << endl;
-   cout << "   to " << Item->ToFile() << endl;
-   cout << " Queue is: " << QueueName(URI) << endl;
+   // Determine which queue to put the item in
+   string Name = QueueName(URI);
+   if (Name.empty() == true)
+      return;
+
+   // Find the queue structure
+   Queue *I = Queues;
+   for (; I != 0 && I->Name != Name; I = I->Next);
+   if (I == 0)
+   {
+      I = new Queue(Name,this);
+      I->Next = Queues;
+      Queues = I;
+   }
+   
+   // Queue it into the named queue
+   I->Enqueue(Itm,URI,Description);
+   ToFetch++;
+      
+   // Some trace stuff
+   if (Debug == true)
+   {
+      clog << "Fetching " << URI << endl;
+      clog << " to " << Itm->DestFile << endl;
+      clog << " Queue is: " << QueueName(URI) << endl;
+   }
 }
                                                                        /*}}}*/
-// Acquire::QueueName - Return the name of the queue for this URI      /*{{{*/
+// Acquire::Dequeue - Remove an item from all queues                   /*{{{*/
 // ---------------------------------------------------------------------
 /* */
+void pkgAcquire::Dequeue(Item *Itm)
+{
+   Queue *I = Queues;
+   for (; I != 0; I = I->Next)
+      I->Dequeue(Itm);
+   ToFetch--;
+}
+                                                                       /*}}}*/
+// Acquire::QueueName - Return the name of the queue for this URI      /*{{{*/
+// ---------------------------------------------------------------------
+/* The string returned depends on the configuration settings and the
+   method parameters. Given something like http://foo.org/bar it can
+   return http://foo.org or http */
 string pkgAcquire::QueueName(string URI)
 {
    const MethodConfig *Config = GetConfig(URIAccess(URI));
-   return string();
+   if (Config == 0)
+      return string();
+   
+   /* Single-Instance methods get exactly one queue per URI. This is
+      also used for the Access queue method  */
+   if (Config->SingleInstance == true || QueueMode == QueueAccess)
+      return URIAccess(URI);
+      
+   // Host based queue 
+   string::iterator I = URI.begin();
+   for (; I < URI.end() && *I != ':'; I++);
+   for (; I < URI.end() && (*I == '/' || *I == ':'); I++);
+   for (; I < URI.end() && *I != '/'; I++);
+       
+   return string(URI,0,I - URI.begin());
 }
                                                                        /*}}}*/
 // Acquire::GetConfig - Fetch the configuration information            /*{{{*/
@@ -86,7 +184,7 @@ string pkgAcquire::QueueName(string URI)
 /* This locates the configuration structure for an access method. If 
    a config structure cannot be found a Worker will be created to
    retrieve it */
-const pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
+pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
 {
    // Search for an existing config
    MethodConfig *Conf;
@@ -108,6 +206,74 @@ const pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
    return Conf;
 }
                                                                        /*}}}*/
+// Acquire::SetFds - Deal with readable FDs                            /*{{{*/
+// ---------------------------------------------------------------------
+/* Collect FDs that have activity monitors into the fd sets */
+void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
+{
+   for (Worker *I = Workers; I != 0; I = I->NextAcquire)
+   {
+      if (I->InReady == true && I->InFd >= 0)
+      {
+        if (Fd < I->InFd)
+           Fd = I->InFd;
+        FD_SET(I->InFd,RSet);
+      }
+      if (I->OutReady == true && I->OutFd >= 0)
+      {
+        if (Fd < I->OutFd)
+           Fd = I->OutFd;
+        FD_SET(I->OutFd,WSet);
+      }
+   }
+}
+                                                                       /*}}}*/
+// Acquire::RunFds - Deal with active FDs                              /*{{{*/
+// ---------------------------------------------------------------------
+/* Dispatch active FDs over to the proper workers */
+void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
+{
+   for (Worker *I = Workers; I != 0; I = I->NextAcquire)
+   {
+      if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
+        I->InFdReady();
+      if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
+        I->OutFdReady();
+   }
+}
+                                                                       /*}}}*/
+// Acquire::Run - Run the fetch sequence                               /*{{{*/
+// ---------------------------------------------------------------------
+/* This runs the queues. It manages a select loop for all of the
+   Worker tasks. The workers interact with the queues and items to
+   manage the actual fetch. */
+bool pkgAcquire::Run()
+{
+   for (Queue *I = Queues; I != 0; I = I->Next)
+      I->Startup();
+   
+   // Run till all things have been acquired
+   while (ToFetch > 0)
+   {
+      fd_set RFds;
+      fd_set WFds;
+      int Highest = 0;
+      FD_ZERO(&RFds);
+      FD_ZERO(&WFds);
+      SetFds(Highest,&RFds,&WFds);
+      
+      if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
+        return _error->Errno("select","Select has failed");
+      
+      RunFds(&RFds,&WFds);
+   }   
+   
+   for (Queue *I = Queues; I != 0; I = I->Next)
+      I->Shutdown();
+
+   return true;
+}
+                                                                       /*}}}*/
 
 // Acquire::MethodConfig::MethodConfig - Constructor                   /*{{{*/
 // ---------------------------------------------------------------------
@@ -116,5 +282,110 @@ pkgAcquire::MethodConfig::MethodConfig()
 {
    SingleInstance = false;
    PreScan = false;
+   Pipeline = false;
+   SendConfig = false;
+   Next = 0;
+}
+                                                                       /*}}}*/
+
+// Queue::Queue - Constructor                                          /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name), 
+            Owner(Owner)
+{
+   Items = 0;
+   Next = 0;
+   Workers = 0;
+}
+                                                                       /*}}}*/
+// Queue::~Queue - Destructor                                          /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+pkgAcquire::Queue::~Queue()
+{
+   Shutdown();
+   
+   while (Items != 0)
+   {
+      QItem *Jnk = Items;
+      Items = Items->Next;
+      delete Jnk;
+   }
+}
+                                                                       /*}}}*/
+// Queue::Enqueue - Queue an item to the queue                         /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
+{
+   // Create a new item
+   QItem *I = new QItem;   
+   I->Next = Items;
+   Items = I;
+   
+   // Fill it in
+   Items->Owner = Owner;
+   Items->URI = URI;
+   Items->Description = Description;
+   Owner->QueueCounter++;
+}
+                                                                       /*}}}*/
+// Queue::Dequeue - Remove and item from the queue                     /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Queue::Dequeue(Item *Owner)
+{
+   QItem **I = &Items;
+   for (; *I != 0;)
+   {
+      if ((*I)->Owner == Owner)
+      {
+        QItem *Jnk= *I;
+        *I = (*I)->Next;
+        Owner->QueueCounter--;
+        delete Jnk;
+      }
+      else
+        I = &(*I)->Next;
+   }
+}
+                                                                       /*}}}*/
+// Queue::Startup - Start the worker processes                         /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Queue::Startup()
+{
+   Shutdown();
+   
+   pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(URIAccess(Name));
+   if (Cnf == 0)
+      return false;
+   
+   Workers = new Worker(this,Cnf);
+   Owner->Add(Workers);
+   if (Workers->Start() == false)
+      return false;
+      
+   Workers->QueueItem(Items);
+   
+   return true;
+}
+                                                                       /*}}}*/
+// Queue::Shutdown - Shutdown the worker processes                     /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Queue::Shutdown()
+{
+   // Delete all of the workers
+   while (Workers != 0)
+   {
+      pkgAcquire::Worker *Jnk = Workers;
+      Workers = Workers->NextQueue;
+      Owner->Remove(Jnk);
+      delete Jnk;
+   }
+   
+   return true;
 }
                                                                        /*}}}*/
index 355eb3c..cea7c88 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: acquire.h,v 1.2 1998/10/20 02:39:16 jgg Exp $
+// $Id: acquire.h,v 1.3 1998/10/22 04:56:44 jgg Exp $
 /* ######################################################################
 
    Acquire - File Acquiration
@@ -39,6 +39,8 @@
 #pragma interface "apt-pkg/acquire.h"
 #endif 
 
+#include <unistd.h>
+
 class pkgAcquire
 {   
    public:
@@ -48,21 +50,40 @@ class pkgAcquire
    class Worker;
    struct MethodConfig;
    friend Item;
+   friend Queue;
    
    protected:
    
+   // List of items to fetch
    vector<Item *> Items;
+   
+   // List of active queues and fetched method configuration parameters
    Queue *Queues;
+   Worker *Workers;
    MethodConfig *Configs;
+   unsigned long ToFetch;
+   
+   // Configurable parameters for the schedular
+   enum {QueueHost,QueueAccess} QueueMode;
+   bool Debug;
    
    void Add(Item *Item);
    void Remove(Item *Item);
-   void Enqueue(Item *Item,string URI);
+   void Add(Worker *Work);
+   void Remove(Worker *Work);
+   
+   void Enqueue(Item *Item,string URI,string Description);
+   void Dequeue(Item *Item);
+   string QueueName(string URI);
+
+   // FDSET managers for derived classes
+   void SetFds(int &Fd,fd_set *RSet,fd_set *WSet);
+   void RunFds(fd_set *RSet,fd_set *WSet);   
    
    public:
 
-   const MethodConfig *GetConfig(string Access);
-   string QueueName(string URI);
+   MethodConfig *GetConfig(string Access);
+   bool Run();
    
    pkgAcquire();
    ~pkgAcquire();
@@ -75,12 +96,36 @@ class pkgAcquire::Queue
    Queue *Next;
    
    protected:
-   
-   string URIMatch;
 
-   vector<Item *> Items;
+   // Queued item
+   struct QItem 
+   {
+      QItem *Next;
+      
+      string URI;
+      string Description;
+      Item *Owner;
+   };   
+   
+   // Name of the queue
+   string Name;
+
+   // Items queued into this queue
+   QItem *Items;
+   pkgAcquire::Worker *Workers;
+   pkgAcquire *Owner;
    
    public:
+   
+   // Put an item into this queue
+   void Enqueue(Item *Owner,string URI,string Description);
+   void Dequeue(Item *Owner);
+
+   bool Startup();
+   bool Shutdown();
+   
+   Queue(string Name,pkgAcquire *Owner);
+   ~Queue();
 };
 
 // Configuration information from each method
@@ -93,6 +138,8 @@ struct pkgAcquire::MethodConfig
    string Version;
    bool SingleInstance;
    bool PreScan;
+   bool Pipeline;
+   bool SendConfig;
    
    MethodConfig();
 };
index 82418f9..fa07ed3 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: configuration.cc,v 1.7 1998/10/20 02:39:26 jgg Exp $
+// $Id: configuration.cc,v 1.8 1998/10/22 04:56:45 jgg Exp $
 /* ######################################################################
 
    Configuration Class
@@ -33,7 +33,7 @@ Configuration::Configuration()
    Root = new Item;
 }
                                                                        /*}}}*/
-// Configuration::Lookup - Lookup a single item                                                                        /*{{{*/
+// Configuration::Lookup - Lookup a single item                                /*{{{*/
 // ---------------------------------------------------------------------
 /* This will lookup a single item by name below another item. It is a 
    helper function for the main lookup function */
@@ -66,6 +66,9 @@ Configuration::Item *Configuration::Lookup(Item *Head,const char *S,
    new items */
 Configuration::Item *Configuration::Lookup(const char *Name,bool Create)
 {
+   if (Name == 0)
+      return Root->Child;
+   
    const char *Start = Name;
    const char *End = Start + strlen(Name);
    const char *TagEnd = Name;
@@ -210,6 +213,17 @@ bool Configuration::Exists(const char *Name)
 }
                                                                        /*}}}*/
 
+// Configuration::Item::FullTag - Return the fully scoped tag          /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+string Configuration::Item::FullTag() const
+{
+   if (Parent == 0 || Parent->Parent == 0)
+      return Tag;
+   return Parent->FullTag() + "::" + Tag;
+}
+                                                                       /*}}}*/
+
 // ReadConfigFile - Read a configuration file                          /*{{{*/
 // ---------------------------------------------------------------------
 /* The configuration format is very much like the named.conf format
index c98b0bb..14c80e4 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: configuration.h,v 1.5 1998/10/20 02:39:27 jgg Exp $
+// $Id: configuration.h,v 1.6 1998/10/22 04:56:46 jgg Exp $
 /* ######################################################################
 
    Configuration Class
@@ -39,6 +39,9 @@ class Configuration
       Item *Parent;
       Item *Child;
       Item *Next;
+      
+      string FullTag() const;
+      
       Item() : Child(0), Next(0) {};
    };
    Item *Root;
@@ -61,6 +64,8 @@ class Configuration
    inline bool Exists(string Name) {return Exists(Name.c_str());};
    bool Exists(const char *Name);
       
+   inline const Item *Tree(const char *Name) {return Lookup(Name,false);};
+   
    Configuration();
 };
 
index bfc674c..3d5c468 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: fileutl.cc,v 1.10 1998/10/20 04:33:16 jgg Exp $
+// $Id: fileutl.cc,v 1.11 1998/10/22 04:56:47 jgg Exp $
 /* ######################################################################
    
    File Utilities
@@ -136,8 +136,8 @@ void SetCloseExec(int Fd,bool Close)
 /* */
 void SetNonBlock(int Fd,bool Block)
 {   
-   int Flags = fcntl(Fd,F_GETFL);
-   if (fcntl(Fd,F_SETFL,(Flags  & ~O_NONBLOCK) | (Block == false)?0:O_NONBLOCK) != 0)
+   int Flags = fcntl(Fd,F_GETFL) & (~O_NONBLOCK);
+   if (fcntl(Fd,F_SETFL,Flags | ((Block == false)?0:O_NONBLOCK)) != 0)
    {
       cerr << "FATAL -> Could not set non-blocking flag " << strerror(errno) << endl;
       exit(100);
@@ -153,8 +153,10 @@ bool WaitFd(int Fd)
    fd_set Set;
    FD_ZERO(&Set);
    FD_SET(Fd,&Set);
+
    if (select(Fd+1,&Set,0,0,0) <= 0)
       return false;
+
    return true;
 }
                                                                        /*}}}*/
index c615f62..04a3c7b 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: strutl.cc,v 1.5 1998/10/20 02:39:30 jgg Exp $
+// $Id: strutl.cc,v 1.6 1998/10/22 04:56:48 jgg Exp $
 /* ######################################################################
 
    String Util - Some usefull string functions.
                                                                        /*}}}*/
 // Includes                                                            /*{{{*/
 #include <strutl.h>
+#include <apt-pkg/fileutl.h>
+
 #include <ctype.h>
 #include <string.h>
 #include <stdio.h>
+#include <time.h>
                                                                        /*}}}*/
 
 // strstrip - Remove white space from the front and back of a string   /*{{{*/
@@ -310,7 +313,7 @@ string URIAccess(string URI)
 {
    string::size_type Pos = URI.find(':');
    if (Pos == string::npos)
-      return string();
+      return URI;
    return string(URI,0,Pos);
 }
                                                                        /*}}}*/
@@ -472,3 +475,71 @@ int StringToBool(string Text,int Default = -1)
    return Default;
 }
                                                                        /*}}}*/
+// TimeRFC1123 - Convert a time_t into RFC1123 format                  /*{{{*/
+// ---------------------------------------------------------------------
+/* This converts a time_t into a string time representation that is
+   year 2000 complient and timezone neutral */
+string TimeRFC1123(time_t Date)
+{
+   struct tm Conv = *gmtime(&Date);
+   char Buf[300];
+
+   const char *Day[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
+   const char *Month[] = {"Jan","Feb","Mar","Apr","May","Jun","Jul",
+                          "Aug","Sep","Oct","Nov","Dec"};
+
+   sprintf(Buf,"%s, %02i %s %i %02i:%02i:%02i GMT",Day[Conv.tm_wday],
+          Conv.tm_mday,Month[Conv.tm_mon],Conv.tm_year+1900,Conv.tm_hour,
+          Conv.tm_min,Conv.tm_sec);
+   return Buf;
+}
+                                                                       /*}}}*/
+// ReadMessages - Read messages from the FD                            /*{{{*/
+// ---------------------------------------------------------------------
+/* This pulls full messages from the input FD into the message buffer. 
+   It assumes that messages will not pause during transit so no
+   fancy buffering is used. */
+bool ReadMessages(int Fd, vector<string> &List)
+{
+   char Buffer[4000];
+   char *End = Buffer;
+   
+   while (1)
+   {
+      int Res = read(Fd,End,sizeof(Buffer) - (End-Buffer));
+      
+      // Process is dead, this is kind of bad..
+      if (Res == 0)
+        return false;
+      
+      // No data
+      if (Res <= 0)
+        return true;
+
+      End += Res;
+      
+      // Look for the end of the message
+      for (char *I = Buffer; I < End; I++)
+      {
+        if (I[0] != '\n' || I[1] != '\n')
+           continue;
+        
+        // Pull the message out
+        string Message(Buffer,0,I-Buffer);
+
+        // Fix up the buffer
+        for (; I < End && *I == '\n'; I++);
+        End -= I-Buffer;        
+        memmove(Buffer,I,End-Buffer);
+        I = Buffer;
+        
+        List.push_back(Message);
+      }
+      if (End == Buffer)
+        return true;
+
+      if (WaitFd(Fd) == false)
+        return false;
+   }   
+}
+                                                                       /*}}}*/
index 38aca57..fca36fc 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode: cpp; mode: fold -*-
 // Description                                                         /*{{{*/
-// $Id: strutl.h,v 1.5 1998/10/20 02:39:31 jgg Exp $
+// $Id: strutl.h,v 1.6 1998/10/22 04:56:49 jgg Exp $
 /* ######################################################################
 
    String Util - These are some usefull string functions
@@ -20,6 +20,7 @@
 
 #include <stdlib.h>
 #include <string>
+#include <vector>
 
 char *_strstrip(char *String);
 char *_strtabexpand(char *String,size_t Len);
@@ -32,12 +33,14 @@ string SubstVar(string Str,string Subst,string Contents);
 string Base64Encode(string Str);
 string URItoFileName(string URI);
 string URIAccess(string URI);
+string TimeRFC1123(time_t Date);
+string LookupTag(string Message,const char *Tag,const char *Default = 0);
+int StringToBool(string Text,int Default = -1);
+bool ReadMessages(int Fd, vector<string> &List);
 
 int stringcmp(const char *A,const char *AEnd,const char *B,const char *BEnd);
 inline int stringcmp(const char *A,const char *AEnd,const char *B) {return stringcmp(A,AEnd,B,B+strlen(B));};
 int stringcasecmp(const char *A,const char *AEnd,const char *B,const char *BEnd);
 inline int stringcasecmp(const char *A,const char *AEnd,const char *B) {return stringcasecmp(A,AEnd,B,B+strlen(B));};
-string LookupTag(string Message,const char *Tag,const char *Default = 0);
-int StringToBool(string Text,int Default = -1);
 
 #endif
index 8cd9c3b..ab1bfbc 100644 (file)
@@ -1,4 +1,4 @@
-// $Id: apt.conf,v 1.3 1998/10/20 02:41:06 jgg Exp $
+// $Id: apt.conf,v 1.4 1998/10/22 04:56:50 jgg Exp $
 /* This file is an index of all APT configuration directives. It should
    NOT actually be used as a real config file, though it is a completely
    valid file.
@@ -16,6 +16,11 @@ APT {
   };
 };
 
+Acquire
+{
+  Queue-Mode "access";       // host|access
+};
+
 Dir 
 {
   
@@ -50,5 +55,6 @@ DSelect {
 
 Debug {
   pkgProblemResolver "true";
+  pkgAcquire "false";
   pkgAcquire::Worker "true";
 }
index 017222d..71c5643 100644 (file)
@@ -4,6 +4,7 @@ int main()
 {
    printf("100 Capabilities\n"
          "Version: 1.0\n"
-         "Pre-Scan: true\n\n"
-         "Version: 1.0\n\n");
+         "Pre-Scan: true\n\n");
+   fflush(stdout);
+   sleep(10);
 }
index 577ab5f..a8817bc 100644 (file)
@@ -1,9 +1,12 @@
 #include <apt-pkg/acquire-item.h>
 #include <apt-pkg/init.h>
 #include <apt-pkg/error.h>
+#include <signal.h>
 
 int main()
 {
+   signal(SIGPIPE,SIG_IGN);
+   
    pkgInitialize(*_config);
    
    pkgSourceList List;
@@ -17,6 +20,8 @@ int main()
       if (_error->PendingError() == true)
         break;
    }
+
+   Fetcher.Run();
    
    _error->DumpErrors();
 }