multithreading.php
changeset 48 d643bfb862d8
child 53 a6b339665650
equal deleted inserted replaced
47:b7f1952cef8d 48:d643bfb862d8
       
     1 <?php
       
     2 
       
     3 /**
       
     4  * Multi-threading (well sort of) tools
       
     5  * 
       
     6  * Greyhound - real web management for Amarok
       
     7  * Copyright (C) 2008 Dan Fuhry
       
     8  *
       
     9  * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
       
    10  * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
       
    11  *
       
    12  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
       
    13  * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
       
    14  */
       
    15 
       
    16 require_once('json.php');
       
    17 
       
    18 /**
       
    19  * Global signal handler for SIGCHLD.
       
    20  */
       
    21 
       
    22 function Threader_SigChld()
       
    23 {
       
    24   global $threader_instances;
       
    25   foreach ( $threader_instances as &$mt )
       
    26   {
       
    27     if ( is_object($mt) )
       
    28     {
       
    29       $mt->event_sigchld();
       
    30     }
       
    31   }
       
    32 }
       
    33 
       
    34 /**
       
    35  * Global signal handler for SIGUSR2.
       
    36  */
       
    37 
       
    38 function Threader_SigUsr2()
       
    39 {
       
    40   global $threader_instances;
       
    41   foreach ( $threader_instances as &$mt )
       
    42   {
       
    43     if ( is_object($mt) )
       
    44     {
       
    45       $parchild = $mt->is_child() ? 'child' : 'parent';
       
    46       $mt->event_sigusr2();
       
    47     }
       
    48   }
       
    49 }
       
    50 
       
    51 /**
       
    52  * List of Threader instances. Needed for global handling of signals.
       
    53  * @var array
       
    54  */
       
    55 
       
    56 global $threader_instances;
       
    57 $threader_instances = array();
       
    58 
       
    59 /**
       
    60  * Tools for emulating multi-threaded operation in PHP scripts.
       
    61  * @package Amarok
       
    62  * @subpackage WebControl
       
    63  * @author Dan Fuhry
       
    64  * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
       
    65  */
       
    66 
       
    67 class Threader
       
    68 {
       
    69   
       
    70   /**
       
    71    * Return value of fork() if the process is a child.
       
    72    * @const int
       
    73    */
       
    74   
       
    75   const FORK_CHILD = -1;
       
    76   
       
    77   /**
       
    78    * Set to true if this is a child process. No exceptions.
       
    79    * @var bool
       
    80    * @access private
       
    81    */
       
    82   
       
    83   private $is_child = false;
       
    84   
       
    85   /**
       
    86    * Sockets for inter-process communication.
       
    87    * @var array
       
    88    * @access private
       
    89    */
       
    90   
       
    91   protected $ipc_sockets = array();
       
    92   
       
    93   /**
       
    94    * Socket for communication with the parent. Obviously only used after calling fork().
       
    95    * @var resource
       
    96    * @access private
       
    97    */
       
    98   
       
    99   protected $parent_sock = false;
       
   100   
       
   101   /**
       
   102    * Services_JSON instance.
       
   103    * @var object
       
   104    * @access private
       
   105    */
       
   106   
       
   107   protected $json = false;
       
   108   
       
   109   /**
       
   110    * PID of the parent process.
       
   111    * @var int
       
   112    * @access private
       
   113    */
       
   114   
       
   115   protected $parent_pid = 1;
       
   116   
       
   117   /**
       
   118    * List of actions for IPC events.
       
   119    * @var array
       
   120    * @access private
       
   121    */
       
   122   
       
   123   protected $ipc_actions = array();
       
   124   
       
   125   /**
       
   126    * Constructor. Sets up signal handlers. Nothing to see here, move along.
       
   127    */
       
   128   
       
   129   public function __construct()
       
   130   {
       
   131     global $threader_instances;
       
   132     
       
   133     declare(ticks=1);
       
   134     
       
   135     $threader_instances[] =& $this;
       
   136     
       
   137     pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
       
   138     pcntl_signal(SIGCHLD, 'Threader_SigChld');
       
   139     
       
   140     $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
       
   141     $this->parent_pid = getmypid();
       
   142   }
       
   143   
       
   144   /**
       
   145    * Forks the current process. See your system's fork(2) man page for details.
       
   146    * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure.
       
   147    */
       
   148   
       
   149   public function fork()
       
   150   {
       
   151     // create our new sockets for IPC
       
   152     $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
       
   153     // fork (emoticon of the day: --<E)
       
   154     $fork_result = pcntl_fork();
       
   155     if ( $fork_result == -1 )
       
   156     {
       
   157       // fork failed.
       
   158       return false;
       
   159     }
       
   160     else if ( $fork_result )
       
   161     {
       
   162       // we are the parent - register the child
       
   163       fclose($socket_pair[0]);
       
   164       $this->ipc_sockets[$fork_result] = $socket_pair[1];
       
   165       return $fork_result;
       
   166     }
       
   167     else
       
   168     {
       
   169       // we are the child.
       
   170       fclose($socket_pair[1]);
       
   171       $this->parent_sock = $socket_pair[0];
       
   172       $this->is_child = true;
       
   173       return FORK_CHILD;
       
   174     }
       
   175   }
       
   176   
       
   177   /**
       
   178    * Are we a child?
       
   179    * @return bool
       
   180    */
       
   181   
       
   182   public function is_child()
       
   183   {
       
   184     return $this->is_child;
       
   185   }
       
   186   
       
   187   /**
       
   188    * Register an action so that when it is fired over IPC, a custom function can be called.
       
   189    * @param string Action
       
   190    * @param callback Function to call
       
   191    * @return true on success, false on failure
       
   192    */
       
   193   
       
   194   function ipc_register($action, $callback)
       
   195   {
       
   196     if ( !is_string($action) || empty($action) || !is_callable($callback) )
       
   197     {
       
   198       return false;
       
   199     }
       
   200     $this->ipc_actions[$action] = $callback;
       
   201     return true;
       
   202   }
       
   203   
       
   204   /**
       
   205    * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified. 
       
   206    * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If this a key "propagate" set to true, a parent that receives this will propagate the message to all children.
       
   207    * @return null
       
   208    */
       
   209   
       
   210   function ipc_send($data)
       
   211   {
       
   212     if ( !isset($data['action']) )
       
   213     {
       
   214       return false;
       
   215     }
       
   216     $data = $this->json->encode($data);
       
   217     if ( $this->is_child() )
       
   218     {
       
   219       fwrite($this->parent_sock, "$data\n");
       
   220       // signal the parent that we've got an event
       
   221       posix_kill($this->parent_pid, SIGUSR2);
       
   222     }
       
   223     else
       
   224     {
       
   225       // signal each child
       
   226       foreach ( $this->ipc_sockets as $pid => $socket )
       
   227       {
       
   228         fwrite($socket, "$data\n");
       
   229         posix_kill($pid, SIGUSR2);
       
   230       }
       
   231     }
       
   232     return null;
       
   233   }
       
   234   
       
   235   /**
       
   236    * Handler for SIGCHLD events.
       
   237    * @access private
       
   238    */
       
   239   
       
   240   function event_sigchld()
       
   241   {
       
   242     // this should never happen to children.
       
   243     if ( $this->is_child() )
       
   244     {
       
   245       return null;
       
   246     }
       
   247     
       
   248     // wait for child to exit.
       
   249     pcntl_wait($status);
       
   250     // for each child PID, kill with signal 0 (effectively, test if process is alive)
       
   251     // if posix_kill fails, it's dead so remove it from the list.
       
   252     foreach ( $this->ipc_sockets as $pid => $socket )
       
   253     {
       
   254       if ( !@posix_kill($pid, 0) )
       
   255       {
       
   256         // signal failed.
       
   257         fclose($socket);
       
   258         unset($this->ipc_sockets[$pid]);
       
   259       }
       
   260     }
       
   261   }
       
   262   
       
   263   /**
       
   264    * Handler for IPC events.
       
   265    * @access private
       
   266    */
       
   267   
       
   268   function event_sigusr2()
       
   269   {
       
   270     if ( $this->is_child() )
       
   271     {
       
   272       // this is easy - the parent sent the signal.
       
   273       $command = rtrim(fgets($this->parent_sock, 102400), "\n");
       
   274     }
       
   275     else
       
   276     {
       
   277       // since we can't find which PID sent the signal, set the timeout to a very small amount
       
   278       // of time and try to read; if we get something, awesome.
       
   279       foreach ( $this->ipc_sockets as $pid => $socket )
       
   280       {
       
   281         // 1000 microseconds = 1/80th of the time it takes you to blink.
       
   282         @stream_set_timeout($socket, 0, 1000);
       
   283         $command = rtrim(@fgets($socket, 102400), "\n");
       
   284         if ( !empty($command) )
       
   285         {
       
   286           break;
       
   287         }
       
   288       }
       
   289     }
       
   290     if ( empty($command) )
       
   291     {
       
   292       // hmm, got a sigusr2 without an incoming command. oh well, ignore.
       
   293       return null;
       
   294     }
       
   295     $command = $this->json->decode($command);
       
   296     if ( !isset($command['action']) )
       
   297     {
       
   298       // no action = no way to figure out how to handle this.
       
   299       return null;
       
   300     }
       
   301     if ( !isset($this->ipc_actions[$command['action']]) )
       
   302     {
       
   303       // action not registered
       
   304       return null;
       
   305     }
       
   306     // should we propagate this event?
       
   307     if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
       
   308     {
       
   309       $this->ipc_send($command);
       
   310     }
       
   311     // we're good
       
   312     @call_user_func($this->ipc_actions[$command['action']], $command, $this);
       
   313   }
       
   314   
       
   315   /**
       
   316    * Kills all child processes.
       
   317    * @access public
       
   318    */
       
   319   
       
   320   public function kill_all_children()
       
   321   {
       
   322     foreach ( $this->ipc_sockets as $pid => $socket )
       
   323     {
       
   324       $socklen = count($this->ipc_sockets);
       
   325       posix_kill($pid, SIGTERM);
       
   326       // wait until we are conscious of this child's death
       
   327       while ( count($this->ipc_sockets) >= $socklen )
       
   328       {
       
   329         usleep(20000);
       
   330       }
       
   331     }
       
   332   }
       
   333   
       
   334 }
       
   335 
       
   336 ?>