Changeset 319


Ignore:
Timestamp:
03/16/10 11:16:40 (10 years ago)
Author:
sander
Message:

Better handling of multiple cron jobs

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/server/www/app/plugins/bean_stalk/vendors/shells/worker.php

    r317 r319  
    3232        public $pid = null; 
    3333 
    34         /** @var array A queue for cron jobs */ 
     34        /** @var array A queue for cron jobs. Note that it is reverse sorted. */ 
    3535        public $cron = array(); 
    3636 
     
    111111                $this->out('Stopping CakePHP background worker'); 
    112112                 
    113                 // Send a stop command to the worker 
    114                 $message = array('type' => 'stop'); 
    115                 $tube = trim(array_shift($this->tubes)); 
    116                 $this->beanstalk->put(0, 0, 30, serialize($message), $tube); 
     113                // First, check if the daemon is actually running 
     114                $pidfile = Configure::read('Worker.pidfile'); 
     115                if (!file_exists($pidfile)) { 
     116                        $this->out('The background worker is not running.'); 
     117                        $this->_stop(); 
     118                } 
     119 
     120                $pid = trim(file_get_contents($pidfile)); 
     121                if (posix_kill($pid, 0) || posix_get_last_error() <= 1) { 
     122                        // Dameon is running. Send a stop signal. 
     123                        $message = array('type' => 'stop'); 
     124                        $tube = trim(array_shift($this->tubes)); 
     125                        $this->beanstalk->put(0, 0, 30, serialize($message), $tube); 
     126 
     127                        // Exit 
     128                        $this->_stop(); 
     129                } 
     130 
     131                $this->out('The background worker is not running. Removing stale pidfile'); 
     132                @unlink($pidfile); 
    117133        } 
    118134 
     
    136152                if (is_array($tasks)) { 
    137153                        foreach ($tasks as $task_name => $task) { 
    138                                 $this->cron[time()] = $task_name; 
    139                         } 
    140  
    141                         ksort($this->cron); 
     154                                $this->cron[time()][] = $task_name; 
     155                        } 
     156 
    142157                        reset($this->cron); 
    143158                } 
     
    302317        /** 
    303318         * Look at the cron queue, execute any outstanding tasks 
     319         * Note that $this->cron is sorted in reversed. This is because array_shift renumbers 
     320         * the keys while array_pop does not. 
    304321         * @return int Time in seconds until the next task 
    305322         */ 
     
    311328 
    312329                $time = time(); 
     330                end($this->cron); 
     331 
    313332                while (key($this->cron) <= $time) { 
    314333                        // Read the task 
    315                         $task_name = array_shift($this->cron); 
    316                         $task = Configure::read('Cron.' . $task_name); 
    317  
    318                         // Execute the task 
    319                         App::import('Model', $task['Model']); 
    320                         $model = ClassRegistry::init($task['Model']); 
    321                         $model->create(); 
    322  
    323                         if (!method_exists($model, $task['method'])) { 
    324                                 $message = sprintf('%s->%s does not exist', $task['Model'], $task['method']); 
     334                        $task_names = array_pop($this->cron); 
     335                        foreach ($task_names as $task_name) { 
     336                                $task = Configure::read('Cron.' . $task_name); 
     337 
     338                                // Execute the task 
     339                                App::import('Model', $task['Model']); 
     340                                $model = ClassRegistry::init($task['Model']); 
     341                                $model->create(); 
     342 
     343                                if (!method_exists($model, $task['method'])) { 
     344                                        $message = sprintf('%s->%s does not exist', $task['Model'], $task['method']); 
     345                                        $this->log($message, LOG_DEBUG); 
     346                                        $this->out($message); 
     347                                } 
     348 
     349                                if (isset($task['args'])) { 
     350                                        call_user_func_array(array($model, $task['method']), $task['args']); 
     351                                } else { 
     352                                        call_user_func(array($model, $task['method'])); 
     353                                } 
     354                                unset($model); 
     355 
     356                                $message = sprintf('Executed cron %s->%s', $task['Model'], $task['method']); 
    325357                                $this->log($message, LOG_DEBUG); 
    326358                                $this->out($message); 
    327                         } 
    328  
    329                         if (isset($task['args'])) { 
    330                                 call_user_func_array(array($model, $task['method']), $task['args']); 
    331                         } else { 
    332                                 call_user_func(array($model, $task['method'])); 
    333                         } 
    334                         unset($model); 
    335  
    336                         $message = sprintf('Executed cron %s->%s', $task['Model'], $task['method']); 
    337                         $this->log($message, LOG_DEBUG); 
    338                         $this->out($message); 
    339  
    340                         // Reschedule 
    341                         $start = $time + $task['interval']; 
    342                         $this->cron[$start] = $task_name; 
    343                         ksort($this->cron); 
    344                         reset($this->cron); 
     359 
     360                                // Reschedule 
     361                                $start = $time + $task['interval']; 
     362                                $this->cron[$start][] = $task_name; 
     363                                krsort($this->cron, SORT_STRING); 
     364                                end($this->cron); 
     365                        } 
    345366 
    346367                        // Advance time 
     
    362383                $this->hr(); 
    363384                $this->out('Commands:'); 
    364                 $this->out("\n\trun\n\t\tRun the worker"); 
     385                $this->out("\n\trun [nofork]\n\t\tRun the worker"); 
    365386                $this->out("\n\tstop\n\t\tStop the worker"); 
    366387                $this->out("\n\tstatus\n\t\tShow status of the running worker"); 
Note: See TracChangeset for help on using the changeset viewer.