multithreading.php
author Dan
Tue, 23 Sep 2008 23:26:18 -0400
changeset 50 1b4288399b1f
parent 48 d643bfb862d8
child 53 a6b339665650
permissions -rw-r--r--
Added graphical configuration, at this point only for the grey theme but others will follow soon. (This has been nearly done for two weeks or more but was on hold due to the bugs with multithreading)

<?php

/**
 * Multi-threading (well sort of) tools
 * 
 * Greyhound - real web management for Amarok
 * Copyright (C) 2008 Dan Fuhry
 *
 * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
 */

require_once('json.php');

/**
 * Global signal handler for SIGCHLD.
 */

function Threader_SigChld()
{
  global $threader_instances;
  foreach ( $threader_instances as &$mt )
  {
    if ( is_object($mt) )
    {
      $mt->event_sigchld();
    }
  }
}

/**
 * Global signal handler for SIGUSR2.
 */

function Threader_SigUsr2()
{
  global $threader_instances;
  foreach ( $threader_instances as &$mt )
  {
    if ( is_object($mt) )
    {
      $parchild = $mt->is_child() ? 'child' : 'parent';
      $mt->event_sigusr2();
    }
  }
}

/**
 * List of Threader instances. Needed for global handling of signals.
 * @var array
 */

global $threader_instances;
$threader_instances = array();

/**
 * Tools for emulating multi-threaded operation in PHP scripts.
 * @package Amarok
 * @subpackage WebControl
 * @author Dan Fuhry
 * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
 */

class Threader
{
  
  /**
   * Return value of fork() if the process is a child.
   * @const int
   */
  
  const FORK_CHILD = -1;
  
  /**
   * Set to true if this is a child process. No exceptions.
   * @var bool
   * @access private
   */
  
  private $is_child = false;
  
  /**
   * Sockets for inter-process communication.
   * @var array
   * @access private
   */
  
  protected $ipc_sockets = array();
  
  /**
   * Socket for communication with the parent. Obviously only used after calling fork().
   * @var resource
   * @access private
   */
  
  protected $parent_sock = false;
  
  /**
   * Services_JSON instance.
   * @var object
   * @access private
   */
  
  protected $json = false;
  
  /**
   * PID of the parent process.
   * @var int
   * @access private
   */
  
  protected $parent_pid = 1;
  
  /**
   * List of actions for IPC events.
   * @var array
   * @access private
   */
  
  protected $ipc_actions = array();
  
  /**
   * Constructor. Sets up signal handlers. Nothing to see here, move along.
   */
  
  public function __construct()
  {
    global $threader_instances;
    
    declare(ticks=1);
    
    $threader_instances[] =& $this;
    
    pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
    pcntl_signal(SIGCHLD, 'Threader_SigChld');
    
    $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
    $this->parent_pid = getmypid();
  }
  
  /**
   * Forks the current process. See your system's fork(2) man page for details.
   * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure.
   */
  
  public function fork()
  {
    // create our new sockets for IPC
    $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    // fork (emoticon of the day: --<E)
    $fork_result = pcntl_fork();
    if ( $fork_result == -1 )
    {
      // fork failed.
      return false;
    }
    else if ( $fork_result )
    {
      // we are the parent - register the child
      fclose($socket_pair[0]);
      $this->ipc_sockets[$fork_result] = $socket_pair[1];
      return $fork_result;
    }
    else
    {
      // we are the child.
      fclose($socket_pair[1]);
      $this->parent_sock = $socket_pair[0];
      $this->is_child = true;
      return FORK_CHILD;
    }
  }
  
  /**
   * Are we a child?
   * @return bool
   */
  
  public function is_child()
  {
    return $this->is_child;
  }
  
  /**
   * Register an action so that when it is fired over IPC, a custom function can be called.
   * @param string Action
   * @param callback Function to call
   * @return true on success, false on failure
   */
  
  function ipc_register($action, $callback)
  {
    if ( !is_string($action) || empty($action) || !is_callable($callback) )
    {
      return false;
    }
    $this->ipc_actions[$action] = $callback;
    return true;
  }
  
  /**
   * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified. 
   * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If this a key "propagate" set to true, a parent that receives this will propagate the message to all children.
   * @return null
   */
  
  function ipc_send($data)
  {
    if ( !isset($data['action']) )
    {
      return false;
    }
    $data = $this->json->encode($data);
    if ( $this->is_child() )
    {
      fwrite($this->parent_sock, "$data\n");
      // signal the parent that we've got an event
      posix_kill($this->parent_pid, SIGUSR2);
    }
    else
    {
      // signal each child
      foreach ( $this->ipc_sockets as $pid => $socket )
      {
        fwrite($socket, "$data\n");
        posix_kill($pid, SIGUSR2);
      }
    }
    return null;
  }
  
  /**
   * Handler for SIGCHLD events.
   * @access private
   */
  
  function event_sigchld()
  {
    // this should never happen to children.
    if ( $this->is_child() )
    {
      return null;
    }
    
    // wait for child to exit.
    pcntl_wait($status);
    // for each child PID, kill with signal 0 (effectively, test if process is alive)
    // if posix_kill fails, it's dead so remove it from the list.
    foreach ( $this->ipc_sockets as $pid => $socket )
    {
      if ( !@posix_kill($pid, 0) )
      {
        // signal failed.
        fclose($socket);
        unset($this->ipc_sockets[$pid]);
      }
    }
  }
  
  /**
   * Handler for IPC events.
   * @access private
   */
  
  function event_sigusr2()
  {
    if ( $this->is_child() )
    {
      // this is easy - the parent sent the signal.
      $command = rtrim(fgets($this->parent_sock, 102400), "\n");
    }
    else
    {
      // since we can't find which PID sent the signal, set the timeout to a very small amount
      // of time and try to read; if we get something, awesome.
      foreach ( $this->ipc_sockets as $pid => $socket )
      {
        // 1000 microseconds = 1/80th of the time it takes you to blink.
        @stream_set_timeout($socket, 0, 1000);
        $command = rtrim(@fgets($socket, 102400), "\n");
        if ( !empty($command) )
        {
          break;
        }
      }
    }
    if ( empty($command) )
    {
      // hmm, got a sigusr2 without an incoming command. oh well, ignore.
      return null;
    }
    $command = $this->json->decode($command);
    if ( !isset($command['action']) )
    {
      // no action = no way to figure out how to handle this.
      return null;
    }
    if ( !isset($this->ipc_actions[$command['action']]) )
    {
      // action not registered
      return null;
    }
    // should we propagate this event?
    if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
    {
      $this->ipc_send($command);
    }
    // we're good
    @call_user_func($this->ipc_actions[$command['action']], $command, $this);
  }
  
  /**
   * Kills all child processes.
   * @access public
   */
  
  public function kill_all_children()
  {
    foreach ( $this->ipc_sockets as $pid => $socket )
    {
      $socklen = count($this->ipc_sockets);
      posix_kill($pid, SIGTERM);
      // wait until we are conscious of this child's death
      while ( count($this->ipc_sockets) >= $socklen )
      {
        usleep(20000);
      }
    }
  }
  
}

?>