source: trunk/server/www/app/plugins/bean_stalk/vendors/beanstalk-1.2.1/src/BeanStalk.class.php @ 200

Last change on this file since 200 was 200, checked in by sander, 11 years ago

Log beanstalk job ID

File size: 42.6 KB
Line 
1<?php
2
3/**
4* BeanStalk - A PHP Client Library for the beanstalkd in-memory workqueue server
5*
6* Read more about beanstalkd at http://xph.us/software/beanstalkd/
7* This library is compatible with all versions of beanstalkd from 1.0 to 2.0,
8* excluding 2.0 itself.
9*
10* NOTE: The library depends on syck (http://whytheluckystiff.net/syck/) for the
11* optional parsing of the YAML output produced by some of beanstalkd's commands,
12* namely the stats* and list* commands. Syck and its associated PHP extension
13* must be installed in order to facilitate the 'auto_unyaml' option in the open()
14* factory method.
15*
16* Copyright (c) 2008 Verkkostadi Technologies, Inc
17*
18* Permission is hereby granted, free of charge, to any person obtaining a copy
19* of this software and associated documentation files (the "Software"), to deal
20* in the Software without restriction, including without limitation the rights
21* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
22* copies of the Software, and to permit persons to whom the Software is
23* furnished to do so, subject to the following conditions:
24*
25* The above copyright notice and this permission notice shall be included in
26* all copies or substantial portions of the Software.
27*
28* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
29* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
30* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
31* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
32* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
33* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
34* THE SOFTWARE.
35*
36* @author Tim Gunter <tim@vstadi.com>
37* @license http://www.opensource.org/licenses/mit-license.php
38* @version 1.2.1
39*/
40
41/**
42* Class: BeanStalk
43*
44* Top level control structure for managing a cluster of beanstalks
45*
46* @package BeanStalk
47*/
48class BeanStalk
49{
50    const DEBUG = false;
51    const VERSION = "1.2.0";
52   
53    private $servers;
54    private $server_numerics;
55    private $select_mode;
56   
57    private $connection_timeout;
58    private $connection_retries;
59    private $peek_usleep;
60    private $auto_unyaml;
61   
62    private $reserver;
63    private $nextserver;
64    private $lastserver;
65    private $pool_size;
66    private $internal;
67
68    private $last_insert_id;
69   
70    /**
71    * BeanStalk Constructor
72    *
73    * @param mixed $in_connection_settings
74    * @return BeanStalk
75    */
76    private function __construct($in_connection_settings)
77    {       
78        $this->connection_timeout = $in_connection_settings['connection_timeout'];
79        $this->connection_retries = $in_connection_settings['connection_retries'];
80        $this->peek_usleep = $in_connection_settings['peek_usleep'];
81        $this->auto_unyaml = $in_connection_settings['auto_unyaml'];
82       
83        $this->log = false;
84        if (isset($in_connection_settings['log']) && $in_connection_settings['log'])
85            $this->log = $in_connection_settings['log'];
86       
87        $this->pool_size = 0;
88        $this->lastserver = -1;
89        $this->internal = 0;
90       
91        $this->servers = array(); $this->pool_size = 0;
92        foreach ($in_connection_settings['servers'] as $server)
93            $this->add_server($server);
94       
95        if (!$this->pool_size)
96            throw new BeanQueueNoValidServersException();
97       
98        $this->select_mode = $in_connection_settings['select'];
99       
100        $split = split(' ',$this->select_mode);
101        $reserver = '_reserve_'.array_pop($split);
102        $nextserver = implode(' ',$split);
103       
104        if (!method_exists($this,$reserver))
105            throw new BeanQueueInvalidSelectorBadReserverException();
106           
107        $this->reserver = array(&$this,$reserver);
108        $this->nextserver = $nextserver;
109
110    }
111   
112    /**
113    * BeanStalk Factory
114    *
115    * @param mixed $in_connection_settings
116    * @return BeanStalk
117    */
118    public static function open($in_connection_settings)
119    {
120        $defaults = array(
121            'servers'               => array(),
122            'select'                => 'random wait',
123            'connection_timeout'    => 0.5,
124            'peek_usleep'           => 2500,
125            'connection_retries'    => 3,
126            'auto_unyaml'           => true
127        );
128       
129        $settings = array_merge($defaults, $in_connection_settings);
130       
131        if (!sizeof($settings['servers']))
132            throw new BeanQueueNoServersSuppliedException();
133           
134        return new BeanStalk($settings);
135    }
136   
137    public function add_server($in_server_str)
138    {
139        // Don't index invalid servers
140        try
141        {
142            $this->servers[$in_server_str] = new BeanQueue($in_server_str, $this->connection_timeout, $this->connection_retries, $this->auto_unyaml, $this->log);
143            $this->server_numerics[$this->pool_size] = $in_server_str;
144            $this->pool_size++;
145            return true;
146        }
147        catch (BeanQueueInvalidServerException $e){}
148        return false;
149    }
150   
151    public function remove_server($in_server_str)
152    {
153        if (isset($this->servers[$in_server_str]))
154        {
155            unset($this->server[$in_server_str]);
156            unset($this->server[array_search($in_server_str,$this->server_numerics)]);
157            $this->server_numerics = array_values($this->server_numerics);
158            $this->pool_size--;
159        }
160        return false;
161    }
162   
163    /**
164    * Get reference to an active BeanQueue
165    *
166    * @param mixed $in_server_str
167    * @return BeanQueue
168    */
169    public function &get_server($in_server_str)
170    {
171        if (isset($this->servers[$in_server_str]) && $this->servers[$in_server_str]->alive('fast'))
172            return $this->servers[$in_server_str];
173        return false;
174    }
175   
176    public function &get_nth($in_server_num)
177    {
178        return $this->server_numerics[$in_server_num];
179    }
180   
181    private function &next_server()
182    {
183        while (1)
184        {
185            if (!$this->pool_size)
186                return false;
187               
188            switch ($this->nextserver)
189            {
190                case 'random':
191                    $this->lastserver = mt_rand(0,$this->pool_size-1);
192                    break;
193                   
194                case 'sequential':
195                    if ($this->lastserver >= $this->pool_size-1)
196                        $this->lastserver = 0;
197                    else
198                        $this->lastserver++;
199                    break;
200            }
201            $server_name = $this->get_nth($this->lastserver);
202            $server = $this->get_server($server_name);
203            if ($server !== false)
204                break;
205
206        }
207        return $server;
208    }
209   
210    private function reserver()
211    {
212        return call_user_func($this->reserver);
213    }
214   
215    /**
216    * Wait mode reservation
217    *
218    * Pick a server and then wait for a job to become available.
219    */
220    private function &_reserve_wait()
221    {
222        return $this->next_server();
223    }
224   
225    /**
226    * Peek mode reservation
227    *
228    * Iterate over all servers until a job is found on one of them.
229    */
230    private function &_reserve_peek()
231    {
232        // Loop through servers until a job is found
233        $server = null;
234        while (1)
235        {
236            $server = $this->next_server();
237            if ($server)
238            {
239                // If this server has a job...
240                $job = $server->reserve_with_timeout(0);
241                if (BeanQueueJob::check($job))
242                {
243                    $job->release(0,0);
244                    break;
245                }
246            }
247        }
248       
249        if ($server)
250            return $server;
251        return false;
252    }
253   
254    /**
255    * Fire a method on the next server
256    *
257    * Use the specified reserver() to grab a server reference, then do this command
258    * on it, passing the supplied args along too.
259    *
260    * @param mixed $in_command method to execute
261    * @param mixed $in_args arguments to pass
262    * @return mixed
263    */
264    private function _do_next_server($in_command, $in_args=array())
265    {
266        return $this->_do_my_server($this->reserver(), $in_command, $in_args);
267    }
268   
269    private function _do_my_server(&$in_server, $in_command, $in_args=array())
270    {
271        if (BeanStalk::DEBUG) echo __METHOD__."\n";
272        if ($in_server === false)
273            return false;
274       
275        if (!is_callable(array($in_server,$in_command)))
276            return false;
277       
278        if (!is_array($in_args))
279            $in_args = array($in_args);
280       
281        return call_user_func_array(array($in_server, $in_command), $in_args);
282    }
283   
284    private function reset()
285    {
286        $this->internal = 0;
287        return false;
288    }
289   
290    private function &next()
291    {
292        $f = false; $t = true;
293        while (1)
294        {
295            if ($this->internal >= $this->pool_size)
296                return $f;
297           
298            $current = $this->internal++;
299            $server_name = $this->get_nth($current);
300            $server = $this->get_server($server_name);
301            if ($server === false)
302                continue;
303           
304            if ($server->alive() === true)
305                return $server;
306        }
307    }
308   
309    /**
310    * REAL COMMANDS
311    */
312   
313    public function __call($in_method, $in_args)
314    {
315        if (BeanStalk::DEBUG) echo __METHOD__."({$in_method})\n";
316        switch ($in_method)
317        {
318            // Specific server methods
319            case 'peek':
320            case 'peek_ready':
321            case 'kick':
322                $servername = array_shift($in_args);
323                $server = $this->get_server($servername);
324                return $this->_do_my_server($server, $in_method, $in_args);
325                break;
326           
327            // Broadcast to all servers
328            case 'watch':
329            //case 'ignore':
330            case 'use_tube':
331                $res = array();
332                foreach ($this->servers as $servername => $server)
333                {
334                    $res[$servername] = $this->_do_my_server($server, $in_method, $in_args);
335                }
336                return $res;
337                break;
338               
339            // Thanks Kevin :p
340            default:
341                trigger_error("Fatal error: Call to undefined method ".__CLASS__."::$in_method", E_USER_ERROR);
342                break;
343        }
344    }
345   
346    public function put()
347    {
348        if (BeanStalk::DEBUG) echo __METHOD__."\n";
349        $server = $this->next_server(); $args = func_get_args();
350        $result = $this->_do_my_server($server, 'put', $args);
351
352        if ($result == BeanQueue::OPERATION_OK) {
353                $this->last_insert_id = $server->last_insert_id();
354        }
355
356        return $result;
357    }
358   
359    /**
360    * Reserve a job
361    *
362    * Picks a server according to the reserver() method, and then blocks while waiting for a job.
363    *
364    * @param $in_wait int seconds to wait for a job to become available
365    *
366    * @return BeanQueueJob or false
367    */
368    public function reserve($in_circulate_pool=true)
369    {
370        if (BeanStalk::DEBUG) echo __METHOD__."\n";
371       
372        do
373        {
374            $reserve = $this->reserver();
375            if ($reserve === false)
376                return false;
377       
378            if ($reserve instanceof BeanQueue)
379            {
380                $job = $reserve->reserve();
381                if ($job instanceof BeanQueueJob)
382                    return $job;
383            }
384           
385            if ($reserve instanceof BeanQueueJob)
386                return $reserve;
387               
388        } while ($in_circulate_pool);
389        return false;
390    }
391   
392    public function reserve_with_timeout($in_wait=0, $in_circulate_pool=true, $in_wait_is_cumulative=false)
393    {
394        if (BeanStalk::DEBUG) echo __METHOD__."\n";
395        $solo_wait = ($in_wait_is_cumulative) ? ceil($in_wait / $this->pool_size) : $in_wait;
396        $this->reset();
397        while (($server = @$this->next()) !== false)
398        {
399            $result = $server->reserve_with_timeout($solo_wait);
400            if ($result instanceof BeanQueueJob)
401                return $result;
402        }
403        return false;
404    }
405   
406    public function stats(&$out_stats)
407    {
408        if (BeanStalk::DEBUG) echo __METHOD__."\n";
409        $out_stats = array(); $this->reset();
410        $results = array();
411        while (($server = @$this->next()) !== false)
412        {
413            $result = $server->stats($srvstats);
414            $results[$server->identify()] = $result;
415            $out_stats[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $srvstats : false;
416        }
417        return BeanQueue::OPERATION_OK;
418    }
419   
420    public function ignore($in_tube, &$out_reply)
421    {
422        if (BeanStalk::DEBUG) echo __METHOD__."\n";
423        $out_reply = array(); $this->reset();
424        $results = array();
425        while (($server = @$this->next()) !== false)
426        {
427            $result = $server->ignore($in_tube, $ign_reply);
428            $results[$server->identify()] = $result;
429            $out_reply[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $ign_reply : false;
430        }
431        return BeanQueue::OPERATION_OK;
432    }
433   
434    public function stats_job($in_job_id, &$out_stats)
435    {
436        if (BeanStalk::DEBUG) echo __METHOD__."\n";
437        $out_stats = array(); $this->reset();
438        $results = array();
439        while (($server = @$this->next()) !== false)
440        {
441            $result = $server->stats_job($in_job_id, $srvstats);
442            $results[$server->identify()] = $result;
443            $out_stats[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $srvstats : false;
444        }
445        return BeanQueue::OPERATION_OK;
446    }
447   
448    public function stats_tube($in_tube, &$out_stats)
449    {
450        if (BeanStalk::DEBUG) echo __METHOD__."\n";
451        $out_stats = array(); $this->reset();
452        $results = array();
453        while (($server = @$this->next()) !== false)
454        {
455            $result = $server->stats_tube($in_tube, $srvstats);
456            $results[$server->identify()] = $result;
457            $out_stats[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $srvstats : false;
458        }
459        return BeanQueue::OPERATION_OK;
460    }
461   
462    public function list_tubes(&$out_tubes)
463    {
464        if (BeanStalk::DEBUG) echo __METHOD__."\n";
465        $out_tubes = array(); $this->reset();
466        $results = array();
467        while (($server = @$this->next()) !== false)
468        {
469            $result = $server->list_tubes($srvtube);
470            $results[$server->identify()] = $result;
471            $out_tubes[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $srvtube : false;
472        }
473        return BeanQueue::OPERATION_OK;
474    }
475   
476    public function list_tubes_watched(&$out_tubes)
477    {
478        if (BeanStalk::DEBUG) echo __METHOD__."\n";
479        $out_tubes = array(); $this->reset();
480        $results = array();
481        while (($server = @$this->next()) !== false)
482        {
483            $result = $server->list_tubes_watched($srvtube);
484            $results[$server->identify()] = $result;
485            $out_tubes[$server->identify()] = ($result == BeanQueue::OPERATION_OK) ? $srvtube : false;
486        }
487        return BeanQueue::OPERATION_OK;
488    }
489
490    public function last_insert_id()
491    {
492            return $this->last_insert_id;
493    }
494   
495    /**
496    * Delete a job
497    *
498    * @param BeanQueueJob $in_job job to delete
499    * @return integer operation status
500    */
501    public static function delete(&$in_job)
502    {
503        if (BeanStalk::DEBUG) echo __METHOD__."\n";
504        return $in_job->delete();
505    }
506   
507    /**
508    * Release a job
509    *
510    * @param BeanQueueJob $in_job job to release
511    * @param integer $in_pri new priority
512    * @param integer $in_delay delay before job becomes ready
513    * @return integer operation status
514    */
515    public static function release(&$in_job, $in_pri, $in_delay)
516    {
517        if (BeanStalk::DEBUG) echo __METHOD__."\n";
518        return $in_job->release($in_pri, $in_delay);
519    }
520   
521    /**
522    * Bury a job
523    *
524    * @param BeanQueueJob $in_job job to bury
525    * @param integer $in_pri new priority
526    * @return integer operation status
527    */
528    public static function bury(&$in_job, $in_pri)
529    {
530        if (BeanStalk::DEBUG) echo __METHOD__."\n";
531        return $in_job->bury($in_pri);
532    }
533   
534}
535
536/**
537* Class: BeanQueue
538*
539* Represents and interfaces with a beanstalkd socket.
540*
541* @package BeanStalk
542*/
543class BeanQueue
544{
545   
546    const MAX_READ_BUF = 16384;
547    const MSG_DELIM = "\r\n";
548    const READ_ERROR = false;
549   
550    const OPERATION_OK = 1;
551    const MODE_DRAINING = 2;
552    const MODE_NORMAL = 4;
553    const ERROR_OOM = 8;
554    const ERROR_INTERNAL = 16;
555    const ERROR_BAD_FORMAT = 32;
556    const ERROR_UNKNOWN_COMMAND = 64;
557    const ERROR_LAST_TUBE = 128;
558    const ERROR_NOT_FOUND = 256;
559    const ERROR_DEADLINE_SOON = 512;
560    const ERROR_EXPECTED_CRLF = 1024;
561    const ERROR_JOB_TOO_BIG = 2048;
562    const ERROR_BURIED = 4096;
563    const ERROR_BAD_PRIORITY = 8192;
564    const ERROR_BAD_DELAY = 16384;
565    const ERROR_BAD_TTR = 32768;
566    const ERROR_NOT_CONNECTED = 65536;
567    const ERROR_TIMED_OUT = 131072;
568   
569    private $alive;                 // Connected and ready?
570    private $ip;                    // Server IP
571    private $port;                  // Server Port
572   
573    private $timeout;               // fsockopen timeout
574    private $max_retries;           // Connection retry max
575    private $auto_unyaml;           // Automatically unyaml stats and lists?
576    private $log;                   // Path to this instance's log file
577   
578    private $retries;               // Current total connection retries
579    private $socket;                // Reference to socket
580    private $rbuf;                  // Read buffer
581    private $rbuflen;
582    private $mode;                  // Server job mode. self::MODE_NORMAL or self::MODE_DRAINING
583   
584    private $tube;                  // Currently used tube
585   
586    private $recovery;
587    private $preparing;
588
589    private $last_insert_id;
590   
591    public function __construct($in_server_str, $in_conn_to, $in_conn_r, $in_auto_unyaml, $in_logfile)
592    {
593        $server = split(':',$in_server_str);
594       
595        if (ip2long($server[0]) === false)
596            throw new BeanQueueInvalidServerException();
597           
598        if ($server[1] < 1 || $server[1] > 65536)
599            throw new BeanQueueInvalidServerException();
600           
601        $this->ip = $server[0];
602        $this->port = $server[1];
603        $this->retries = 0;
604        $this->timeout = $in_conn_to;
605        $this->max_retries = $in_conn_r;
606        $this->auto_unyaml = $in_auto_unyaml;
607        $this->log = $in_logfile;
608        $this->tube = 'default';
609        $this->recovery = array(
610            'use'       => '',
611            'ignore'    => array(),
612            'watch'     => array()
613        );
614       
615        $this->alive = null;
616        $this->rbuf = null;
617        $this->rbuflen = 0;
618    }
619   
620    private function check_reply($in_reply, $in_expr_arr, &$writeback)
621    {
622        if ($in_reply == 'OUT_OF_MEMORY')
623            return self::ERROR_OOM;
624        if ($in_reply == 'INTERNAL_ERROR')
625            return self::ERROR_INTERNAL;
626        if ($in_reply == 'UNKNOWN_COMMAND')
627            return self::ERROR_UNKNOWN_COMMAND;
628        if ($in_reply == 'BAD_FORMAT')
629            return self::ERROR_BAD_FORMAT;
630        if ($in_reply == 'DRAINING')
631            return $this->mode = self::MODE_DRAINING;
632       
633        if (!is_array($in_expr_arr))
634        {
635            $in_expr_arr = array(
636                $in_expr_arr => self::OPERATION_OK
637            );
638        }
639       
640        foreach ($in_expr_arr as $in_expr => $in_return)
641        {
642            if (preg_match($in_expr, $in_reply, $writeback))
643            {
644                array_shift($writeback);
645                return $in_return;
646            }
647        }
648       
649        return self::ERROR_UNKNOWN_COMMAND;
650    }
651   
652    /**
653    * Return a server ID string
654    *
655    * Concatenates the IP and port together with a colon to form the server
656    * identification string <IP>:<port>.
657    *
658    * Useful for storing per-server lists such as the result of aggregate stats commands
659    *
660    * @return string server ID string
661    */
662    public function identify()
663    {
664        return $this->ip.':'.$this->port;
665    }
666   
667    private function unyaml($in_string)
668    {
669        if ($this->auto_unyaml && $in_string)
670            return syck_load($in_string);
671        return $in_string;
672    }
673   
674    /**
675    * Get stats for this server
676    *
677    * stats: general stats on basically everything
678    * stats-job <job-id>: stats on a certain job
679    * stats-tube <tube-name>: stats for a certain tube
680    *
681    * These functions all set the $stats writeback variable to the result,
682    * if one exists. The return value of the function will indicate whether
683    * or not to read thta writeback.
684    *
685    * @param reference $stats writeback reference to store the resulting stats
686    * @return integer operation status
687    */
688    public function stats(&$stats)
689    {
690        if (BeanStalk::DEBUG) echo __METHOD__."\n";
691        $this->safe_send_message('stats');
692        $res = $this->check_reply($this->safe_read_message(), '/OK (\d+)/', $data);
693        if ($res == self::OPERATION_OK)
694            $stats = $this->unyaml(trim($this->safe_read_blob($data[0])));
695        else
696            $stats = false;
697        return $res;
698    }
699   
700    public function stats_job($in_job_id, &$stats)
701    {
702        if (BeanStalk::DEBUG) echo __METHOD__."\n";
703        $this->safe_send_message("stats-job {$in_job_id}");
704        $res = $this->check_reply($this->safe_read_message(), array(
705            '/OK (\d+)/'        => self::OPERATION_OK,
706            '/NOT_FOUND/'       => self::ERROR_NOT_FOUND
707        ), $data);
708        if ($res == self::OPERATION_OK)
709            $stats = $this->unyaml(trim($this->safe_read_blob($data[0])));
710        else
711            $stats = false;
712        return $res;
713    }
714   
715    public function stats_tube($in_tube, &$out_stats)
716    {
717        if (BeanStalk::DEBUG) echo __METHOD__."\n";
718        $this->safe_send_message("stats-tube {$in_tube}");
719        $res = $this->check_reply($this->safe_read_message(), array(
720            '/OK (\d+)/'        => self::OPERATION_OK,
721            '/NOT_FOUND/'       => self::ERROR_NOT_FOUND
722        ), $data);
723        if ($res == self::OPERATION_OK)
724            $out_stats = $this->unyaml(trim($this->safe_read_blob($data[0])));
725        else
726            $out_stats = false;
727        return $res;
728    }
729   
730    /**
731    * The list-tubes command returns a list of all existing tubes. Its form is:
732    *
733    * @param reference $out_tubes writeback reference to store the resulting tube list
734    * @return integer operation status
735    */
736    public function list_tubes(&$out_tubes)
737    {
738        if (BeanStalk::DEBUG) echo __METHOD__."\n";
739        $this->safe_send_message("list-tubes");
740        $res = $this->check_reply($this->safe_read_message(), '/OK (\d+)/', $data);
741        if ($res == self::OPERATION_OK)
742            $out_tubes = $this->unyaml(trim($this->safe_read_blob($data[0])));
743        else
744            $out_tubes = false;
745       
746        return $res;
747    }
748   
749    /**
750    * The list-tubes-watched command returns a list of all tubes current being watched. Its form is:
751    *
752    * @param reference $out_tubes writeback reference to store the resulting tube list
753    * @return integer operation status
754    */
755    public function list_tubes_watched(&$out_tubes)
756    {
757        if (BeanStalk::DEBUG) echo __METHOD__."\n";
758        $this->safe_send_message("list-tubes-watched");
759        $res = $this->check_reply($this->safe_read_message(), '/OK (\d+)/', $data);
760        if ($res == self::OPERATION_OK)
761            $out_tubes = $this->unyaml(trim($this->safe_read_blob($data[0])));
762        else
763            $out_tubes = false;
764       
765        return $res;
766    }
767   
768    /**
769    * Set the active tube on this server
770    *
771    * @param mixed $in_tube tube to switch to
772    * @return integer operation status
773    */
774    public function use_tube($in_tube)
775    {
776        if (BeanStalk::DEBUG) echo __METHOD__."\n";
777        $this->safe_send_message("use {$in_tube}");
778        $reply = $this->safe_read_message();
779        $res = $this->check_reply($reply, "/USING {$in_tube}/", $data);
780        if ($res == self::OPERATION_OK)
781        {
782            $this->tube = $in_tube;
783            $this->recovery['use'] = $this->tube;
784        }
785       
786        return $res;
787    }
788   
789    /**
790    * Write a job to this server
791    *
792    * @param integer $in_pri job priority
793    * @param integer $in_delay delay till job becomes ready
794    * @param integer $in_ttr time in seconds that a processor will have to process a job
795    * @param mixed $in_job job body
796    * @param string $in_tube temporary tube to insert this job into
797    * @return integer operation status
798    */
799    public function put($in_pri, $in_delay, $in_ttr, $in_job, $in_tube=null)
800    {
801        // If we are draining, NO PUT!
802        if ($this->mode == self::MODE_DRAINING)
803            return self::MODE_DRAINING;
804       
805        if ($in_pri < 0 || $in_pri > 4294967294)
806            return self::ERROR_BAD_PRIORITY;
807        if (!is_numeric($in_delay))
808            return self::ERROR_BAD_DELAY;
809        if (!is_numeric($in_ttr))
810            return self::ERROR_BAD_TTR;
811       
812        // Switch to another tube first?
813        if (!is_null($in_tube))
814        {
815            $old_tube = $this->tube;
816            $this->use_tube($in_tube);
817        }
818       
819        // Do real 'put' here.
820        $bytes = strlen($in_job);
821        $this->safe_send_message("put {$in_pri} {$in_delay} {$in_ttr} {$bytes}\r\n{$in_job}");
822        $real = $this->safe_read_message();
823        $res = $this->check_reply($real, array(
824            '/INSERTED (\d+)/'          => self::OPERATION_OK,
825            '/BURIED (\d+)/'            => self::ERROR_BURIED,
826            '/EXPECTED_CRLF/'           => self::ERROR_EXPECTED_CRLF,
827            '/JOB_TOO_BIG/'             => self::ERROR_JOB_TOO_BIG
828        ), $data);
829       
830        if ($res == self::OPERATION_OK) {
831                $this->last_insert_id = $data[0];
832        } else {
833                $this->last_insert_id = null;
834        }
835
836        // Switch back to the original tube
837        if (!is_null($in_tube))
838            $this->use_tube($old_tube);
839
840        return $res;
841    }
842   
843    /**
844    * Express interest in a queue
845    *
846    * Add a queue to watch list.
847    *
848    * @param string $in_tube tube name to watch
849    * @return integer operation status
850    */
851    public function watch($in_tube)
852    {
853        if (BeanStalk::DEBUG) echo __METHOD__."\n";
854        $this->safe_send_message("watch {$in_tube}");
855        $res = $this->check_reply($this->safe_read_message(), '/WATCHING (\d+)/', $data);
856        if ($res == self::OPERATION_OK)
857        {
858            $this->recovery['watch'][$in_tube] = 1;
859            unset($this->recovery['ignore'][$in_tube]);
860        }
861
862        return $res;
863    }
864   
865    /**
866    * Express disinterest in a queue
867    *
868    * Remove a queue from watch list.
869    *
870    * @param string $in_tube tube name to ignore
871    * @return integer operation status
872    */
873    public function ignore($in_tube, &$out_reply)
874    {
875        if (BeanStalk::DEBUG) echo __METHOD__."\n";
876        $this->safe_send_message("ignore {$in_tube}");
877        $rm = $this->safe_read_message();
878        $res = $this->check_reply($rm, array(
879            '/WATCHING (\d+)/'  => self::OPERATION_OK,
880            '/NOT_IGNORED/'     => self::ERROR_LAST_TUBE
881        ), $data);
882        $out_reply = $data[0];
883        if ($res == self::OPERATION_OK)
884        {
885            $this->recovery['ignore'][$in_tube] = 1;
886            unset($this->recovery['watch'][$in_tube]);
887        }
888       
889        return $res;
890    }
891   
892    /**
893    * Bury a job
894    *
895    * Push the job down into the buried queue, only able to be recovered using the
896    * kick command.
897    *
898    * @param integer $in_job_id job to bury
899    * @param integer $in_pri new job priority
900    * @return boolean operation status
901    */
902    public function bury($in_job_id, $in_pri)
903    {
904        if (BeanStalk::DEBUG) echo __METHOD__."\n";
905        $this->safe_send_message("bury {$in_job_id} {$in_pri}");
906        $res = $this->check_reply($this->safe_read_message(), array(
907            '/BURIED/'          => self::OPERATION_OK,
908            '/NOT_IGNORED/'     => self::ERROR_LAST_TUBE
909        ), $data);
910        return $res;
911    }
912   
913    /**
914    * Kick some jobs into the ready queue
915    *
916    * @param integer $in_upper_bound max number of jobs to kick
917    * @return integer operation status
918    */
919    public function kick($in_upper_bound)
920    {
921        if (BeanStalk::DEBUG) echo __METHOD__."\n";
922        $this->safe_send_message("kick {$in_upper_bound}");
923        $res = $this->check_reply($this->safe_read_message(), '/KICKED (\d+)/', $data);
924        return $res;
925    }
926   
927    /**
928    * Reserve a job
929    *
930    * This method blocks while waiting for a job to become available. Once a new job is
931    * received, it is instantiated into a BeanQueueJob and returned. Otherwise false.
932    *
933    * @return BeanQueueJob or false
934    */
935    public function reserve()
936    {
937        if (BeanStalk::DEBUG) echo __METHOD__."\n";
938        $this->safe_send_message("reserve");
939        $real = $this->safe_read_message();
940        $res = $this->check_reply($real, array(
941            '/RESERVED (\d+) (\d+)/'    => self::OPERATION_OK,
942            '/DEADLINE_SOON/'           => self::ERROR_DEADLINE_SOON
943        ), $data);
944        if ($res == self::OPERATION_OK)
945        {
946            $jid = $data[0];
947            $bytes = $data[1];
948            $job = $this->safe_read_blob($bytes);
949            return BeanQueueJob::open($this, $jid, $job);
950        }   
951        return $res;
952    }
953   
954    /**
955    * Reserve a job, with a timeout
956    *
957    * This method blocks while waiting for a job to become available. Once a new job is
958    * received, it is instantiated into a BeanQueueJob and returned. Otherwise false.
959    *
960    * @return BeanQueueJob or false
961    */
962    public function reserve_with_timeout($in_timeout=0)
963    {
964        if (BeanStalk::DEBUG) echo __METHOD__."\n";
965        $this->safe_send_message("reserve-with-timeout {$in_timeout}");
966        $real = $this->safe_read_message();
967        $res = $this->check_reply($real, array(
968            '/RESERVED (\d+) (\d+)/'    => self::OPERATION_OK,
969            '/DEADLINE_SOON/'           => self::ERROR_DEADLINE_SOON,
970            '/TIMED_OUT/'               => self::ERROR_TIMED_OUT
971        ), $data);
972        if ($res == self::OPERATION_OK)
973        {
974            $jid = $data[0];
975            $bytes = $data[1];
976            $job = $this->safe_read_blob($bytes);
977            return BeanQueueJob::open($this, $jid, $job);
978        }   
979        return $res;
980    }
981   
982    /**
983    * Release a job
984    *
985    * Releases a job that has been reserved on this server, by ID. Usually called by
986    * BeanStalk::release(&BeanQueueJob)
987    *
988    * @param integer $in_job_id job id to delete
989    * @return integer operation status
990    */
991    public function release($in_job_id, $in_pri, $in_delay)
992    {
993        if (BeanStalk::DEBUG) echo __METHOD__."\n";
994        $this->safe_send_message("release {$in_job_id} {$in_pri} {$in_delay}");
995        $res = $this->check_reply($this->safe_read_message(), array(
996            '/RELEASED/'        => self::OPERATION_OK,
997            '/BURIED/'          => self::ERROR_BURIED,
998            '/NOT_FOUND/'       => self::ERROR_NOT_FOUND
999        ), $data);
1000        return $res;
1001    }
1002   
1003    /**
1004    * Delete a job
1005    *
1006    * Removes a job from this server by ID. Usually called by BeanStalk::delete(&BeanQueueJob)
1007    *
1008    * @param integer $in_job_id job id to delete
1009    * @return integer operation status
1010    */
1011    public function delete($in_job_id)
1012    {
1013        if (BeanStalk::DEBUG) echo __METHOD__."\n";
1014        $this->safe_send_message("delete {$in_job_id}");
1015        $res = $this->check_reply($this->safe_read_message(), array(
1016            '/DELETED/'         => self::OPERATION_OK,
1017            '/NOT_FOUND/'       => self::ERROR_NOT_FOUND
1018        ), $data);
1019        return $res;
1020    }
1021   
1022    public function peek($in_job_id, &$in_writeback=null)
1023    {
1024        if (BeanStalk::DEBUG) echo __METHOD__."\n";
1025        $this->safe_send_message("peek {$in_job_id}");
1026        $res = $this->check_reply($this->safe_read_message(), array(
1027            '/FOUND (\d+) (\d+)/'   => self::OPERATION_OK,
1028            '/NOT_FOUND/'           => self::ERROR_NOT_FOUND
1029        ), $data);
1030        $in_writeback = false;
1031        if ($res == self::OPERATION_OK)
1032        {
1033            $job = $this->safe_read_blob($data[1]);
1034            if (!is_null($in_writeback))
1035                $in_writeback = $job;
1036        }
1037        return $res;
1038    }
1039   
1040    public function peek_ready(&$in_writeback=null)
1041    {
1042        $this->safe_send_message("peek-ready");
1043        $res = $this->check_reply($this->safe_read_message(), array(
1044            '/FOUND (\d+) (\d+)/'   => self::OPERATION_OK,
1045            '/NOT_FOUND/'           => self::ERROR_NOT_FOUND
1046        ), $data);
1047        $in_writeback = false;
1048        if ($res == self::OPERATION_OK)
1049        {
1050            $job = $this->safe_read_blob($data[1]);
1051            if (!is_null($in_writeback))
1052                $in_writeback = $job;
1053        }
1054        return $res;
1055    }
1056   
1057    public function last_insert_id()
1058    {
1059            return $this->last_insert_id;
1060    }
1061
1062    public function noop()
1063    {
1064        if (BeanStalk::DEBUG) echo __METHOD__."\n";
1065        $res = false;
1066        usleep(1500);
1067        $this->alive();
1068        return $res;
1069    }
1070   
1071    /**
1072    * Check connection
1073    *
1074    * If not yet connected, attempt to do so. Else, check that the socket is still
1075    * valid.
1076    *
1077    * @return boolean connection status
1078    */
1079    public function alive($in_method = 'slow', $in_employ_lastcommand=true)
1080    {
1081        if (!$this->socket || @feof($this->socket))
1082        {
1083            $this->socket = false;
1084            $conn = false;
1085           
1086            while ($this->max_retries == -1 || $this->retries < $this->max_retries)
1087            {
1088                $conn = $this->connect($in_employ_lastcommand);
1089                if ($conn)
1090                    break;
1091               
1092                sleep(1);
1093                if ($this->max_retries == -1 && $in_method == 'fast')
1094                    break;
1095            }
1096            return ($conn !== false);
1097        }
1098       
1099        return true;
1100    }
1101   
1102    /**
1103    * Attempt to connect to beastalkd
1104    *
1105    * Tries to establish a tcp socket connection to the beastalkd server. If this server was
1106    * already tried too many times, simply returns false;
1107    *
1108    * @return boolean connection result
1109    */
1110    private function connect($in_employ_lastcommand=true)
1111    {
1112        if ($this->max_retries != -1 && $this->retries >= $this->max_retries)
1113            return false;
1114           
1115        $this->retries++;
1116        $this->socket = @fsockopen($this->ip, $this->port, $errno, $errstr);
1117        $this->preparing = true;
1118        if ($this->socket && !feof($this->socket))
1119        {
1120            stream_set_timeout($this->socket, $this->timeout);
1121            $this->mode = self::MODE_NORMAL;
1122           
1123            if ($this->recovery)
1124            {
1125                if ($this->recovery['use'])
1126                    $this->use_tube($this->recovery['use']);
1127                if (sizeof($this->recovery['watch']))
1128                    foreach ($this->recovery['watch'] as $watched => $tr)
1129                        $this->watch($watched);
1130                       
1131                if (sizeof($this->recovery['ignore']))
1132                    foreach ($this->recovery['ignore'] as $ignored => $tr)
1133                        $this->ignore($ignored,$trash);
1134                       
1135                if (isset($this->recovery['lastcommand']))
1136                {
1137                    $lc = $this->recovery['lastcommand'];
1138                    unset($this->recovery['lastcommand']);
1139                    if ($in_employ_lastcommand)
1140                        $this->safe_send_message($lc);
1141                }
1142            }
1143            $this->preparing = false;
1144            return true;
1145        }
1146        $this->preparing = false;
1147        return false;
1148    }
1149   
1150    /**
1151     * Read a message from the socket
1152     *
1153     * If the read fails because of an error (like a server that went away) and the command is recoverable,
1154     * then the connection will be restored and the last command re-sent in order to get a reply.
1155     *
1156     * Reading always ends on an \r\n (actually, \n )
1157     *
1158     * @param boolean recoverable Reconnect and re-send command on error
1159     * @return mixed A message, or self::READ_ERROR on an unrecoverable error
1160     */
1161    private function safe_read_message($in_recoverable = true)
1162    {
1163            $message = '';
1164
1165            while (strpos($message, self::MSG_DELIM) === false) {
1166                    $data = fgets($this->socket);
1167                    $message .= $data;
1168
1169                    if ($data === false || $data == '' || (feof($this->socket) && strpos($message, self::MSG_DELIM) === false)) {
1170                            if ($in_recoverable) {
1171                                    $message = '';
1172                                    $this->alive();
1173                                    continue;
1174                            }
1175
1176                            return self::READ_ERROR;
1177                    }
1178            }
1179
1180            if (BeanStalk::DEBUG)
1181                    echo __METHOD__."({$message})\n";
1182
1183            return rtrim($message);
1184    }
1185
1186    /**
1187     * Read an opaque blob from the socket
1188     *
1189     * @param int $in_buf_size The length of the blob to read.
1190     * @return mixed The message blob, or self::READ_ERROR on an unrecoverable error
1191     */
1192    private function safe_read_blob($in_buf_size)
1193    {
1194            $in_buf_size += strlen(self::MSG_DELIM);
1195            $message = '';
1196
1197            while ($in_buf_size > 0) {
1198                    $data = fread($this->socket, min($in_buf_size, 8192));
1199
1200                    if ($data === false || feof($this->socket)) {
1201                            return self::READ_ERROR;
1202                    }
1203
1204                    $message .= $data;
1205                    $in_buf_size -= strlen($data);
1206            }
1207
1208            if (BeanStalk::DEBUG)
1209                    echo __METHOD__."({$message})\n";
1210
1211            return substr($message, 0, -2);
1212    }
1213
1214    private function safe_send_message($in_message)
1215    {
1216        if (!$this->preparing)
1217            $this->recovery['lastcommand'] = $in_message;
1218           
1219        do
1220        {
1221            $sent = $this->send_data($in_message);
1222            if ($sent === false)
1223                $connected = $this->alive('slow',false);
1224            else
1225                break;
1226        } while (1);
1227       
1228        return $sent;
1229    }
1230   
1231    /**
1232    * Write to the server
1233    *
1234    * @param mixed $in_message message to send
1235    * @return boolean Success
1236    */
1237    private function send_data($in_message)
1238    {
1239        if (BeanStalk::DEBUG)
1240            echo __METHOD__."({$in_message})\n";
1241        $tosend = $in_message.self::MSG_DELIM;
1242        $tl = strlen($tosend);
1243
1244        while ($tl > 0) {
1245                $b = @fwrite($this->socket,$tosend,$tl);
1246
1247                if ($b === false) {
1248                        return false;
1249                }
1250
1251                $tl -= $b;
1252                $tosend = substr($tosend, $b);
1253        }
1254
1255        return true;
1256    }
1257   
1258    private function log($in_message)
1259    {
1260        if ($this->log === false) return;
1261        $f = fopen($this->log,'a');
1262        $in_message = "[".posix_getpid()."] {$in_message}\n";
1263        @fwrite($f,$in_message,strlen($in_message));
1264        fclose($f);
1265    }
1266   
1267    private function say($in_message)
1268    {
1269        echo date('H:i:s')." ".$in_message."\n";
1270    }
1271}
1272
1273/**
1274* Class: BeanQueueJob
1275*
1276* Returned by BeanQueue::reserve()
1277*
1278* @package BeanQueueJob
1279*/
1280class BeanQueueJob
1281{
1282   
1283    private $alive;             // Job is able to take commands
1284    private $jid;               // Job ID
1285    private $payload;           // Job data
1286    private $server;            // Reference to owning server
1287   
1288    /**
1289    * Private BeanQueueJob constructor
1290    *
1291    * @param BeanQueue $in_server reference to the server that owns this job
1292    * @param integer $in_job_id job id
1293    * @param mixed $in_job_payload job body
1294    * @return BeanQueueJob
1295    */
1296    private function __construct(&$in_server, $in_job_id, $in_job_payload)
1297    {
1298        $this->server = $in_server;
1299        $this->jid = $in_job_id;
1300        $this->payload = $in_job_payload;
1301        $this->alive = true;
1302    }
1303   
1304    /**
1305    * BeanQueueJob factory
1306    *
1307    * @param BeanQueue $in_server reference to the server that owns this job
1308    * @param integer $in_job_id job id
1309    * @param mixed $in_job_payload job body
1310    * @return BeanQueueJob
1311    */
1312    public static function open(&$in_server, $in_job_id, $in_job_payload)
1313    {
1314        if ($in_server->alive() !== true)
1315            return false;
1316           
1317        return new BeanQueueJob(&$in_server, $in_job_id, $in_job_payload);
1318    }
1319   
1320    /**
1321    * Check if job is still alive
1322    *
1323    * @return boolean status of this job
1324    */
1325    public function alive()
1326    {
1327        return $this->alive;
1328    }
1329   
1330    /**
1331    * Delete this job
1332    *
1333    * Called when a job has been executed. This calls back to the owning server
1334    * and deletes the job from memory
1335    *
1336    * @return integer operation status
1337    */
1338    public function delete()
1339    {
1340        if ($this->alive !== true)
1341            return true;
1342        $this->alive = false;
1343        return $this->server->delete($this->jid);
1344    }
1345   
1346    /**
1347    * Release this job
1348    *
1349    * Called when the client wishes to send the job back into the ready queue, incomplete.
1350    * Calls back to the owning server and releases the job there.
1351    *
1352    * @param integer $in_pri new priority level
1353    * @param integer $in_delay seconds until job will be ready
1354    * @return interger operation status
1355    */
1356    public function release($in_pri, $in_delay)
1357    {
1358        if ($this->alive !== true)
1359            return true;
1360        $this->alive = false;
1361        return $this->server->release($this->jid, $in_pri, $in_delay);
1362    }
1363   
1364    /**
1365    * Bury this job
1366    *
1367    * Sends the job into the bury queue. Can only be unearthed with the kick command from
1368    * that same server.
1369    *
1370    * @param integer $in_pri new priority level
1371    * @return integer operation status
1372    */
1373    public function bury($in_pri)
1374    {
1375        if ($this->alive !== true)
1376            return true;
1377        $this->alive = false;
1378        return $this->server->bury($this->jid, $in_pri);
1379    }
1380   
1381    /**
1382    * Retrieve this job's payload
1383    *
1384    * @return mixed job payload
1385    */
1386    public function get()
1387    {
1388        return $this->payload;
1389    }
1390   
1391    /**
1392    * Retrieve the job id
1393    *
1394    * @return integer job id
1395    */
1396    public function get_jid()
1397    {
1398        return $this->jid;
1399    }
1400   
1401    public static function check($in_check_job)
1402    {
1403        $c = __CLASS__;
1404        if ($in_check_job instanceof $c)
1405            return true;
1406        return false;
1407    }
1408   
1409}
1410
1411class BeanQueueNoServersSuppliedException extends Exception{}
1412class BeanQueueNoValidServersException extends Exception{}
1413class BeanQueueInvalidServerException extends Exception{}
1414class BeanQueueInvalidSelectorBadReserverException extends Exception{}
1415class BeanQueueInvalidSelectorBadServerPickerException extends Exception{}
1416class BeanQueueJobServerDiedException extends Exception{}
1417class BeanQueueCouldNotConnectException extends Exception{}
1418
1419?>
Note: See TracBrowser for help on using the repository browser.